aboutsummaryrefslogtreecommitdiff
path: root/crates/thread_worker
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-12-18 09:45:20 +0000
committerAleksey Kladov <[email protected]>2018-12-18 09:52:17 +0000
commit193992fd14e88d91a3695f10204232d4c81192dc (patch)
treefced7beb98137d8675e0f864c7b41c909b97ebf5 /crates/thread_worker
parent4a1ab869b7aaa38d55e8995ec1b49e72b55a5965 (diff)
move thread worker to a separate crate
Diffstat (limited to 'crates/thread_worker')
-rw-r--r--crates/thread_worker/Cargo.toml11
-rw-r--r--crates/thread_worker/src/lib.rs79
2 files changed, 90 insertions, 0 deletions
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml
new file mode 100644
index 000000000..62d66a1a3
--- /dev/null
+++ b/crates/thread_worker/Cargo.toml
@@ -0,0 +1,11 @@
1[package]
2edition = "2018"
3name = "thread_worker"
4version = "0.1.0"
5authors = ["Aleksey Kladov <[email protected]>"]
6
7[dependencies]
8drop_bomb = "0.1.0"
9crossbeam-channel = "0.2.4"
10log = "0.4.3"
11
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
new file mode 100644
index 000000000..e558559ef
--- /dev/null
+++ b/crates/thread_worker/src/lib.rs
@@ -0,0 +1,79 @@
1//! Small utility to correctly spawn crossbeam-channel based worker threads.
2
3use std::thread;
4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
6use drop_bomb::DropBomb;
7
8pub struct Worker<I, O> {
9 pub inp: Sender<I>,
10 pub out: Receiver<O>,
11}
12
13pub struct WorkerHandle {
14 name: &'static str,
15 thread: thread::JoinHandle<()>,
16 bomb: DropBomb,
17}
18
19pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle)
20where
21 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
22 I: Send + 'static,
23 O: Send + 'static,
24{
25 let (worker, inp_r, out_s) = worker_chan(buf);
26 let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s));
27 (worker, watcher)
28}
29
30impl<I, O> Worker<I, O> {
31 /// Stops the worker. Returns the message receiver to fetch results which
32 /// have become ready before the worker is stopped.
33 pub fn stop(self) -> Receiver<O> {
34 self.out
35 }
36
37 pub fn send(&self, item: I) {
38 self.inp.send(item)
39 }
40}
41
42impl WorkerHandle {
43 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle {
44 let thread = thread::spawn(f);
45 WorkerHandle {
46 name,
47 thread,
48 bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)),
49 }
50 }
51
52 pub fn stop(mut self) -> thread::Result<()> {
53 log::info!("waiting for {} to finish ...", self.name);
54 let name = self.name;
55 self.bomb.defuse();
56 let res = self.thread.join();
57 match &res {
58 Ok(()) => log::info!("... {} terminated with ok", name),
59 Err(_) => log::error!("... {} terminated with err", name),
60 }
61 res
62 }
63}
64
65/// Sets up worker channels in a deadlock-avoind way.
66/// If one sets both input and output buffers to a fixed size,
67/// a worker might get stuck.
68fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) {
69 let (input_sender, input_receiver) = bounded::<I>(buf);
70 let (output_sender, output_receiver) = unbounded::<O>();
71 (
72 Worker {
73 inp: input_sender,
74 out: output_receiver,
75 },
76 input_receiver,
77 output_sender,
78 )
79}