From 193992fd14e88d91a3695f10204232d4c81192dc Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Tue, 18 Dec 2018 12:45:20 +0300 Subject: move thread worker to a separate crate --- crates/ra_lsp_server/Cargo.toml | 1 + crates/ra_lsp_server/src/lib.rs | 1 - crates/ra_lsp_server/src/main_loop.rs | 6 +- crates/ra_lsp_server/src/project_model.rs | 10 ++- crates/ra_lsp_server/src/thread_watcher.rs | 81 ----------------------- crates/ra_lsp_server/src/vfs.rs | 7 +- crates/ra_lsp_server/tests/heavy_tests/support.rs | 6 +- crates/thread_worker/Cargo.toml | 11 +++ crates/thread_worker/src/lib.rs | 79 ++++++++++++++++++++++ 9 files changed, 104 insertions(+), 98 deletions(-) delete mode 100644 crates/ra_lsp_server/src/thread_watcher.rs create mode 100644 crates/thread_worker/Cargo.toml create mode 100644 crates/thread_worker/src/lib.rs (limited to 'crates') diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 133decc52..30a8d35cd 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -26,6 +26,7 @@ text_unit = { version = "0.1.2", features = ["serde"] } smol_str = { version = "0.1.5", features = ["serde"] } rustc-hash = "1.0" +thread_worker = { path = "../thread_worker" } ra_syntax = { path = "../ra_syntax" } ra_editor = { path = "../ra_editor" } ra_text_edit = { path = "../ra_text_edit" } diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 75c6fa1b8..1d7258c35 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs @@ -5,7 +5,6 @@ mod path_map; mod project_model; pub mod req; mod server_world; -pub mod thread_watcher; mod vfs; pub type Result = ::std::result::Result; diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 41f70f263..eab82ee85 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -10,6 +10,7 @@ use gen_lsp_server::{ use languageserver_types::NumberOrString; use ra_analysis::{Canceled, FileId, LibraryData}; use rayon; +use thread_worker::Worker; use threadpool::ThreadPool; use rustc_hash::FxHashSet; use serde::{de::DeserializeOwned, Serialize}; @@ -21,7 +22,6 @@ use crate::{ project_model::{workspace_loader, CargoWorkspace}, req, server_world::{ServerWorld, ServerWorldState}, - thread_watcher::Worker, vfs::{self, FileEvent}, Result, }; @@ -92,8 +92,8 @@ pub fn main_loop( let ws_res = ws_watcher.stop(); main_res?; - fs_res?; - ws_res?; + fs_res.map_err(|_| format_err!("fs watcher died"))?; + ws_res.map_err(|_| format_err!("ws watcher died"))?; Ok(()) } diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index cb91ada90..b881f8b6f 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt}; use ra_syntax::SmolStr; use rustc_hash::{FxHashMap, FxHashSet}; use failure::{format_err, bail}; +use thread_worker::{WorkerHandle, Worker}; -use crate::{ - Result, - thread_watcher::{ThreadWatcher, Worker}, -}; +use crate::Result; /// `CargoWorksapce` represents the logical structure of, well, a Cargo /// workspace. It pretty closely mirrors `cargo metadata` output. @@ -199,8 +197,8 @@ impl TargetKind { } } -pub fn workspace_loader() -> (Worker>, ThreadWatcher) { - Worker::>::spawn( +pub fn workspace_loader() -> (Worker>, WorkerHandle) { + thread_worker::spawn::, _>( "workspace loader", 1, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/ra_lsp_server/src/thread_watcher.rs deleted file mode 100644 index 99825d440..000000000 --- a/crates/ra_lsp_server/src/thread_watcher.rs +++ /dev/null @@ -1,81 +0,0 @@ -use std::thread; - -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -use drop_bomb::DropBomb; -use failure::format_err; - -use crate::Result; - -pub struct Worker { - pub inp: Sender, - pub out: Receiver, -} - -impl Worker { - pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) - where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, - { - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) - } - - pub fn stop(self) -> Receiver { - self.out - } - - pub fn send(&self, item: I) { - self.inp.send(item) - } -} - -pub struct ThreadWatcher { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} - -impl ThreadWatcher { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { - let thread = thread::spawn(f); - ThreadWatcher { - name, - thread, - bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), - } - } - - pub fn stop(mut self) -> Result<()> { - log::info!("waiting for {} to finish ...", self.name); - let name = self.name; - self.bomb.defuse(); - let res = self - .thread - .join() - .map_err(|_| format_err!("ThreadWatcher {} died", name)); - match &res { - Ok(()) => log::info!("... {} terminated with ok", name), - Err(_) => log::error!("... {} terminated with err", name), - } - res - } -} - -/// Sets up worker channels in a deadlock-avoind way. -/// If one sets both input and output buffers to a fixed size, -/// a worker might get stuck. -fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { - let (input_sender, input_receiver) = bounded::(buf); - let (output_sender, output_receiver) = unbounded::(); - ( - Worker { - inp: input_sender, - out: output_receiver, - }, - input_receiver, - output_sender, - ) -} diff --git a/crates/ra_lsp_server/src/vfs.rs b/crates/ra_lsp_server/src/vfs.rs index 00ab3e6c3..fcf7693d8 100644 --- a/crates/ra_lsp_server/src/vfs.rs +++ b/crates/ra_lsp_server/src/vfs.rs @@ -4,8 +4,7 @@ use std::{ }; use walkdir::WalkDir; - -use crate::thread_watcher::{ThreadWatcher, Worker}; +use thread_worker::{WorkerHandle, Worker}; #[derive(Debug)] pub struct FileEvent { @@ -18,8 +17,8 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> (Worker)>, ThreadWatcher) { - Worker::)>::spawn( +pub fn roots_loader() -> (Worker)>, WorkerHandle) { + thread_worker::spawn::), _>( "roots loader", 128, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index 4b75be3ee..07a878a26 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -17,11 +17,11 @@ use languageserver_types::{ use serde::Serialize; use serde_json::{to_string_pretty, Value}; use tempdir::TempDir; +use thread_worker::{WorkerHandle, Worker}; use test_utils::{parse_fixture, find_mismatch}; use ra_lsp_server::{ main_loop, req, - thread_watcher::{ThreadWatcher, Worker}, }; pub fn project(fixture: &str) -> Server { @@ -45,13 +45,13 @@ pub struct Server { messages: RefCell>, dir: TempDir, worker: Option>, - watcher: Option, + watcher: Option, } impl Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { let path = dir.path().to_path_buf(); - let (worker, watcher) = Worker::::spawn( + let (worker, watcher) = thread_worker::spawn::( "test server", 128, move |mut msg_receiver, mut msg_sender| { diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml new file mode 100644 index 000000000..62d66a1a3 --- /dev/null +++ b/crates/thread_worker/Cargo.toml @@ -0,0 +1,11 @@ +[package] +edition = "2018" +name = "thread_worker" +version = "0.1.0" +authors = ["Aleksey Kladov "] + +[dependencies] +drop_bomb = "0.1.0" +crossbeam-channel = "0.2.4" +log = "0.4.3" + diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs new file mode 100644 index 000000000..e558559ef --- /dev/null +++ b/crates/thread_worker/src/lib.rs @@ -0,0 +1,79 @@ +//! Small utility to correctly spawn crossbeam-channel based worker threads. + +use std::thread; + +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use drop_bomb::DropBomb; + +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +pub struct WorkerHandle { + name: &'static str, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +pub fn spawn(name: &'static str, buf: usize, f: F) -> (Worker, WorkerHandle) +where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, +{ + let (worker, inp_r, out_s) = worker_chan(buf); + let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) +} + +impl Worker { + /// Stops the worker. Returns the message receiver to fetch results which + /// have become ready before the worker is stopped. + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + +impl WorkerHandle { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { + let thread = thread::spawn(f); + WorkerHandle { + name, + thread, + bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)), + } + } + + pub fn stop(mut self) -> thread::Result<()> { + log::info!("waiting for {} to finish ...", self.name); + let name = self.name; + self.bomb.defuse(); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... {} terminated with ok", name), + Err(_) => log::error!("... {} terminated with err", name), + } + res + } +} + +/// Sets up worker channels in a deadlock-avoind way. +/// If one sets both input and output buffers to a fixed size, +/// a worker might get stuck. +fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { + let (input_sender, input_receiver) = bounded::(buf); + let (output_sender, output_receiver) = unbounded::(); + ( + Worker { + inp: input_sender, + out: output_receiver, + }, + input_receiver, + output_sender, + ) +} -- cgit v1.2.3