diff options
author | Aleksey Kladov <[email protected]> | 2018-09-08 10:36:02 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-09-08 10:36:02 +0100 |
commit | 326ffcefe09906560a03d3184a2ce76841448702 (patch) | |
tree | ca6c3b738122d87ed849c757087b3b5a3b7c2048 /crates | |
parent | d9ccebd913c767779e47f0c752e9f2f3dc35c080 (diff) |
Deal with deadlocks in a more principaled way
Diffstat (limited to 'crates')
-rw-r--r-- | crates/server/src/lib.rs | 3 | ||||
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 4 | ||||
-rw-r--r-- | crates/server/src/project_model.rs | 15 | ||||
-rw-r--r-- | crates/server/src/thread_watcher.rs | 10 | ||||
-rw-r--r-- | crates/server/src/vfs.rs | 16 | ||||
-rw-r--r-- | crates/server/tests/heavy_tests/support.rs | 18 |
6 files changed, 39 insertions, 27 deletions
diff --git a/crates/server/src/lib.rs b/crates/server/src/lib.rs index 9e094af10..c8aebc59c 100644 --- a/crates/server/src/lib.rs +++ b/crates/server/src/lib.rs | |||
@@ -30,9 +30,8 @@ mod vfs; | |||
30 | mod path_map; | 30 | mod path_map; |
31 | mod server_world; | 31 | mod server_world; |
32 | mod project_model; | 32 | mod project_model; |
33 | mod thread_watcher; | 33 | pub mod thread_watcher; |
34 | 34 | ||
35 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; | 35 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; |
36 | pub use caps::server_capabilities; | 36 | pub use caps::server_capabilities; |
37 | pub use main_loop::main_loop; | 37 | pub use main_loop::main_loop; |
38 | |||
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index 2ef1e2d17..b7f5efbb1 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs | |||
@@ -43,8 +43,8 @@ pub fn main_loop( | |||
43 | .build() | 43 | .build() |
44 | .unwrap(); | 44 | .unwrap(); |
45 | let (task_sender, task_receiver) = unbounded::<Task>(); | 45 | let (task_sender, task_receiver) = unbounded::<Task>(); |
46 | let (fs_sender, fs_receiver, fs_watcher) = vfs::roots_loader(); | 46 | let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); |
47 | let (ws_sender, ws_receiver, ws_watcher) = workspace_loader(); | 47 | let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); |
48 | 48 | ||
49 | info!("server initialized, serving requests"); | 49 | info!("server initialized, serving requests"); |
50 | let mut state = ServerWorldState::new(); | 50 | let mut state = ServerWorldState::new(); |
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index a712106d9..b9d6872c8 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs | |||
@@ -3,12 +3,12 @@ use std::{ | |||
3 | path::{Path, PathBuf}, | 3 | path::{Path, PathBuf}, |
4 | }; | 4 | }; |
5 | use cargo_metadata::{metadata_run, CargoOpt}; | 5 | use cargo_metadata::{metadata_run, CargoOpt}; |
6 | use crossbeam_channel::{bounded, Sender, Receiver}; | 6 | use crossbeam_channel::{Sender, Receiver}; |
7 | use libsyntax2::SmolStr; | 7 | use libsyntax2::SmolStr; |
8 | 8 | ||
9 | use { | 9 | use { |
10 | Result, | 10 | Result, |
11 | thread_watcher::ThreadWatcher, | 11 | thread_watcher::{ThreadWatcher, worker_chan}, |
12 | }; | 12 | }; |
13 | 13 | ||
14 | #[derive(Debug, Clone)] | 14 | #[derive(Debug, Clone)] |
@@ -162,15 +162,14 @@ impl TargetKind { | |||
162 | } | 162 | } |
163 | } | 163 | } |
164 | 164 | ||
165 | pub fn workspace_loader() -> (Sender<PathBuf>, Receiver<Result<CargoWorkspace>>, ThreadWatcher) { | 165 | pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { |
166 | let (path_sender, path_receiver) = bounded::<PathBuf>(16); | 166 | let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); |
167 | let (ws_sender, ws_receiver) = bounded::<Result<CargoWorkspace>>(1); | ||
168 | let thread = ThreadWatcher::spawn("workspace loader", move || { | 167 | let thread = ThreadWatcher::spawn("workspace loader", move || { |
169 | path_receiver | 168 | input_receiver |
170 | .into_iter() | 169 | .into_iter() |
171 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) | 170 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) |
172 | .for_each(|it| ws_sender.send(it)) | 171 | .for_each(|it| output_sender.send(it)) |
173 | }); | 172 | }); |
174 | 173 | ||
175 | (path_sender, ws_receiver, thread) | 174 | (interface, thread) |
176 | } | 175 | } |
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 98bcdfd6c..74a0a58b7 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs | |||
@@ -1,4 +1,5 @@ | |||
1 | use std::thread; | 1 | use std::thread; |
2 | use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; | ||
2 | use drop_bomb::DropBomb; | 3 | use drop_bomb::DropBomb; |
3 | use Result; | 4 | use Result; |
4 | 5 | ||
@@ -31,3 +32,12 @@ impl ThreadWatcher { | |||
31 | res | 32 | res |
32 | } | 33 | } |
33 | } | 34 | } |
35 | |||
36 | /// Sets up worker channels in a deadlock-avoind way. | ||
37 | /// If one sets both input and output buffers to a fixed size, | ||
38 | /// a worker might get stuck. | ||
39 | pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { | ||
40 | let (input_sender, input_receiver) = bounded::<I>(buf); | ||
41 | let (output_sender, output_receiver) = unbounded::<O>(); | ||
42 | ((input_sender, output_receiver), input_receiver, output_sender) | ||
43 | } | ||
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index 2699fc21e..c228f0b0a 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs | |||
@@ -3,11 +3,11 @@ use std::{ | |||
3 | fs, | 3 | fs, |
4 | }; | 4 | }; |
5 | 5 | ||
6 | use crossbeam_channel::{Sender, Receiver, unbounded}; | 6 | use crossbeam_channel::{Sender, Receiver}; |
7 | use walkdir::WalkDir; | 7 | use walkdir::WalkDir; |
8 | 8 | ||
9 | use { | 9 | use { |
10 | thread_watcher::ThreadWatcher, | 10 | thread_watcher::{ThreadWatcher, worker_chan}, |
11 | }; | 11 | }; |
12 | 12 | ||
13 | 13 | ||
@@ -22,11 +22,11 @@ pub enum FileEventKind { | |||
22 | Add(String), | 22 | Add(String), |
23 | } | 23 | } |
24 | 24 | ||
25 | pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>, ThreadWatcher) { | 25 | pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { |
26 | let (path_sender, path_receiver) = unbounded::<PathBuf>(); | 26 | let (interface, input_receiver, output_sender) = |
27 | let (event_sender, event_receiver) = unbounded::<(PathBuf, Vec<FileEvent>)>(); | 27 | worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); |
28 | let thread = ThreadWatcher::spawn("roots loader", move || { | 28 | let thread = ThreadWatcher::spawn("roots loader", move || { |
29 | path_receiver | 29 | input_receiver |
30 | .into_iter() | 30 | .into_iter() |
31 | .map(|path| { | 31 | .map(|path| { |
32 | debug!("loading {} ...", path.as_path().display()); | 32 | debug!("loading {} ...", path.as_path().display()); |
@@ -34,10 +34,10 @@ pub fn roots_loader() -> (Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>, | |||
34 | debug!("... loaded {}", path.as_path().display()); | 34 | debug!("... loaded {}", path.as_path().display()); |
35 | (path, events) | 35 | (path, events) |
36 | }) | 36 | }) |
37 | .for_each(|it| event_sender.send(it)) | 37 | .for_each(|it| output_sender.send(it)) |
38 | }); | 38 | }); |
39 | 39 | ||
40 | (path_sender, event_receiver, thread) | 40 | (interface, thread) |
41 | } | 41 | } |
42 | 42 | ||
43 | fn load_root(path: &Path) -> Vec<FileEvent> { | 43 | fn load_root(path: &Path) -> Vec<FileEvent> { |
diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs index 297dcd9ae..2710ab59b 100644 --- a/crates/server/tests/heavy_tests/support.rs +++ b/crates/server/tests/heavy_tests/support.rs | |||
@@ -8,7 +8,7 @@ use std::{ | |||
8 | }; | 8 | }; |
9 | 9 | ||
10 | use tempdir::TempDir; | 10 | use tempdir::TempDir; |
11 | use crossbeam_channel::{unbounded, after, Sender, Receiver}; | 11 | use crossbeam_channel::{after, Sender, Receiver}; |
12 | use flexi_logger::Logger; | 12 | use flexi_logger::Logger; |
13 | use languageserver_types::{ | 13 | use languageserver_types::{ |
14 | Url, | 14 | Url, |
@@ -22,7 +22,7 @@ use serde::Serialize; | |||
22 | use serde_json::{Value, from_str, to_string_pretty}; | 22 | use serde_json::{Value, from_str, to_string_pretty}; |
23 | use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; | 23 | use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; |
24 | 24 | ||
25 | use m::{Result, main_loop, req}; | 25 | use m::{Result, main_loop, req, thread_watcher::worker_chan}; |
26 | 26 | ||
27 | pub fn project(fixture: &str) -> Server { | 27 | pub fn project(fixture: &str) -> Server { |
28 | static INIT: Once = Once::new(); | 28 | static INIT: Once = Once::new(); |
@@ -69,15 +69,19 @@ pub struct Server { | |||
69 | impl Server { | 69 | impl Server { |
70 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { | 70 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { |
71 | let path = dir.path().to_path_buf(); | 71 | let path = dir.path().to_path_buf(); |
72 | let (client_sender, mut server_receiver) = unbounded(); | 72 | let ((msg_sender, msg_receiver), server) = { |
73 | let (mut server_sender, client_receiver) = unbounded(); | 73 | let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128); |
74 | let server = thread::spawn(move || main_loop(true, path, &mut server_receiver, &mut server_sender)); | 74 | let server = thread::spawn(move || { |
75 | main_loop(true, path, &mut msg_receiver, &mut msg_sender) | ||
76 | }); | ||
77 | (api, server) | ||
78 | }; | ||
75 | let res = Server { | 79 | let res = Server { |
76 | req_id: Cell::new(1), | 80 | req_id: Cell::new(1), |
77 | dir, | 81 | dir, |
78 | messages: Default::default(), | 82 | messages: Default::default(), |
79 | sender: Some(client_sender), | 83 | sender: Some(msg_sender), |
80 | receiver: client_receiver, | 84 | receiver: msg_receiver, |
81 | server: Some(server), | 85 | server: Some(server), |
82 | }; | 86 | }; |
83 | 87 | ||