From 193992fd14e88d91a3695f10204232d4c81192dc Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Tue, 18 Dec 2018 12:45:20 +0300 Subject: move thread worker to a separate crate --- crates/ra_lsp_server/src/lib.rs | 1 - crates/ra_lsp_server/src/main_loop.rs | 6 +-- crates/ra_lsp_server/src/project_model.rs | 10 ++-- crates/ra_lsp_server/src/thread_watcher.rs | 81 ------------------------------ crates/ra_lsp_server/src/vfs.rs | 7 ++- 5 files changed, 10 insertions(+), 95 deletions(-) delete mode 100644 crates/ra_lsp_server/src/thread_watcher.rs (limited to 'crates/ra_lsp_server/src') diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 75c6fa1b8..1d7258c35 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs @@ -5,7 +5,6 @@ mod path_map; mod project_model; pub mod req; mod server_world; -pub mod thread_watcher; mod vfs; pub type Result = ::std::result::Result; diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 41f70f263..eab82ee85 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -10,6 +10,7 @@ use gen_lsp_server::{ use languageserver_types::NumberOrString; use ra_analysis::{Canceled, FileId, LibraryData}; use rayon; +use thread_worker::Worker; use threadpool::ThreadPool; use rustc_hash::FxHashSet; use serde::{de::DeserializeOwned, Serialize}; @@ -21,7 +22,6 @@ use crate::{ project_model::{workspace_loader, CargoWorkspace}, req, server_world::{ServerWorld, ServerWorldState}, - thread_watcher::Worker, vfs::{self, FileEvent}, Result, }; @@ -92,8 +92,8 @@ pub fn main_loop( let ws_res = ws_watcher.stop(); main_res?; - fs_res?; - ws_res?; + fs_res.map_err(|_| format_err!("fs watcher died"))?; + ws_res.map_err(|_| format_err!("ws watcher died"))?; Ok(()) } diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index cb91ada90..b881f8b6f 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt}; use ra_syntax::SmolStr; use rustc_hash::{FxHashMap, FxHashSet}; use failure::{format_err, bail}; +use thread_worker::{WorkerHandle, Worker}; -use crate::{ - Result, - thread_watcher::{ThreadWatcher, Worker}, -}; +use crate::Result; /// `CargoWorksapce` represents the logical structure of, well, a Cargo /// workspace. It pretty closely mirrors `cargo metadata` output. @@ -199,8 +197,8 @@ impl TargetKind { } } -pub fn workspace_loader() -> (Worker>, ThreadWatcher) { - Worker::>::spawn( +pub fn workspace_loader() -> (Worker>, WorkerHandle) { + thread_worker::spawn::, _>( "workspace loader", 1, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/ra_lsp_server/src/thread_watcher.rs deleted file mode 100644 index 99825d440..000000000 --- a/crates/ra_lsp_server/src/thread_watcher.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::thread; - -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -use drop_bomb::DropBomb; -use failure::format_err; - -use crate::Result; - -pub struct Worker { - pub inp: Sender, - pub out: Receiver, -} - -impl Worker { - pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) - where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, - { - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) - } - - pub fn stop(self) -> Receiver { - self.out - } - - pub fn send(&self, item: I) { - self.inp.send(item) - } -} - -pub struct ThreadWatcher { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} - -impl ThreadWatcher { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { - let thread = thread::spawn(f); - ThreadWatcher { - name, - thread, - bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), - } - } - - pub fn stop(mut self) -> Result<()> { - log::info!("waiting for {} to finish ...", self.name); - let name = self.name; - self.bomb.defuse(); - let res = self - .thread - .join() - .map_err(|_| format_err!("ThreadWatcher {} died", name)); - match &res { - Ok(()) => log::info!("... {} terminated with ok", name), - Err(_) => log::error!("... {} terminated with err", name), - } - res - } -} - -/// Sets up worker channels in a deadlock-avoind way. -/// If one sets both input and output buffers to a fixed size, -/// a worker might get stuck. -fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { - let (input_sender, input_receiver) = bounded::(buf); - let (output_sender, output_receiver) = unbounded::(); - ( - Worker { - inp: input_sender, - out: output_receiver, - }, - input_receiver, - output_sender, - ) -} diff --git a/crates/ra_lsp_server/src/vfs.rs b/crates/ra_lsp_server/src/vfs.rs index 00ab3e6c3..fcf7693d8 100644 --- a/crates/ra_lsp_server/src/vfs.rs +++ b/crates/ra_lsp_server/src/vfs.rs @@ -4,8 +4,7 @@ use std::{ }; use walkdir::WalkDir; - -use crate::thread_watcher::{ThreadWatcher, Worker}; +use thread_worker::{WorkerHandle, Worker}; #[derive(Debug)] pub struct FileEvent { @@ -18,8 +17,8 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> (Worker)>, ThreadWatcher) { - Worker::)>::spawn( +pub fn roots_loader() -> (Worker)>, WorkerHandle) { + thread_worker::spawn::), _>( "roots loader", 128, |input_receiver, output_sender| { -- cgit v1.2.3