aboutsummaryrefslogtreecommitdiff
path: root/crates/thread_worker/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/thread_worker/src/lib.rs')
-rw-r--r--crates/thread_worker/src/lib.rs39
1 files changed, 5 insertions, 34 deletions
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
index d8d0d9bf2..68e5c124d 100644
--- a/crates/thread_worker/src/lib.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -1,39 +1,7 @@
1//! Small utility to correctly spawn crossbeam-channel based worker threads. 1//! Small utility to correctly spawn crossbeam-channel based worker threads.
2 2
3use std::thread;
4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; 3use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
6 4
7/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically.
8pub struct ScopedThread {
9 // Option for drop
10 inner: Option<thread::JoinHandle<()>>,
11}
12
13impl Drop for ScopedThread {
14 fn drop(&mut self) {
15 let inner = self.inner.take().unwrap();
16 let name = inner.thread().name().unwrap().to_string();
17 log::info!("waiting for {} to finish...", name);
18 let res = inner.join();
19 log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" });
20
21 // escalate panic, but avoid aborting the process
22 if let Err(e) = res {
23 if !thread::panicking() {
24 panic!(e)
25 }
26 }
27 }
28}
29
30impl ScopedThread {
31 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread {
32 let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap();
33 ScopedThread { inner: Some(inner) }
34 }
35}
36
37/// A wrapper around event-processing thread with automatic shutdown semantics. 5/// A wrapper around event-processing thread with automatic shutdown semantics.
38pub struct Worker<I, O> { 6pub struct Worker<I, O> {
39 // XXX: field order is significant here. 7 // XXX: field order is significant here.
@@ -48,7 +16,7 @@ pub struct Worker<I, O> {
48 // single client, so, if we are shutting down, nobody is interested in the 16 // single client, so, if we are shutting down, nobody is interested in the
49 // unfinished work anyway! 17 // unfinished work anyway!
50 sender: Sender<I>, 18 sender: Sender<I>,
51 _thread: ScopedThread, 19 _thread: jod_thread::JoinHandle<()>,
52 receiver: Receiver<O>, 20 receiver: Receiver<O>,
53} 21}
54 22
@@ -63,7 +31,10 @@ impl<I, O> Worker<I, O> {
63 // and output buffers to a fixed size, a worker might get stuck. 31 // and output buffers to a fixed size, a worker might get stuck.
64 let (sender, input_receiver) = bounded::<I>(buf); 32 let (sender, input_receiver) = bounded::<I>(buf);
65 let (output_sender, receiver) = unbounded::<O>(); 33 let (output_sender, receiver) = unbounded::<O>();
66 let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); 34 let _thread = jod_thread::Builder::new()
35 .name(name.to_string())
36 .spawn(move || f(input_receiver, output_sender))
37 .expect("failed to spawn a thread");
67 Worker { sender, _thread, receiver } 38 Worker { sender, _thread, receiver }
68 } 39 }
69} 40}