aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/thread_watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/server/src/thread_watcher.rs')
-rw-r--r--crates/server/src/thread_watcher.rs70
1 files changed, 0 insertions, 70 deletions
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
deleted file mode 100644
index 86a3a91e0..000000000
--- a/crates/server/src/thread_watcher.rs
+++ /dev/null
@@ -1,70 +0,0 @@
1use std::thread;
2use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
3use drop_bomb::DropBomb;
4use Result;
5
6pub struct Worker<I, O> {
7 pub inp: Sender<I>,
8 pub out: Receiver<O>,
9}
10
11impl<I, O> Worker<I, O> {
12 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher)
13 where
14 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
15 I: Send + 'static,
16 O: Send + 'static,
17 {
18 let ((inp, out), inp_r, out_s) = worker_chan(buf);
19 let worker = Worker { inp, out };
20 let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
21 (worker, watcher)
22 }
23
24 pub fn stop(self) -> Receiver<O> {
25 self.out
26 }
27
28 pub fn send(&self, item: I) {
29 self.inp.send(item)
30 }
31}
32
33pub struct ThreadWatcher {
34 name: &'static str,
35 thread: thread::JoinHandle<()>,
36 bomb: DropBomb,
37}
38
39impl ThreadWatcher {
40 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
41 let thread = thread::spawn(f);
42 ThreadWatcher {
43 name,
44 thread,
45 bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)),
46 }
47 }
48
49 pub fn stop(mut self) -> Result<()> {
50 info!("waiting for {} to finish ...", self.name);
51 let name = self.name;
52 self.bomb.defuse();
53 let res = self.thread.join()
54 .map_err(|_| format_err!("ThreadWatcher {} died", name));
55 match &res {
56 Ok(()) => info!("... {} terminated with ok", name),
57 Err(_) => error!("... {} terminated with err", name)
58 }
59 res
60 }
61}
62
63/// Sets up worker channels in a deadlock-avoind way.
64/// If one sets both input and output buffers to a fixed size,
65/// a worker might get stuck.
66fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
67 let (input_sender, input_receiver) = bounded::<I>(buf);
68 let (output_sender, output_receiver) = unbounded::<O>();
69 ((input_sender, output_receiver), input_receiver, output_sender)
70}