diff options
Diffstat (limited to 'crates/ra_lsp_server/src/thread_watcher.rs')
-rw-r--r-- | crates/ra_lsp_server/src/thread_watcher.rs | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/ra_lsp_server/src/thread_watcher.rs new file mode 100644 index 000000000..86a3a91e0 --- /dev/null +++ b/crates/ra_lsp_server/src/thread_watcher.rs | |||
@@ -0,0 +1,70 @@ | |||
1 | use std::thread; | ||
2 | use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; | ||
3 | use drop_bomb::DropBomb; | ||
4 | use Result; | ||
5 | |||
6 | pub struct Worker<I, O> { | ||
7 | pub inp: Sender<I>, | ||
8 | pub out: Receiver<O>, | ||
9 | } | ||
10 | |||
11 | impl<I, O> Worker<I, O> { | ||
12 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) | ||
13 | where | ||
14 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
15 | I: Send + 'static, | ||
16 | O: Send + 'static, | ||
17 | { | ||
18 | let ((inp, out), inp_r, out_s) = worker_chan(buf); | ||
19 | let worker = Worker { inp, out }; | ||
20 | let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); | ||
21 | (worker, watcher) | ||
22 | } | ||
23 | |||
24 | pub fn stop(self) -> Receiver<O> { | ||
25 | self.out | ||
26 | } | ||
27 | |||
28 | pub fn send(&self, item: I) { | ||
29 | self.inp.send(item) | ||
30 | } | ||
31 | } | ||
32 | |||
33 | pub struct ThreadWatcher { | ||
34 | name: &'static str, | ||
35 | thread: thread::JoinHandle<()>, | ||
36 | bomb: DropBomb, | ||
37 | } | ||
38 | |||
39 | impl ThreadWatcher { | ||
40 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { | ||
41 | let thread = thread::spawn(f); | ||
42 | ThreadWatcher { | ||
43 | name, | ||
44 | thread, | ||
45 | bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), | ||
46 | } | ||
47 | } | ||
48 | |||
49 | pub fn stop(mut self) -> Result<()> { | ||
50 | info!("waiting for {} to finish ...", self.name); | ||
51 | let name = self.name; | ||
52 | self.bomb.defuse(); | ||
53 | let res = self.thread.join() | ||
54 | .map_err(|_| format_err!("ThreadWatcher {} died", name)); | ||
55 | match &res { | ||
56 | Ok(()) => info!("... {} terminated with ok", name), | ||
57 | Err(_) => error!("... {} terminated with err", name) | ||
58 | } | ||
59 | res | ||
60 | } | ||
61 | } | ||
62 | |||
63 | /// Sets up worker channels in a deadlock-avoind way. | ||
64 | /// If one sets both input and output buffers to a fixed size, | ||
65 | /// a worker might get stuck. | ||
66 | fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { | ||
67 | let (input_sender, input_receiver) = bounded::<I>(buf); | ||
68 | let (output_sender, output_receiver) = unbounded::<O>(); | ||
69 | ((input_sender, output_receiver), input_receiver, output_sender) | ||
70 | } | ||