diff options
Diffstat (limited to 'crates/thread_worker/src/lib.rs')
-rw-r--r-- | crates/thread_worker/src/lib.rs | 39 |
1 files changed, 5 insertions, 34 deletions
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index d8d0d9bf2..68e5c124d 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs | |||
@@ -1,39 +1,7 @@ | |||
1 | //! Small utility to correctly spawn crossbeam-channel based worker threads. | 1 | //! Small utility to correctly spawn crossbeam-channel based worker threads. |
2 | 2 | ||
3 | use std::thread; | ||
4 | |||
5 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; | 3 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; |
6 | 4 | ||
7 | /// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically. | ||
8 | pub struct ScopedThread { | ||
9 | // Option for drop | ||
10 | inner: Option<thread::JoinHandle<()>>, | ||
11 | } | ||
12 | |||
13 | impl Drop for ScopedThread { | ||
14 | fn drop(&mut self) { | ||
15 | let inner = self.inner.take().unwrap(); | ||
16 | let name = inner.thread().name().unwrap().to_string(); | ||
17 | log::info!("waiting for {} to finish...", name); | ||
18 | let res = inner.join(); | ||
19 | log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" }); | ||
20 | |||
21 | // escalate panic, but avoid aborting the process | ||
22 | if let Err(e) = res { | ||
23 | if !thread::panicking() { | ||
24 | panic!(e) | ||
25 | } | ||
26 | } | ||
27 | } | ||
28 | } | ||
29 | |||
30 | impl ScopedThread { | ||
31 | pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread { | ||
32 | let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap(); | ||
33 | ScopedThread { inner: Some(inner) } | ||
34 | } | ||
35 | } | ||
36 | |||
37 | /// A wrapper around event-processing thread with automatic shutdown semantics. | 5 | /// A wrapper around event-processing thread with automatic shutdown semantics. |
38 | pub struct Worker<I, O> { | 6 | pub struct Worker<I, O> { |
39 | // XXX: field order is significant here. | 7 | // XXX: field order is significant here. |
@@ -48,7 +16,7 @@ pub struct Worker<I, O> { | |||
48 | // single client, so, if we are shutting down, nobody is interested in the | 16 | // single client, so, if we are shutting down, nobody is interested in the |
49 | // unfinished work anyway! | 17 | // unfinished work anyway! |
50 | sender: Sender<I>, | 18 | sender: Sender<I>, |
51 | _thread: ScopedThread, | 19 | _thread: jod_thread::JoinHandle<()>, |
52 | receiver: Receiver<O>, | 20 | receiver: Receiver<O>, |
53 | } | 21 | } |
54 | 22 | ||
@@ -63,7 +31,10 @@ impl<I, O> Worker<I, O> { | |||
63 | // and output buffers to a fixed size, a worker might get stuck. | 31 | // and output buffers to a fixed size, a worker might get stuck. |
64 | let (sender, input_receiver) = bounded::<I>(buf); | 32 | let (sender, input_receiver) = bounded::<I>(buf); |
65 | let (output_sender, receiver) = unbounded::<O>(); | 33 | let (output_sender, receiver) = unbounded::<O>(); |
66 | let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); | 34 | let _thread = jod_thread::Builder::new() |
35 | .name(name.to_string()) | ||
36 | .spawn(move || f(input_receiver, output_sender)) | ||
37 | .expect("failed to spawn a thread"); | ||
67 | Worker { sender, _thread, receiver } | 38 | Worker { sender, _thread, receiver } |
68 | } | 39 | } |
69 | } | 40 | } |