diff options
author | bors[bot] <bors[bot]@users.noreply.github.com> | 2019-02-14 18:22:19 +0000 |
---|---|---|
committer | bors[bot] <bors[bot]@users.noreply.github.com> | 2019-02-14 18:22:19 +0000 |
commit | c530f04df51459d6d5f70c475e93127217f6e27f (patch) | |
tree | 2e9abadb6e64fed7738b5b3fbc2eb13787efdaca /crates/thread_worker/src/lib.rs | |
parent | 10bf61b83b2600ed3cb7e7825f1cd0ee83e9b7e7 (diff) | |
parent | e0b8942c56378b7966af39058f27b11a0d02890f (diff) |
Merge #833
833: automatically wait for worker threads r=matklad a=matklad
closes #817
Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/thread_worker/src/lib.rs')
-rw-r--r-- | crates/thread_worker/src/lib.rs | 120 |
1 files changed, 63 insertions, 57 deletions
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index a522a0843..d67e44e38 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs | |||
@@ -2,74 +2,80 @@ | |||
2 | 2 | ||
3 | use std::thread; | 3 | use std::thread; |
4 | 4 | ||
5 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; | 5 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; |
6 | use drop_bomb::DropBomb; | ||
7 | 6 | ||
8 | pub struct Worker<I, O> { | 7 | /// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically. |
9 | pub inp: Sender<I>, | 8 | pub struct ScopedThread { |
10 | pub out: Receiver<O>, | 9 | // Option for drop |
10 | inner: Option<thread::JoinHandle<()>>, | ||
11 | } | 11 | } |
12 | 12 | ||
13 | pub struct WorkerHandle { | 13 | impl Drop for ScopedThread { |
14 | name: &'static str, | 14 | fn drop(&mut self) { |
15 | thread: thread::JoinHandle<()>, | 15 | let inner = self.inner.take().unwrap(); |
16 | bomb: DropBomb, | 16 | let name = inner.thread().name().unwrap().to_string(); |
17 | } | 17 | log::info!("waiting for {} to finish...", name); |
18 | let res = inner.join(); | ||
19 | log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" }); | ||
18 | 20 | ||
19 | pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) | 21 | // escalate panic, but avoid aborting the process |
20 | where | 22 | match res { |
21 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | 23 | Err(e) => { |
22 | I: Send + 'static, | 24 | if !thread::panicking() { |
23 | O: Send + 'static, | 25 | panic!(e) |
24 | { | 26 | } |
25 | let (worker, inp_r, out_s) = worker_chan(buf); | 27 | } |
26 | let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); | 28 | _ => (), |
27 | (worker, watcher) | 29 | } |
28 | } | ||
29 | |||
30 | impl<I, O> Worker<I, O> { | ||
31 | /// Stops the worker. Returns the message receiver to fetch results which | ||
32 | /// have become ready before the worker is stopped. | ||
33 | pub fn shutdown(self) -> Receiver<O> { | ||
34 | self.out | ||
35 | } | 30 | } |
31 | } | ||
36 | 32 | ||
37 | pub fn send(&self, item: I) -> Result<(), SendError<I>> { | 33 | impl ScopedThread { |
38 | self.inp.send(item) | 34 | pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread { |
39 | } | 35 | let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap(); |
40 | pub fn recv(&self) -> Result<O, RecvError> { | 36 | ScopedThread { inner: Some(inner) } |
41 | self.out.recv() | ||
42 | } | 37 | } |
43 | } | 38 | } |
44 | 39 | ||
45 | impl WorkerHandle { | 40 | /// A wrapper around event-processing thread with automatic shutdown semantics. |
46 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { | 41 | pub struct Worker<I, O> { |
47 | let thread = thread::spawn(f); | 42 | // XXX: field order is significant here. |
48 | WorkerHandle { | 43 | // |
49 | name, | 44 | // In Rust, fields are dropped in the declaration order, and we rely on this |
50 | thread, | 45 | // here. We must close input first, so that the `thread` (who holds the |
51 | bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), | 46 | // opposite side of the channel) noticed shutdown. Then, we must join the |
52 | } | 47 | // thread, but we must keep out alive so that the thread does not panic. |
53 | } | 48 | // |
49 | // Note that a potential problem here is that we might drop some messages | ||
50 | // from receiver on the floor. This is ok for rust-analyzer: we have only a | ||
51 | // single client, so, if we are shutting down, nobody is interested in the | ||
52 | // unfinished work anyway! | ||
53 | sender: Sender<I>, | ||
54 | _thread: ScopedThread, | ||
55 | receiver: Receiver<O>, | ||
56 | } | ||
54 | 57 | ||
55 | pub fn shutdown(mut self) -> thread::Result<()> { | 58 | impl<I, O> Worker<I, O> { |
56 | log::info!("waiting for {} to finish ...", self.name); | 59 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O> |
57 | let name = self.name; | 60 | where |
58 | self.bomb.defuse(); | 61 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, |
59 | let res = self.thread.join(); | 62 | I: Send + 'static, |
60 | match &res { | 63 | O: Send + 'static, |
61 | Ok(()) => log::info!("... {} terminated with ok", name), | 64 | { |
62 | Err(_) => log::error!("... {} terminated with err", name), | 65 | // Set up worker channels in a deadlock-avoiding way. If one sets both input |
63 | } | 66 | // and output buffers to a fixed size, a worker might get stuck. |
64 | res | 67 | let (sender, input_receiver) = bounded::<I>(buf); |
68 | let (output_sender, receiver) = unbounded::<O>(); | ||
69 | let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); | ||
70 | Worker { sender, _thread, receiver } | ||
65 | } | 71 | } |
66 | } | 72 | } |
67 | 73 | ||
68 | /// Sets up worker channels in a deadlock-avoiding way. | 74 | impl<I, O> Worker<I, O> { |
69 | /// If one sets both input and output buffers to a fixed size, | 75 | pub fn sender(&self) -> &Sender<I> { |
70 | /// a worker might get stuck. | 76 | &self.sender |
71 | fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { | 77 | } |
72 | let (input_sender, input_receiver) = bounded::<I>(buf); | 78 | pub fn receiver(&self) -> &Receiver<O> { |
73 | let (output_sender, output_receiver) = unbounded::<O>(); | 79 | &self.receiver |
74 | (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) | 80 | } |
75 | } | 81 | } |