diff options
author | Aleksey Kladov <[email protected]> | 2018-09-08 11:15:01 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-09-08 11:15:01 +0100 |
commit | 7daaddb2ac281dcad3ac99496b1cf3f06840887d (patch) | |
tree | 3a6c9ac35c0f198c782591b50eb50225769955bb /crates/server/src/thread_watcher.rs | |
parent | 326ffcefe09906560a03d3184a2ce76841448702 (diff) |
Some abstraction around workers
Diffstat (limited to 'crates/server/src/thread_watcher.rs')
-rw-r--r-- | crates/server/src/thread_watcher.rs | 31 |
1 files changed, 29 insertions, 2 deletions
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs | |||
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; | |||
3 | use drop_bomb::DropBomb; | 3 | use drop_bomb::DropBomb; |
4 | use Result; | 4 | use Result; |
5 | 5 | ||
6 | pub struct Worker<I, O> { | ||
7 | pub inp: Sender<I>, | ||
8 | pub out: Receiver<O>, | ||
9 | } | ||
10 | |||
11 | impl<I, O> Worker<I, O> { | ||
12 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) | ||
13 | where | ||
14 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
15 | I: Send + 'static, | ||
16 | O: Send + 'static, | ||
17 | { | ||
18 | let ((inp, out), inp_r, out_s) = worker_chan(buf); | ||
19 | let worker = Worker { inp, out }; | ||
20 | let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); | ||
21 | (worker, watcher) | ||
22 | } | ||
23 | |||
24 | pub fn stop(self) -> Receiver<O> { | ||
25 | self.out | ||
26 | } | ||
27 | |||
28 | pub fn send(&self, item: I) { | ||
29 | self.inp.send(item) | ||
30 | } | ||
31 | } | ||
32 | |||
6 | pub struct ThreadWatcher { | 33 | pub struct ThreadWatcher { |
7 | name: &'static str, | 34 | name: &'static str, |
8 | thread: thread::JoinHandle<()>, | 35 | thread: thread::JoinHandle<()>, |
@@ -10,7 +37,7 @@ pub struct ThreadWatcher { | |||
10 | } | 37 | } |
11 | 38 | ||
12 | impl ThreadWatcher { | 39 | impl ThreadWatcher { |
13 | pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { | 40 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { |
14 | let thread = thread::spawn(f); | 41 | let thread = thread::spawn(f); |
15 | ThreadWatcher { | 42 | ThreadWatcher { |
16 | name, | 43 | name, |
@@ -36,7 +63,7 @@ impl ThreadWatcher { | |||
36 | /// Sets up worker channels in a deadlock-avoind way. | 63 | /// Sets up worker channels in a deadlock-avoind way. |
37 | /// If one sets both input and output buffers to a fixed size, | 64 | /// If one sets both input and output buffers to a fixed size, |
38 | /// a worker might get stuck. | 65 | /// a worker might get stuck. |
39 | pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { | 66 | fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { |
40 | let (input_sender, input_receiver) = bounded::<I>(buf); | 67 | let (input_sender, input_receiver) = bounded::<I>(buf); |
41 | let (output_sender, output_receiver) = unbounded::<O>(); | 68 | let (output_sender, output_receiver) = unbounded::<O>(); |
42 | ((input_sender, output_receiver), input_receiver, output_sender) | 69 | ((input_sender, output_receiver), input_receiver, output_sender) |