aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-09-08 10:36:02 +0100
committerAleksey Kladov <[email protected]>2018-09-08 10:36:02 +0100
commit326ffcefe09906560a03d3184a2ce76841448702 (patch)
treeca6c3b738122d87ed849c757087b3b5a3b7c2048
parentd9ccebd913c767779e47f0c752e9f2f3dc35c080 (diff)
Deal with deadlocks in a more principaled way
-rw-r--r--crates/server/src/lib.rs3
-rw-r--r--crates/server/src/main_loop/mod.rs4
-rw-r--r--crates/server/src/project_model.rs15
-rw-r--r--crates/server/src/thread_watcher.rs10
-rw-r--r--crates/server/src/vfs.rs16
-rw-r--r--crates/server/tests/heavy_tests/support.rs18
6 files changed, 39 insertions, 27 deletions
diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs
index 9e094af10..c8aebc59c 100644
--- a/crates/server/src/lib.rs
+++ b/crates/server/src/lib.rs
@@ -30,9 +30,8 @@ mod vfs;
30mod path_map; 30mod path_map;
31mod server_world; 31mod server_world;
32mod project_model; 32mod project_model;
33mod thread_watcher; 33pub mod thread_watcher;
34 34
35pub type Result<T> = ::std::result::Result<T, ::failure::Error>; 35pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
36pub use caps::server_capabilities; 36pub use caps::server_capabilities;
37pub use main_loop::main_loop; 37pub use main_loop::main_loop;
38
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index 2ef1e2d17..b7f5efbb1 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -43,8 +43,8 @@ pub fn main_loop(
43 .build() 43 .build()
44 .unwrap(); 44 .unwrap();
45 let (task_sender, task_receiver) = unbounded::<Task>(); 45 let (task_sender, task_receiver) = unbounded::<Task>();
46 let (fs_sender, fs_receiver, fs_watcher) = vfs::roots_loader(); 46 let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader();
47 let (ws_sender, ws_receiver, ws_watcher) = workspace_loader(); 47 let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader();
48 48
49 info!("server initialized, serving requests"); 49 info!("server initialized, serving requests");
50 let mut state = ServerWorldState::new(); 50 let mut state = ServerWorldState::new();
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs
index a712106d9..b9d6872c8 100644
--- a/crates/server/src/project_model.rs
+++ b/crates/server/src/project_model.rs
@@ -3,12 +3,12 @@ use std::{
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4}; 4};
5use cargo_metadata::{metadata_run, CargoOpt}; 5use cargo_metadata::{metadata_run, CargoOpt};
6use crossbeam_channel::{bounded, Sender, Receiver}; 6use crossbeam_channel::{Sender, Receiver};
7use libsyntax2::SmolStr; 7use libsyntax2::SmolStr;
8 8
9use { 9use {
10 Result, 10 Result,
11 thread_watcher::ThreadWatcher, 11 thread_watcher::{ThreadWatcher, worker_chan},
12}; 12};
13 13
14#[derive(Debug, Clone)] 14#[derive(Debug, Clone)]
@@ -162,15 +162,14 @@ impl TargetKind {
162 } 162 }
163} 163}
164 164
165pub fn workspace_loader() -> (Sender<PathBuf>, Receiver<Result<CargoWorkspace>>, ThreadWatcher) { 165pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) {
166 let (path_sender, path_receiver) = bounded::<PathBuf>(16); 166 let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1);
167 let (ws_sender, ws_receiver) = bounded::<Result<CargoWorkspace>>(1);
168 let thread = ThreadWatcher::spawn("workspace loader", move || { 167 let thread = ThreadWatcher::spawn("workspace loader", move || {
169 path_receiver 168 input_receiver
170 .into_iter() 169 .into_iter()
171 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) 170 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
172 .for_each(|it| ws_sender.send(it)) 171 .for_each(|it| output_sender.send(it))
173 }); 172 });
174 173
175 (path_sender, ws_receiver, thread) 174 (interface, thread)
176} 175}
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
index 98bcdfd6c..74a0a58b7 100644
--- a/crates/server/src/thread_watcher.rs
+++ b/crates/server/src/thread_watcher.rs
@@ -1,4 +1,5 @@
1use std::thread; 1use std::thread;
2use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
2use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
3use Result; 4use Result;
4 5
@@ -31,3 +32,12 @@ impl ThreadWatcher {
31 res 32 res
32 } 33 }
33} 34}
35
36/// Sets up worker channels in a deadlock-avoind way.
37/// If one sets both input and output buffers to a fixed size,
38/// a worker might get stuck.
39pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
40 let (input_sender, input_receiver) = bounded::<I>(buf);
41 let (output_sender, output_receiver) = unbounded::<O>();
42 ((input_sender, output_receiver), input_receiver, output_sender)
43}
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs
index 2699fc21e..c228f0b0a 100644
--- a/crates/server/src/vfs.rs
+++ b/crates/server/src/vfs.rs
@@ -3,11 +3,11 @@ use std::{
3 fs, 3 fs,
4}; 4};
5 5
6use crossbeam_channel::{Sender, Receiver, unbounded}; 6use crossbeam_channel::{Sender, Receiver};
7use walkdir::WalkDir; 7use walkdir::WalkDir;
8 8
9use { 9use {
10 thread_watcher::ThreadWatcher, 10 thread_watcher::{ThreadWatcher, worker_chan},
11}; 11};
12 12
13 13
@@ -22,11 +22,11 @@ pub enum FileEventKind {
22 Add(String), 22 Add(String),
23} 23}
24 24
25pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>, ThreadWatcher) { 25pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) {
26 let (path_sender, path_receiver) = unbounded::<PathBuf>(); 26 let (interface, input_receiver, output_sender) =
27 let (event_sender, event_receiver) = unbounded::<(PathBuf, Vec<FileEvent>)>(); 27 worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128);
28 let thread = ThreadWatcher::spawn("roots loader", move || { 28 let thread = ThreadWatcher::spawn("roots loader", move || {
29 path_receiver 29 input_receiver
30 .into_iter() 30 .into_iter()
31 .map(|path| { 31 .map(|path| {
32 debug!("loading {} ...", path.as_path().display()); 32 debug!("loading {} ...", path.as_path().display());
@@ -34,10 +34,10 @@ pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>,
34 debug!("... loaded {}", path.as_path().display()); 34 debug!("... loaded {}", path.as_path().display());
35 (path, events) 35 (path, events)
36 }) 36 })
37 .for_each(|it| event_sender.send(it)) 37 .for_each(|it| output_sender.send(it))
38 }); 38 });
39 39
40 (path_sender, event_receiver, thread) 40 (interface, thread)
41} 41}
42 42
43fn load_root(path: &Path) -> Vec<FileEvent> { 43fn load_root(path: &Path) -> Vec<FileEvent> {
diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs
index 297dcd9ae..2710ab59b 100644
--- a/crates/server/tests/heavy_tests/support.rs
+++ b/crates/server/tests/heavy_tests/support.rs
@@ -8,7 +8,7 @@ use std::{
8}; 8};
9 9
10use tempdir::TempDir; 10use tempdir::TempDir;
11use crossbeam_channel::{unbounded, after, Sender, Receiver}; 11use crossbeam_channel::{after, Sender, Receiver};
12use flexi_logger::Logger; 12use flexi_logger::Logger;
13use languageserver_types::{ 13use languageserver_types::{
14 Url, 14 Url,
@@ -22,7 +22,7 @@ use serde::Serialize;
22use serde_json::{Value, from_str, to_string_pretty}; 22use serde_json::{Value, from_str, to_string_pretty};
23use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; 23use gen_lsp_server::{RawMessage, RawRequest, RawNotification};
24 24
25use m::{Result, main_loop, req}; 25use m::{Result, main_loop, req, thread_watcher::worker_chan};
26 26
27pub fn project(fixture: &str) -> Server { 27pub fn project(fixture: &str) -> Server {
28 static INIT: Once = Once::new(); 28 static INIT: Once = Once::new();
@@ -69,15 +69,19 @@ pub struct Server {
69impl Server { 69impl Server {
70 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { 70 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
71 let path = dir.path().to_path_buf(); 71 let path = dir.path().to_path_buf();
72 let (client_sender, mut server_receiver) = unbounded(); 72 let ((msg_sender, msg_receiver), server) = {
73 let (mut server_sender, client_receiver) = unbounded(); 73 let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128);
74 let server = thread::spawn(move || main_loop(true, path, &mut server_receiver, &mut server_sender)); 74 let server = thread::spawn(move || {
75 main_loop(true, path, &mut msg_receiver, &mut msg_sender)
76 });
77 (api, server)
78 };
75 let res = Server { 79 let res = Server {
76 req_id: Cell::new(1), 80 req_id: Cell::new(1),
77 dir, 81 dir,
78 messages: Default::default(), 82 messages: Default::default(),
79 sender: Some(client_sender), 83 sender: Some(msg_sender),
80 receiver: client_receiver, 84 receiver: msg_receiver,
81 server: Some(server), 85 server: Some(server),
82 }; 86 };
83 87