From b5021411a84822cb3f1e3aeffad9550dd15bdeb6 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sun, 16 Sep 2018 12:54:24 +0300 Subject: rename all things --- crates/ra_lsp_server/src/thread_watcher.rs | 70 ++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) create mode 100644 crates/ra_lsp_server/src/thread_watcher.rs (limited to 'crates/ra_lsp_server/src/thread_watcher.rs') diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/ra_lsp_server/src/thread_watcher.rs new file mode 100644 index 000000000..86a3a91e0 --- /dev/null +++ b/crates/ra_lsp_server/src/thread_watcher.rs @@ -0,0 +1,70 @@ +use std::thread; +use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; +use drop_bomb::DropBomb; +use Result; + +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + let ((inp, out), inp_r, out_s) = worker_chan(buf); + let worker = Worker { inp, out }; + let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) + } + + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + +pub struct ThreadWatcher { + name: &'static str, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +impl ThreadWatcher { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { + let thread = thread::spawn(f); + ThreadWatcher { + name, + thread, + bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), + } + } + + pub fn stop(mut self) -> Result<()> { + info!("waiting for {} to finish ...", self.name); + let name = self.name; + self.bomb.defuse(); + let res = self.thread.join() + .map_err(|_| format_err!("ThreadWatcher {} died", name)); + match &res { + Ok(()) => info!("... {} terminated with ok", name), + Err(_) => error!("... {} terminated with err", name) + } + res + } +} + +/// Sets up worker channels in a deadlock-avoind way. +/// If one sets both input and output buffers to a fixed size, +/// a worker might get stuck. +fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { + let (input_sender, input_receiver) = bounded::(buf); + let (output_sender, output_receiver) = unbounded::(); + ((input_sender, output_receiver), input_receiver, output_sender) +} -- cgit v1.2.3