From bf352cd2511775a331d77dee261b64bd8359dacb Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 14 Feb 2019 20:43:45 +0300 Subject: automatically wait for worker threads closes #817 --- crates/thread_worker/src/lib.rs | 120 +++++++++++++++++++++------------------- 1 file changed, 63 insertions(+), 57 deletions(-) (limited to 'crates/thread_worker/src') diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index a522a0843..d67e44e38 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs @@ -2,74 +2,80 @@ use std::thread; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; -use drop_bomb::DropBomb; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -pub struct Worker { - pub inp: Sender, - pub out: Receiver, +/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically. +pub struct ScopedThread { + // Option for drop + inner: Option>, } -pub struct WorkerHandle { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} +impl Drop for ScopedThread { + fn drop(&mut self) { + let inner = self.inner.take().unwrap(); + let name = inner.thread().name().unwrap().to_string(); + log::info!("waiting for {} to finish...", name); + let res = inner.join(); + log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" }); -pub fn spawn(name: &'static str, buf: usize, f: F) -> (Worker, WorkerHandle) -where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, -{ - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) -} - -impl Worker { - /// Stops the worker. Returns the message receiver to fetch results which - /// have become ready before the worker is stopped. - pub fn shutdown(self) -> Receiver { - self.out + // escalate panic, but avoid aborting the process + match res { + Err(e) => { + if !thread::panicking() { + panic!(e) + } + } + _ => (), + } } +} - pub fn send(&self, item: I) -> Result<(), SendError> { - self.inp.send(item) - } - pub fn recv(&self) -> Result { - self.out.recv() +impl ScopedThread { + pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread { + let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap(); + ScopedThread { inner: Some(inner) } } } -impl WorkerHandle { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { - let thread = thread::spawn(f); - WorkerHandle { - name, - thread, - bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), - } - } +/// A wrapper around event-processing thread with automatic shutdown semantics. +pub struct Worker { + // XXX: field order is significant here. + // + // In Rust, fields are dropped in the declaration order, and we rely on this + // here. We must close input first, so that the `thread` (who holds the + // opposite side of the channel) noticed shutdown. Then, we must join the + // thread, but we must keep out alive so that the thread does not panic. + // + // Note that a potential problem here is that we might drop some messages + // from receiver on the floor. This is ok for rust-analyzer: we have only a + // single client, so, if we are shutting down, nobody is interested in the + // unfinished work anyway! + sender: Sender, + _thread: ScopedThread, + receiver: Receiver, +} - pub fn shutdown(mut self) -> thread::Result<()> { - log::info!("waiting for {} to finish ...", self.name); - let name = self.name; - self.bomb.defuse(); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... {} terminated with ok", name), - Err(_) => log::error!("... {} terminated with err", name), - } - res +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> Worker + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + // Set up worker channels in a deadlock-avoiding way. If one sets both input + // and output buffers to a fixed size, a worker might get stuck. + let (sender, input_receiver) = bounded::(buf); + let (output_sender, receiver) = unbounded::(); + let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); + Worker { sender, _thread, receiver } } } -/// Sets up worker channels in a deadlock-avoiding way. -/// If one sets both input and output buffers to a fixed size, -/// a worker might get stuck. -fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { - let (input_sender, input_receiver) = bounded::(buf); - let (output_sender, output_receiver) = unbounded::(); - (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) +impl Worker { + pub fn sender(&self) -> &Sender { + &self.sender + } + pub fn receiver(&self) -> &Receiver { + &self.receiver + } } -- cgit v1.2.3