diff options
Diffstat (limited to 'crates/thread_worker')
-rw-r--r-- | crates/thread_worker/Cargo.toml | 11 | ||||
-rw-r--r-- | crates/thread_worker/src/lib.rs | 79 |
2 files changed, 90 insertions, 0 deletions
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml new file mode 100644 index 000000000..62d66a1a3 --- /dev/null +++ b/crates/thread_worker/Cargo.toml | |||
@@ -0,0 +1,11 @@ | |||
1 | [package] | ||
2 | edition = "2018" | ||
3 | name = "thread_worker" | ||
4 | version = "0.1.0" | ||
5 | authors = ["Aleksey Kladov <[email protected]>"] | ||
6 | |||
7 | [dependencies] | ||
8 | drop_bomb = "0.1.0" | ||
9 | crossbeam-channel = "0.2.4" | ||
10 | log = "0.4.3" | ||
11 | |||
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs new file mode 100644 index 000000000..e558559ef --- /dev/null +++ b/crates/thread_worker/src/lib.rs | |||
@@ -0,0 +1,79 @@ | |||
1 | //! Small utility to correctly spawn crossbeam-channel based worker threads. | ||
2 | |||
3 | use std::thread; | ||
4 | |||
5 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; | ||
6 | use drop_bomb::DropBomb; | ||
7 | |||
8 | pub struct Worker<I, O> { | ||
9 | pub inp: Sender<I>, | ||
10 | pub out: Receiver<O>, | ||
11 | } | ||
12 | |||
13 | pub struct WorkerHandle { | ||
14 | name: &'static str, | ||
15 | thread: thread::JoinHandle<()>, | ||
16 | bomb: DropBomb, | ||
17 | } | ||
18 | |||
19 | pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) | ||
20 | where | ||
21 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
22 | I: Send + 'static, | ||
23 | O: Send + 'static, | ||
24 | { | ||
25 | let (worker, inp_r, out_s) = worker_chan(buf); | ||
26 | let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); | ||
27 | (worker, watcher) | ||
28 | } | ||
29 | |||
30 | impl<I, O> Worker<I, O> { | ||
31 | /// Stops the worker. Returns the message receiver to fetch results which | ||
32 | /// have become ready before the worker is stopped. | ||
33 | pub fn stop(self) -> Receiver<O> { | ||
34 | self.out | ||
35 | } | ||
36 | |||
37 | pub fn send(&self, item: I) { | ||
38 | self.inp.send(item) | ||
39 | } | ||
40 | } | ||
41 | |||
42 | impl WorkerHandle { | ||
43 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { | ||
44 | let thread = thread::spawn(f); | ||
45 | WorkerHandle { | ||
46 | name, | ||
47 | thread, | ||
48 | bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)), | ||
49 | } | ||
50 | } | ||
51 | |||
52 | pub fn stop(mut self) -> thread::Result<()> { | ||
53 | log::info!("waiting for {} to finish ...", self.name); | ||
54 | let name = self.name; | ||
55 | self.bomb.defuse(); | ||
56 | let res = self.thread.join(); | ||
57 | match &res { | ||
58 | Ok(()) => log::info!("... {} terminated with ok", name), | ||
59 | Err(_) => log::error!("... {} terminated with err", name), | ||
60 | } | ||
61 | res | ||
62 | } | ||
63 | } | ||
64 | |||
65 | /// Sets up worker channels in a deadlock-avoind way. | ||
66 | /// If one sets both input and output buffers to a fixed size, | ||
67 | /// a worker might get stuck. | ||
68 | fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { | ||
69 | let (input_sender, input_receiver) = bounded::<I>(buf); | ||
70 | let (output_sender, output_receiver) = unbounded::<O>(); | ||
71 | ( | ||
72 | Worker { | ||
73 | inp: input_sender, | ||
74 | out: output_receiver, | ||
75 | }, | ||
76 | input_receiver, | ||
77 | output_sender, | ||
78 | ) | ||
79 | } | ||