From bf352cd2511775a331d77dee261b64bd8359dacb Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 14 Feb 2019 20:43:45 +0300 Subject: automatically wait for worker threads closes #817 --- crates/ra_batch/src/lib.rs | 1 - crates/ra_lsp_server/src/main_loop.rs | 26 +++-- crates/ra_lsp_server/src/project_model.rs | 6 +- crates/ra_lsp_server/tests/heavy_tests/support.rs | 17 +-- crates/ra_vfs/src/io.rs | 106 +++++++++---------- crates/ra_vfs/src/lib.rs | 6 -- crates/ra_vfs/tests/vfs.rs | 1 - crates/thread_worker/Cargo.toml | 1 - crates/thread_worker/src/lib.rs | 120 ++++++++++++---------- 9 files changed, 133 insertions(+), 151 deletions(-) (limited to 'crates') diff --git a/crates/ra_batch/src/lib.rs b/crates/ra_batch/src/lib.rs index d08fad908..69d66113e 100644 --- a/crates/ra_batch/src/lib.rs +++ b/crates/ra_batch/src/lib.rs @@ -121,7 +121,6 @@ impl BatchDatabase { .collect(); let db = BatchDatabase::load(crate_graph, &mut vfs); - let _ = vfs.shutdown(); Ok((db, local_roots)) } } diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index a51299851..06443bb76 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -54,19 +54,20 @@ pub fn main_loop( ) -> Result<()> { let pool = ThreadPool::new(THREADPOOL_SIZE); let (task_sender, task_receiver) = unbounded::(); - let (ws_worker, ws_watcher) = workspace_loader(); - ws_worker.send(ws_root.clone()).unwrap(); // FIXME: support dynamic workspace loading. - let workspaces = match ws_worker.recv().unwrap() { - Ok(ws) => vec![ws], - Err(e) => { - log::error!("loading workspace failed: {}", e); - Vec::new() + let workspaces = { + let ws_worker = workspace_loader(); + ws_worker.sender().send(ws_root.clone()).unwrap(); + match ws_worker.receiver().recv().unwrap() { + Ok(ws) => vec![ws], + Err(e) => { + log::error!("loading workspace failed: {}", e); + Vec::new() + } } }; - ws_worker.shutdown(); - ws_watcher.shutdown().map_err(|_| format_err!("ws watcher died"))?; + let mut state = ServerWorldState::new(ws_root.clone(), workspaces); log::info!("server initialized, serving requests"); @@ -94,12 +95,9 @@ pub fn main_loop( log::info!("...threadpool has finished"); let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); - let vfs_res = vfs.into_inner().shutdown(); + drop(vfs); - main_res?; - vfs_res.map_err(|_| format_err!("fs watcher died"))?; - - Ok(()) + main_res } enum Event { diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index 6800eb138..7d6440fad 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -1,6 +1,6 @@ use std::path::PathBuf; -use thread_worker::{WorkerHandle, Worker}; +use thread_worker::Worker; use crate::Result; @@ -8,8 +8,8 @@ pub use ra_project_model::{ ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot, }; -pub fn workspace_loader() -> (Worker>, WorkerHandle) { - thread_worker::spawn::, _>( +pub fn workspace_loader() -> Worker> { + Worker::>::spawn( "workspace loader", 1, |input_receiver, output_sender| { diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index eee85f8c8..11f94b4ab 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -17,7 +17,7 @@ use lsp_types::{ use serde::Serialize; use serde_json::{to_string_pretty, Value}; use tempfile::TempDir; -use thread_worker::{WorkerHandle, Worker}; +use thread_worker::Worker; use test_utils::{parse_fixture, find_mismatch}; use ra_lsp_server::{ @@ -45,13 +45,12 @@ pub struct Server { messages: RefCell>, dir: TempDir, worker: Option>, - watcher: Option, } impl Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { let path = dir.path().to_path_buf(); - let (worker, watcher) = thread_worker::spawn::( + let worker = Worker::::spawn( "test server", 128, move |mut msg_receiver, mut msg_sender| { @@ -63,7 +62,6 @@ impl Server { dir, messages: Default::default(), worker: Some(worker), - watcher: Some(watcher), }; for (path, text) in files { @@ -117,7 +115,7 @@ impl Server { } fn send_request_(&self, r: RawRequest) -> Value { let id = r.id; - self.worker.as_ref().unwrap().send(RawMessage::Request(r)).unwrap(); + self.worker.as_ref().unwrap().sender().send(RawMessage::Request(r)).unwrap(); while let Some(msg) = self.recv() { match msg { RawMessage::Request(req) => panic!("unexpected request: {:?}", req), @@ -157,24 +155,19 @@ impl Server { } } fn recv(&self) -> Option { - recv_timeout(&self.worker.as_ref().unwrap().out).map(|msg| { + recv_timeout(&self.worker.as_ref().unwrap().receiver()).map(|msg| { self.messages.borrow_mut().push(msg.clone()); msg }) } fn send_notification(&self, not: RawNotification) { - self.worker.as_ref().unwrap().send(RawMessage::Notification(not)).unwrap(); + self.worker.as_ref().unwrap().sender().send(RawMessage::Notification(not)).unwrap(); } } impl Drop for Server { fn drop(&mut self) { self.send_request::(()); - let receiver = self.worker.take().unwrap().shutdown(); - while let Some(msg) = recv_timeout(&receiver) { - drop(msg); - } - self.watcher.take().unwrap().shutdown().unwrap(); } } diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 3952b200b..1b70cd8df 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -1,13 +1,11 @@ use std::{ fs, - thread, path::{Path, PathBuf}, sync::{mpsc, Arc}, time::Duration, }; use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; -use thread_worker::WorkerHandle; use walkdir::WalkDir; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; @@ -49,8 +47,7 @@ enum ChangeKind { const WATCHER_DELAY: Duration = Duration::from_millis(250); pub(crate) struct Worker { - worker: thread_worker::Worker, - worker_handle: WorkerHandle, + thread_worker: thread_worker::Worker, } impl Worker { @@ -62,82 +59,79 @@ impl Worker { // * we want to read all files from a single thread, to guarantee that // we always get fresher versions and never go back in time. // * we want to tear down everything neatly during shutdown. - let (worker, worker_handle) = thread_worker::spawn( + let thread_worker = thread_worker::Worker::spawn( "vfs", 128, // This are the channels we use to communicate with outside world. // If `input_receiver` is closed we need to tear ourselves down. // `output_sender` should not be closed unless the parent died. move |input_receiver, output_sender| { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); + // Make sure that the destruction order is + // + // * notify_sender + // * _thread + // * watcher_sender + // + // this is required to avoid deadlocks. + // These are the corresponding crossbeam channels let (watcher_sender, watcher_receiver) = unbounded(); + let _thread; + { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - let thread = thread::spawn(move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to transform between two channels + _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break + }, + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } }, - Ok(Task::AddRoot { root, config }) => { - watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); - } - }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } - }, + } } } - // Stopped the watcher - drop(watcher.take()); // Drain pending events: we are not interested in them anyways! watcher_receiver.into_iter().for_each(|_| ()); - - let res = thread.join(); - match &res { - Ok(()) => log::info!("... Watcher terminated with ok"), - Err(_) => log::error!("... Watcher terminated with err"), - } - res.unwrap(); }, ); - Worker { worker, worker_handle } + Worker { thread_worker } } pub(crate) fn sender(&self) -> &Sender { - &self.worker.inp + &self.thread_worker.sender() } pub(crate) fn receiver(&self) -> &Receiver { - &self.worker.out - } - - pub(crate) fn shutdown(self) -> thread::Result<()> { - let _ = self.worker.shutdown(); - self.worker_handle.shutdown() + &self.thread_worker.receiver() } } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index f07657db6..1fb255365 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -22,7 +22,6 @@ use std::{ fmt, fs, mem, path::{Path, PathBuf}, sync::Arc, - thread, }; use crossbeam_channel::Receiver; @@ -337,11 +336,6 @@ impl Vfs { mem::replace(&mut self.pending_changes, Vec::new()) } - /// Shutdown the VFS and terminate the background watching thread. - pub fn shutdown(self) -> thread::Result<()> { - self.worker.shutdown() - } - fn add_file( &mut self, root: VfsRoot, diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs index 649ef96c9..c76e6ea26 100644 --- a/crates/ra_vfs/tests/vfs.rs +++ b/crates/ra_vfs/tests/vfs.rs @@ -158,6 +158,5 @@ fn test_vfs_works() -> std::io::Result<()> { Err(RecvTimeoutError::Timeout) ); - vfs.shutdown().unwrap(); Ok(()) } diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml index 363b4c3b8..a9857d59d 100644 --- a/crates/thread_worker/Cargo.toml +++ b/crates/thread_worker/Cargo.toml @@ -5,7 +5,6 @@ version = "0.1.0" authors = ["rust-analyzer developers"] [dependencies] -drop_bomb = "0.1.0" crossbeam-channel = "0.3.5" log = "0.4.3" diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index a522a0843..d67e44e38 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs @@ -2,74 +2,80 @@ use std::thread; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; -use drop_bomb::DropBomb; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; -pub struct Worker { - pub inp: Sender, - pub out: Receiver, +/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically. +pub struct ScopedThread { + // Option for drop + inner: Option>, } -pub struct WorkerHandle { - name: &'static str, - thread: thread::JoinHandle<()>, - bomb: DropBomb, -} +impl Drop for ScopedThread { + fn drop(&mut self) { + let inner = self.inner.take().unwrap(); + let name = inner.thread().name().unwrap().to_string(); + log::info!("waiting for {} to finish...", name); + let res = inner.join(); + log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" }); -pub fn spawn(name: &'static str, buf: usize, f: F) -> (Worker, WorkerHandle) -where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, -{ - let (worker, inp_r, out_s) = worker_chan(buf); - let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); - (worker, watcher) -} - -impl Worker { - /// Stops the worker. Returns the message receiver to fetch results which - /// have become ready before the worker is stopped. - pub fn shutdown(self) -> Receiver { - self.out + // escalate panic, but avoid aborting the process + match res { + Err(e) => { + if !thread::panicking() { + panic!(e) + } + } + _ => (), + } } +} - pub fn send(&self, item: I) -> Result<(), SendError> { - self.inp.send(item) - } - pub fn recv(&self) -> Result { - self.out.recv() +impl ScopedThread { + pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread { + let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap(); + ScopedThread { inner: Some(inner) } } } -impl WorkerHandle { - fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { - let thread = thread::spawn(f); - WorkerHandle { - name, - thread, - bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), - } - } +/// A wrapper around event-processing thread with automatic shutdown semantics. +pub struct Worker { + // XXX: field order is significant here. + // + // In Rust, fields are dropped in the declaration order, and we rely on this + // here. We must close input first, so that the `thread` (who holds the + // opposite side of the channel) noticed shutdown. Then, we must join the + // thread, but we must keep out alive so that the thread does not panic. + // + // Note that a potential problem here is that we might drop some messages + // from receiver on the floor. This is ok for rust-analyzer: we have only a + // single client, so, if we are shutting down, nobody is interested in the + // unfinished work anyway! + sender: Sender, + _thread: ScopedThread, + receiver: Receiver, +} - pub fn shutdown(mut self) -> thread::Result<()> { - log::info!("waiting for {} to finish ...", self.name); - let name = self.name; - self.bomb.defuse(); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... {} terminated with ok", name), - Err(_) => log::error!("... {} terminated with err", name), - } - res +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> Worker + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + // Set up worker channels in a deadlock-avoiding way. If one sets both input + // and output buffers to a fixed size, a worker might get stuck. + let (sender, input_receiver) = bounded::(buf); + let (output_sender, receiver) = unbounded::(); + let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender)); + Worker { sender, _thread, receiver } } } -/// Sets up worker channels in a deadlock-avoiding way. -/// If one sets both input and output buffers to a fixed size, -/// a worker might get stuck. -fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { - let (input_sender, input_receiver) = bounded::(buf); - let (output_sender, output_receiver) = unbounded::(); - (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) +impl Worker { + pub fn sender(&self) -> &Sender { + &self.sender + } + pub fn receiver(&self) -> &Receiver { + &self.receiver + } } -- cgit v1.2.3