aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/thread_watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/server/src/thread_watcher.rs')
-rw-r--r--crates/server/src/thread_watcher.rs31
1 files changed, 29 insertions, 2 deletions
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
index 74a0a58b7..86a3a91e0 100644
--- a/crates/server/src/thread_watcher.rs
+++ b/crates/server/src/thread_watcher.rs
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
3use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
4use Result; 4use Result;
5 5
6pub struct Worker<I, O> {
7 pub inp: Sender<I>,
8 pub out: Receiver<O>,
9}
10
11impl<I, O> Worker<I, O> {
12 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher)
13 where
14 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
15 I: Send + 'static,
16 O: Send + 'static,
17 {
18 let ((inp, out), inp_r, out_s) = worker_chan(buf);
19 let worker = Worker { inp, out };
20 let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
21 (worker, watcher)
22 }
23
24 pub fn stop(self) -> Receiver<O> {
25 self.out
26 }
27
28 pub fn send(&self, item: I) {
29 self.inp.send(item)
30 }
31}
32
6pub struct ThreadWatcher { 33pub struct ThreadWatcher {
7 name: &'static str, 34 name: &'static str,
8 thread: thread::JoinHandle<()>, 35 thread: thread::JoinHandle<()>,
@@ -10,7 +37,7 @@ pub struct ThreadWatcher {
10} 37}
11 38
12impl ThreadWatcher { 39impl ThreadWatcher {
13 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { 40 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
14 let thread = thread::spawn(f); 41 let thread = thread::spawn(f);
15 ThreadWatcher { 42 ThreadWatcher {
16 name, 43 name,
@@ -36,7 +63,7 @@ impl ThreadWatcher {
36/// Sets up worker channels in a deadlock-avoind way. 63/// Sets up worker channels in a deadlock-avoind way.
37/// If one sets both input and output buffers to a fixed size, 64/// If one sets both input and output buffers to a fixed size,
38/// a worker might get stuck. 65/// a worker might get stuck.
39pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { 66fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
40 let (input_sender, input_receiver) = bounded::<I>(buf); 67 let (input_sender, input_receiver) = bounded::<I>(buf);
41 let (output_sender, output_receiver) = unbounded::<O>(); 68 let (output_sender, output_receiver) = unbounded::<O>();
42 ((input_sender, output_receiver), input_receiver, output_sender) 69 ((input_sender, output_receiver), input_receiver, output_sender)