diff options
author | Aleksey Kladov <[email protected]> | 2018-12-18 09:45:20 +0000 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-12-18 09:52:17 +0000 |
commit | 193992fd14e88d91a3695f10204232d4c81192dc (patch) | |
tree | fced7beb98137d8675e0f864c7b41c909b97ebf5 | |
parent | 4a1ab869b7aaa38d55e8995ec1b49e72b55a5965 (diff) |
move thread worker to a separate crate
-rw-r--r-- | Cargo.lock | 10 | ||||
-rw-r--r-- | crates/ra_lsp_server/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/ra_lsp_server/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/ra_lsp_server/src/main_loop.rs | 6 | ||||
-rw-r--r-- | crates/ra_lsp_server/src/project_model.rs | 10 | ||||
-rw-r--r-- | crates/ra_lsp_server/src/vfs.rs | 7 | ||||
-rw-r--r-- | crates/ra_lsp_server/tests/heavy_tests/support.rs | 6 | ||||
-rw-r--r-- | crates/thread_worker/Cargo.toml | 11 | ||||
-rw-r--r-- | crates/thread_worker/src/lib.rs (renamed from crates/ra_lsp_server/src/thread_watcher.rs) | 56 |
9 files changed, 62 insertions, 46 deletions
diff --git a/Cargo.lock b/Cargo.lock index 56d5c65b9..15209184d 100644 --- a/Cargo.lock +++ b/Cargo.lock | |||
@@ -700,6 +700,7 @@ dependencies = [ | |||
700 | "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", | 700 | "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", |
701 | "test_utils 0.1.0", | 701 | "test_utils 0.1.0", |
702 | "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", | 702 | "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", |
703 | "thread_worker 0.1.0", | ||
703 | "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", | 704 | "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", |
704 | "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", | 705 | "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", |
705 | "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)", | 706 | "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)", |
@@ -1117,6 +1118,15 @@ dependencies = [ | |||
1117 | ] | 1118 | ] |
1118 | 1119 | ||
1119 | [[package]] | 1120 | [[package]] |
1121 | name = "thread_worker" | ||
1122 | version = "0.1.0" | ||
1123 | dependencies = [ | ||
1124 | "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)", | ||
1125 | "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", | ||
1126 | "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", | ||
1127 | ] | ||
1128 | |||
1129 | [[package]] | ||
1120 | name = "threadpool" | 1130 | name = "threadpool" |
1121 | version = "1.7.1" | 1131 | version = "1.7.1" |
1122 | source = "registry+https://github.com/rust-lang/crates.io-index" | 1132 | source = "registry+https://github.com/rust-lang/crates.io-index" |
diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 133decc52..30a8d35cd 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml | |||
@@ -26,6 +26,7 @@ text_unit = { version = "0.1.2", features = ["serde"] } | |||
26 | smol_str = { version = "0.1.5", features = ["serde"] } | 26 | smol_str = { version = "0.1.5", features = ["serde"] } |
27 | rustc-hash = "1.0" | 27 | rustc-hash = "1.0" |
28 | 28 | ||
29 | thread_worker = { path = "../thread_worker" } | ||
29 | ra_syntax = { path = "../ra_syntax" } | 30 | ra_syntax = { path = "../ra_syntax" } |
30 | ra_editor = { path = "../ra_editor" } | 31 | ra_editor = { path = "../ra_editor" } |
31 | ra_text_edit = { path = "../ra_text_edit" } | 32 | ra_text_edit = { path = "../ra_text_edit" } |
diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 75c6fa1b8..1d7258c35 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs | |||
@@ -5,7 +5,6 @@ mod path_map; | |||
5 | mod project_model; | 5 | mod project_model; |
6 | pub mod req; | 6 | pub mod req; |
7 | mod server_world; | 7 | mod server_world; |
8 | pub mod thread_watcher; | ||
9 | mod vfs; | 8 | mod vfs; |
10 | 9 | ||
11 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; | 10 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; |
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 41f70f263..eab82ee85 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs | |||
@@ -10,6 +10,7 @@ use gen_lsp_server::{ | |||
10 | use languageserver_types::NumberOrString; | 10 | use languageserver_types::NumberOrString; |
11 | use ra_analysis::{Canceled, FileId, LibraryData}; | 11 | use ra_analysis::{Canceled, FileId, LibraryData}; |
12 | use rayon; | 12 | use rayon; |
13 | use thread_worker::Worker; | ||
13 | use threadpool::ThreadPool; | 14 | use threadpool::ThreadPool; |
14 | use rustc_hash::FxHashSet; | 15 | use rustc_hash::FxHashSet; |
15 | use serde::{de::DeserializeOwned, Serialize}; | 16 | use serde::{de::DeserializeOwned, Serialize}; |
@@ -21,7 +22,6 @@ use crate::{ | |||
21 | project_model::{workspace_loader, CargoWorkspace}, | 22 | project_model::{workspace_loader, CargoWorkspace}, |
22 | req, | 23 | req, |
23 | server_world::{ServerWorld, ServerWorldState}, | 24 | server_world::{ServerWorld, ServerWorldState}, |
24 | thread_watcher::Worker, | ||
25 | vfs::{self, FileEvent}, | 25 | vfs::{self, FileEvent}, |
26 | Result, | 26 | Result, |
27 | }; | 27 | }; |
@@ -92,8 +92,8 @@ pub fn main_loop( | |||
92 | let ws_res = ws_watcher.stop(); | 92 | let ws_res = ws_watcher.stop(); |
93 | 93 | ||
94 | main_res?; | 94 | main_res?; |
95 | fs_res?; | 95 | fs_res.map_err(|_| format_err!("fs watcher died"))?; |
96 | ws_res?; | 96 | ws_res.map_err(|_| format_err!("ws watcher died"))?; |
97 | 97 | ||
98 | Ok(()) | 98 | Ok(()) |
99 | } | 99 | } |
diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index cb91ada90..b881f8b6f 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs | |||
@@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt}; | |||
4 | use ra_syntax::SmolStr; | 4 | use ra_syntax::SmolStr; |
5 | use rustc_hash::{FxHashMap, FxHashSet}; | 5 | use rustc_hash::{FxHashMap, FxHashSet}; |
6 | use failure::{format_err, bail}; | 6 | use failure::{format_err, bail}; |
7 | use thread_worker::{WorkerHandle, Worker}; | ||
7 | 8 | ||
8 | use crate::{ | 9 | use crate::Result; |
9 | Result, | ||
10 | thread_watcher::{ThreadWatcher, Worker}, | ||
11 | }; | ||
12 | 10 | ||
13 | /// `CargoWorksapce` represents the logical structure of, well, a Cargo | 11 | /// `CargoWorksapce` represents the logical structure of, well, a Cargo |
14 | /// workspace. It pretty closely mirrors `cargo metadata` output. | 12 | /// workspace. It pretty closely mirrors `cargo metadata` output. |
@@ -199,8 +197,8 @@ impl TargetKind { | |||
199 | } | 197 | } |
200 | } | 198 | } |
201 | 199 | ||
202 | pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) { | 200 | pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHandle) { |
203 | Worker::<PathBuf, Result<CargoWorkspace>>::spawn( | 201 | thread_worker::spawn::<PathBuf, Result<CargoWorkspace>, _>( |
204 | "workspace loader", | 202 | "workspace loader", |
205 | 1, | 203 | 1, |
206 | |input_receiver, output_sender| { | 204 | |input_receiver, output_sender| { |
diff --git a/crates/ra_lsp_server/src/vfs.rs b/crates/ra_lsp_server/src/vfs.rs index 00ab3e6c3..fcf7693d8 100644 --- a/crates/ra_lsp_server/src/vfs.rs +++ b/crates/ra_lsp_server/src/vfs.rs | |||
@@ -4,8 +4,7 @@ use std::{ | |||
4 | }; | 4 | }; |
5 | 5 | ||
6 | use walkdir::WalkDir; | 6 | use walkdir::WalkDir; |
7 | 7 | use thread_worker::{WorkerHandle, Worker}; | |
8 | use crate::thread_watcher::{ThreadWatcher, Worker}; | ||
9 | 8 | ||
10 | #[derive(Debug)] | 9 | #[derive(Debug)] |
11 | pub struct FileEvent { | 10 | pub struct FileEvent { |
@@ -18,8 +17,8 @@ pub enum FileEventKind { | |||
18 | Add(String), | 17 | Add(String), |
19 | } | 18 | } |
20 | 19 | ||
21 | pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) { | 20 | pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, WorkerHandle) { |
22 | Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn( | 21 | thread_worker::spawn::<PathBuf, (PathBuf, Vec<FileEvent>), _>( |
23 | "roots loader", | 22 | "roots loader", |
24 | 128, | 23 | 128, |
25 | |input_receiver, output_sender| { | 24 | |input_receiver, output_sender| { |
diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index 4b75be3ee..07a878a26 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs | |||
@@ -17,11 +17,11 @@ use languageserver_types::{ | |||
17 | use serde::Serialize; | 17 | use serde::Serialize; |
18 | use serde_json::{to_string_pretty, Value}; | 18 | use serde_json::{to_string_pretty, Value}; |
19 | use tempdir::TempDir; | 19 | use tempdir::TempDir; |
20 | use thread_worker::{WorkerHandle, Worker}; | ||
20 | use test_utils::{parse_fixture, find_mismatch}; | 21 | use test_utils::{parse_fixture, find_mismatch}; |
21 | 22 | ||
22 | use ra_lsp_server::{ | 23 | use ra_lsp_server::{ |
23 | main_loop, req, | 24 | main_loop, req, |
24 | thread_watcher::{ThreadWatcher, Worker}, | ||
25 | }; | 25 | }; |
26 | 26 | ||
27 | pub fn project(fixture: &str) -> Server { | 27 | pub fn project(fixture: &str) -> Server { |
@@ -45,13 +45,13 @@ pub struct Server { | |||
45 | messages: RefCell<Vec<RawMessage>>, | 45 | messages: RefCell<Vec<RawMessage>>, |
46 | dir: TempDir, | 46 | dir: TempDir, |
47 | worker: Option<Worker<RawMessage, RawMessage>>, | 47 | worker: Option<Worker<RawMessage, RawMessage>>, |
48 | watcher: Option<ThreadWatcher>, | 48 | watcher: Option<WorkerHandle>, |
49 | } | 49 | } |
50 | 50 | ||
51 | impl Server { | 51 | impl Server { |
52 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { | 52 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { |
53 | let path = dir.path().to_path_buf(); | 53 | let path = dir.path().to_path_buf(); |
54 | let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn( | 54 | let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>( |
55 | "test server", | 55 | "test server", |
56 | 128, | 56 | 128, |
57 | move |mut msg_receiver, mut msg_sender| { | 57 | move |mut msg_receiver, mut msg_sender| { |
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml new file mode 100644 index 000000000..62d66a1a3 --- /dev/null +++ b/crates/thread_worker/Cargo.toml | |||
@@ -0,0 +1,11 @@ | |||
1 | [package] | ||
2 | edition = "2018" | ||
3 | name = "thread_worker" | ||
4 | version = "0.1.0" | ||
5 | authors = ["Aleksey Kladov <[email protected]>"] | ||
6 | |||
7 | [dependencies] | ||
8 | drop_bomb = "0.1.0" | ||
9 | crossbeam-channel = "0.2.4" | ||
10 | log = "0.4.3" | ||
11 | |||
diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/thread_worker/src/lib.rs index 99825d440..e558559ef 100644 --- a/crates/ra_lsp_server/src/thread_watcher.rs +++ b/crates/thread_worker/src/lib.rs | |||
@@ -1,28 +1,35 @@ | |||
1 | //! Small utility to correctly spawn crossbeam-channel based worker threads. | ||
2 | |||
1 | use std::thread; | 3 | use std::thread; |
2 | 4 | ||
3 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; | 5 | use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; |
4 | use drop_bomb::DropBomb; | 6 | use drop_bomb::DropBomb; |
5 | use failure::format_err; | ||
6 | |||
7 | use crate::Result; | ||
8 | 7 | ||
9 | pub struct Worker<I, O> { | 8 | pub struct Worker<I, O> { |
10 | pub inp: Sender<I>, | 9 | pub inp: Sender<I>, |
11 | pub out: Receiver<O>, | 10 | pub out: Receiver<O>, |
12 | } | 11 | } |
13 | 12 | ||
14 | impl<I, O> Worker<I, O> { | 13 | pub struct WorkerHandle { |
15 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) | 14 | name: &'static str, |
16 | where | 15 | thread: thread::JoinHandle<()>, |
17 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | 16 | bomb: DropBomb, |
18 | I: Send + 'static, | 17 | } |
19 | O: Send + 'static, | ||
20 | { | ||
21 | let (worker, inp_r, out_s) = worker_chan(buf); | ||
22 | let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); | ||
23 | (worker, watcher) | ||
24 | } | ||
25 | 18 | ||
19 | pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) | ||
20 | where | ||
21 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
22 | I: Send + 'static, | ||
23 | O: Send + 'static, | ||
24 | { | ||
25 | let (worker, inp_r, out_s) = worker_chan(buf); | ||
26 | let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); | ||
27 | (worker, watcher) | ||
28 | } | ||
29 | |||
30 | impl<I, O> Worker<I, O> { | ||
31 | /// Stops the worker. Returns the message receiver to fetch results which | ||
32 | /// have become ready before the worker is stopped. | ||
26 | pub fn stop(self) -> Receiver<O> { | 33 | pub fn stop(self) -> Receiver<O> { |
27 | self.out | 34 | self.out |
28 | } | 35 | } |
@@ -32,30 +39,21 @@ impl<I, O> Worker<I, O> { | |||
32 | } | 39 | } |
33 | } | 40 | } |
34 | 41 | ||
35 | pub struct ThreadWatcher { | 42 | impl WorkerHandle { |
36 | name: &'static str, | 43 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { |
37 | thread: thread::JoinHandle<()>, | ||
38 | bomb: DropBomb, | ||
39 | } | ||
40 | |||
41 | impl ThreadWatcher { | ||
42 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { | ||
43 | let thread = thread::spawn(f); | 44 | let thread = thread::spawn(f); |
44 | ThreadWatcher { | 45 | WorkerHandle { |
45 | name, | 46 | name, |
46 | thread, | 47 | thread, |
47 | bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), | 48 | bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)), |
48 | } | 49 | } |
49 | } | 50 | } |
50 | 51 | ||
51 | pub fn stop(mut self) -> Result<()> { | 52 | pub fn stop(mut self) -> thread::Result<()> { |
52 | log::info!("waiting for {} to finish ...", self.name); | 53 | log::info!("waiting for {} to finish ...", self.name); |
53 | let name = self.name; | 54 | let name = self.name; |
54 | self.bomb.defuse(); | 55 | self.bomb.defuse(); |
55 | let res = self | 56 | let res = self.thread.join(); |
56 | .thread | ||
57 | .join() | ||
58 | .map_err(|_| format_err!("ThreadWatcher {} died", name)); | ||
59 | match &res { | 57 | match &res { |
60 | Ok(()) => log::info!("... {} terminated with ok", name), | 58 | Ok(()) => log::info!("... {} terminated with ok", name), |
61 | Err(_) => log::error!("... {} terminated with err", name), | 59 | Err(_) => log::error!("... {} terminated with err", name), |