From 326ffcefe09906560a03d3184a2ce76841448702 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 8 Sep 2018 12:36:02 +0300 Subject: Deal with deadlocks in a more principaled way --- crates/server/src/lib.rs | 3 +-- crates/server/src/main_loop/mod.rs | 4 ++-- crates/server/src/project_model.rs | 15 +++++++-------- crates/server/src/thread_watcher.rs | 10 ++++++++++ crates/server/src/vfs.rs | 16 ++++++++-------- crates/server/tests/heavy_tests/support.rs | 18 +++++++++++------- 6 files changed, 39 insertions(+), 27 deletions(-) (limited to 'crates/server') diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 9e094af10..c8aebc59c 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs @@ -30,9 +30,8 @@ mod vfs; mod path_map; mod server_world; mod project_model; -mod thread_watcher; +pub mod thread_watcher; pub type Result = ::std::result::Result; pub use caps::server_capabilities; pub use main_loop::main_loop; - diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index 2ef1e2d17..b7f5efbb1 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -43,8 +43,8 @@ pub fn main_loop( .build() .unwrap(); let (task_sender, task_receiver) = unbounded::(); - let (fs_sender, fs_receiver, fs_watcher) = vfs::roots_loader(); - let (ws_sender, ws_receiver, ws_watcher) = workspace_loader(); + let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); + let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); info!("server initialized, serving requests"); let mut state = ServerWorldState::new(); diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index a712106d9..b9d6872c8 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs @@ -3,12 +3,12 @@ use std::{ path::{Path, PathBuf}, }; use cargo_metadata::{metadata_run, CargoOpt}; -use crossbeam_channel::{bounded, Sender, Receiver}; +use crossbeam_channel::{Sender, Receiver}; use libsyntax2::SmolStr; use { Result, - thread_watcher::ThreadWatcher, + thread_watcher::{ThreadWatcher, worker_chan}, }; #[derive(Debug, Clone)] @@ -162,15 +162,14 @@ impl TargetKind { } } -pub fn workspace_loader() -> (Sender, Receiver>, ThreadWatcher) { - let (path_sender, path_receiver) = bounded::(16); - let (ws_sender, ws_receiver) = bounded::>(1); +pub fn workspace_loader() -> ((Sender, Receiver>), ThreadWatcher) { + let (interface, input_receiver, output_sender) = worker_chan::>(1); let thread = ThreadWatcher::spawn("workspace loader", move || { - path_receiver + input_receiver .into_iter() .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| ws_sender.send(it)) + .for_each(|it| output_sender.send(it)) }); - (path_sender, ws_receiver, thread) + (interface, thread) } diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 98bcdfd6c..74a0a58b7 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs @@ -1,4 +1,5 @@ use std::thread; +use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; use drop_bomb::DropBomb; use Result; @@ -31,3 +32,12 @@ impl ThreadWatcher { res } } + +/// Sets up worker channels in a deadlock-avoind way. +/// If one sets both input and output buffers to a fixed size, +/// a worker might get stuck. +pub fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { + let (input_sender, input_receiver) = bounded::(buf); + let (output_sender, output_receiver) = unbounded::(); + ((input_sender, output_receiver), input_receiver, output_sender) +} diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index 2699fc21e..c228f0b0a 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs @@ -3,11 +3,11 @@ use std::{ fs, }; -use crossbeam_channel::{Sender, Receiver, unbounded}; +use crossbeam_channel::{Sender, Receiver}; use walkdir::WalkDir; use { - thread_watcher::ThreadWatcher, + thread_watcher::{ThreadWatcher, worker_chan}, }; @@ -22,11 +22,11 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> (Sender, Receiver<(PathBuf, Vec)>, ThreadWatcher) { - let (path_sender, path_receiver) = unbounded::(); - let (event_sender, event_receiver) = unbounded::<(PathBuf, Vec)>(); +pub fn roots_loader() -> ((Sender, Receiver<(PathBuf, Vec)>), ThreadWatcher) { + let (interface, input_receiver, output_sender) = + worker_chan::)>(128); let thread = ThreadWatcher::spawn("roots loader", move || { - path_receiver + input_receiver .into_iter() .map(|path| { debug!("loading {} ...", path.as_path().display()); @@ -34,10 +34,10 @@ pub fn roots_loader() -> (Sender, Receiver<(PathBuf, Vec)>, debug!("... loaded {}", path.as_path().display()); (path, events) }) - .for_each(|it| event_sender.send(it)) + .for_each(|it| output_sender.send(it)) }); - (path_sender, event_receiver, thread) + (interface, thread) } fn load_root(path: &Path) -> Vec { diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs index 297dcd9ae..2710ab59b 100644 --- a/crates/server/tests/heavy_tests/support.rs +++ b/crates/server/tests/heavy_tests/support.rs @@ -8,7 +8,7 @@ use std::{ }; use tempdir::TempDir; -use crossbeam_channel::{unbounded, after, Sender, Receiver}; +use crossbeam_channel::{after, Sender, Receiver}; use flexi_logger::Logger; use languageserver_types::{ Url, @@ -22,7 +22,7 @@ use serde::Serialize; use serde_json::{Value, from_str, to_string_pretty}; use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; -use m::{Result, main_loop, req}; +use m::{Result, main_loop, req, thread_watcher::worker_chan}; pub fn project(fixture: &str) -> Server { static INIT: Once = Once::new(); @@ -69,15 +69,19 @@ pub struct Server { impl Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { let path = dir.path().to_path_buf(); - let (client_sender, mut server_receiver) = unbounded(); - let (mut server_sender, client_receiver) = unbounded(); - let server = thread::spawn(move || main_loop(true, path, &mut server_receiver, &mut server_sender)); + let ((msg_sender, msg_receiver), server) = { + let (api, mut msg_receiver, mut msg_sender) = worker_chan::(128); + let server = thread::spawn(move || { + main_loop(true, path, &mut msg_receiver, &mut msg_sender) + }); + (api, server) + }; let res = Server { req_id: Cell::new(1), dir, messages: Default::default(), - sender: Some(client_sender), - receiver: client_receiver, + sender: Some(msg_sender), + receiver: msg_receiver, server: Some(server), }; -- cgit v1.2.3