aboutsummaryrefslogtreecommitdiff
path: root/crates/thread_worker/src/lib.rs
diff options
context:
space:
mode:
authorbors[bot] <bors[bot]@users.noreply.github.com>2019-02-14 18:22:19 +0000
committerbors[bot] <bors[bot]@users.noreply.github.com>2019-02-14 18:22:19 +0000
commitc530f04df51459d6d5f70c475e93127217f6e27f (patch)
tree2e9abadb6e64fed7738b5b3fbc2eb13787efdaca /crates/thread_worker/src/lib.rs
parent10bf61b83b2600ed3cb7e7825f1cd0ee83e9b7e7 (diff)
parente0b8942c56378b7966af39058f27b11a0d02890f (diff)
Merge #833
833: automatically wait for worker threads r=matklad a=matklad closes #817 Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/thread_worker/src/lib.rs')
-rw-r--r--crates/thread_worker/src/lib.rs120
1 files changed, 63 insertions, 57 deletions
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
index a522a0843..d67e44e38 100644
--- a/crates/thread_worker/src/lib.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -2,74 +2,80 @@
2 2
3use std::thread; 3use std::thread;
4 4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; 5use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
6use drop_bomb::DropBomb;
7 6
8pub struct Worker<I, O> { 7/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically.
9 pub inp: Sender<I>, 8pub struct ScopedThread {
10 pub out: Receiver<O>, 9 // Option for drop
10 inner: Option<thread::JoinHandle<()>>,
11} 11}
12 12
13pub struct WorkerHandle { 13impl Drop for ScopedThread {
14 name: &'static str, 14 fn drop(&mut self) {
15 thread: thread::JoinHandle<()>, 15 let inner = self.inner.take().unwrap();
16 bomb: DropBomb, 16 let name = inner.thread().name().unwrap().to_string();
17} 17 log::info!("waiting for {} to finish...", name);
18 let res = inner.join();
19 log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" });
18 20
19pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) 21 // escalate panic, but avoid aborting the process
20where 22 match res {
21 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, 23 Err(e) => {
22 I: Send + 'static, 24 if !thread::panicking() {
23 O: Send + 'static, 25 panic!(e)
24{ 26 }
25 let (worker, inp_r, out_s) = worker_chan(buf); 27 }
26 let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); 28 _ => (),
27 (worker, watcher) 29 }
28}
29
30impl<I, O> Worker<I, O> {
31 /// Stops the worker. Returns the message receiver to fetch results which
32 /// have become ready before the worker is stopped.
33 pub fn shutdown(self) -> Receiver<O> {
34 self.out
35 } 30 }
31}
36 32
37 pub fn send(&self, item: I) -> Result<(), SendError<I>> { 33impl ScopedThread {
38 self.inp.send(item) 34 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread {
39 } 35 let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap();
40 pub fn recv(&self) -> Result<O, RecvError> { 36 ScopedThread { inner: Some(inner) }
41 self.out.recv()
42 } 37 }
43} 38}
44 39
45impl WorkerHandle { 40/// A wrapper around event-processing thread with automatic shutdown semantics.
46 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { 41pub struct Worker<I, O> {
47 let thread = thread::spawn(f); 42 // XXX: field order is significant here.
48 WorkerHandle { 43 //
49 name, 44 // In Rust, fields are dropped in the declaration order, and we rely on this
50 thread, 45 // here. We must close input first, so that the `thread` (who holds the
51 bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), 46 // opposite side of the channel) noticed shutdown. Then, we must join the
52 } 47 // thread, but we must keep out alive so that the thread does not panic.
53 } 48 //
49 // Note that a potential problem here is that we might drop some messages
50 // from receiver on the floor. This is ok for rust-analyzer: we have only a
51 // single client, so, if we are shutting down, nobody is interested in the
52 // unfinished work anyway!
53 sender: Sender<I>,
54 _thread: ScopedThread,
55 receiver: Receiver<O>,
56}
54 57
55 pub fn shutdown(mut self) -> thread::Result<()> { 58impl<I, O> Worker<I, O> {
56 log::info!("waiting for {} to finish ...", self.name); 59 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O>
57 let name = self.name; 60 where
58 self.bomb.defuse(); 61 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
59 let res = self.thread.join(); 62 I: Send + 'static,
60 match &res { 63 O: Send + 'static,
61 Ok(()) => log::info!("... {} terminated with ok", name), 64 {
62 Err(_) => log::error!("... {} terminated with err", name), 65 // Set up worker channels in a deadlock-avoiding way. If one sets both input
63 } 66 // and output buffers to a fixed size, a worker might get stuck.
64 res 67 let (sender, input_receiver) = bounded::<I>(buf);
68 let (output_sender, receiver) = unbounded::<O>();
69 let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender));
70 Worker { sender, _thread, receiver }
65 } 71 }
66} 72}
67 73
68/// Sets up worker channels in a deadlock-avoiding way. 74impl<I, O> Worker<I, O> {
69/// If one sets both input and output buffers to a fixed size, 75 pub fn sender(&self) -> &Sender<I> {
70/// a worker might get stuck. 76 &self.sender
71fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { 77 }
72 let (input_sender, input_receiver) = bounded::<I>(buf); 78 pub fn receiver(&self) -> &Receiver<O> {
73 let (output_sender, output_receiver) = unbounded::<O>(); 79 &self.receiver
74 (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) 80 }
75} 81}