aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <bors[bot]@users.noreply.github.com>2018-12-18 09:54:56 +0000
committerbors[bot] <bors[bot]@users.noreply.github.com>2018-12-18 09:54:56 +0000
commitf3ea21ac16481de1197c9daadee3be96546974b5 (patch)
treefced7beb98137d8675e0f864c7b41c909b97ebf5
parent4a1ab869b7aaa38d55e8995ec1b49e72b55a5965 (diff)
parent193992fd14e88d91a3695f10204232d4c81192dc (diff)
Merge #286
286: move thread worker to a separate crate r=matklad a=matklad Going to tackle https://github.com/rust-analyzer/rust-analyzer/issues/243, this is a refactoring towards that goal Co-authored-by: Aleksey Kladov <[email protected]>
-rw-r--r--Cargo.lock10
-rw-r--r--crates/ra_lsp_server/Cargo.toml1
-rw-r--r--crates/ra_lsp_server/src/lib.rs1
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs6
-rw-r--r--crates/ra_lsp_server/src/project_model.rs10
-rw-r--r--crates/ra_lsp_server/src/vfs.rs7
-rw-r--r--crates/ra_lsp_server/tests/heavy_tests/support.rs6
-rw-r--r--crates/thread_worker/Cargo.toml11
-rw-r--r--crates/thread_worker/src/lib.rs (renamed from crates/ra_lsp_server/src/thread_watcher.rs)56
9 files changed, 62 insertions, 46 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 56d5c65b9..15209184d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -700,6 +700,7 @@ dependencies = [
700 "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)", 700 "tempdir 0.3.7 (registry+https://github.com/rust-lang/crates.io-index)",
701 "test_utils 0.1.0", 701 "test_utils 0.1.0",
702 "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)", 702 "text_unit 0.1.5 (registry+https://github.com/rust-lang/crates.io-index)",
703 "thread_worker 0.1.0",
703 "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)", 704 "threadpool 1.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
704 "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)", 705 "url_serde 0.2.0 (registry+https://github.com/rust-lang/crates.io-index)",
705 "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)", 706 "walkdir 2.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -1117,6 +1118,15 @@ dependencies = [
1117] 1118]
1118 1119
1119[[package]] 1120[[package]]
1121name = "thread_worker"
1122version = "0.1.0"
1123dependencies = [
1124 "crossbeam-channel 0.2.6 (registry+https://github.com/rust-lang/crates.io-index)",
1125 "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
1126 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
1127]
1128
1129[[package]]
1120name = "threadpool" 1130name = "threadpool"
1121version = "1.7.1" 1131version = "1.7.1"
1122source = "registry+https://github.com/rust-lang/crates.io-index" 1132source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml
index 133decc52..30a8d35cd 100644
--- a/crates/ra_lsp_server/Cargo.toml
+++ b/crates/ra_lsp_server/Cargo.toml
@@ -26,6 +26,7 @@ text_unit = { version = "0.1.2", features = ["serde"] }
26smol_str = { version = "0.1.5", features = ["serde"] } 26smol_str = { version = "0.1.5", features = ["serde"] }
27rustc-hash = "1.0" 27rustc-hash = "1.0"
28 28
29thread_worker = { path = "../thread_worker" }
29ra_syntax = { path = "../ra_syntax" } 30ra_syntax = { path = "../ra_syntax" }
30ra_editor = { path = "../ra_editor" } 31ra_editor = { path = "../ra_editor" }
31ra_text_edit = { path = "../ra_text_edit" } 32ra_text_edit = { path = "../ra_text_edit" }
diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs
index 75c6fa1b8..1d7258c35 100644
--- a/crates/ra_lsp_server/src/lib.rs
+++ b/crates/ra_lsp_server/src/lib.rs
@@ -5,7 +5,6 @@ mod path_map;
5mod project_model; 5mod project_model;
6pub mod req; 6pub mod req;
7mod server_world; 7mod server_world;
8pub mod thread_watcher;
9mod vfs; 8mod vfs;
10 9
11pub type Result<T> = ::std::result::Result<T, ::failure::Error>; 10pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index 41f70f263..eab82ee85 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -10,6 +10,7 @@ use gen_lsp_server::{
10use languageserver_types::NumberOrString; 10use languageserver_types::NumberOrString;
11use ra_analysis::{Canceled, FileId, LibraryData}; 11use ra_analysis::{Canceled, FileId, LibraryData};
12use rayon; 12use rayon;
13use thread_worker::Worker;
13use threadpool::ThreadPool; 14use threadpool::ThreadPool;
14use rustc_hash::FxHashSet; 15use rustc_hash::FxHashSet;
15use serde::{de::DeserializeOwned, Serialize}; 16use serde::{de::DeserializeOwned, Serialize};
@@ -21,7 +22,6 @@ use crate::{
21 project_model::{workspace_loader, CargoWorkspace}, 22 project_model::{workspace_loader, CargoWorkspace},
22 req, 23 req,
23 server_world::{ServerWorld, ServerWorldState}, 24 server_world::{ServerWorld, ServerWorldState},
24 thread_watcher::Worker,
25 vfs::{self, FileEvent}, 25 vfs::{self, FileEvent},
26 Result, 26 Result,
27}; 27};
@@ -92,8 +92,8 @@ pub fn main_loop(
92 let ws_res = ws_watcher.stop(); 92 let ws_res = ws_watcher.stop();
93 93
94 main_res?; 94 main_res?;
95 fs_res?; 95 fs_res.map_err(|_| format_err!("fs watcher died"))?;
96 ws_res?; 96 ws_res.map_err(|_| format_err!("ws watcher died"))?;
97 97
98 Ok(()) 98 Ok(())
99} 99}
diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs
index cb91ada90..b881f8b6f 100644
--- a/crates/ra_lsp_server/src/project_model.rs
+++ b/crates/ra_lsp_server/src/project_model.rs
@@ -4,11 +4,9 @@ use cargo_metadata::{metadata_run, CargoOpt};
4use ra_syntax::SmolStr; 4use ra_syntax::SmolStr;
5use rustc_hash::{FxHashMap, FxHashSet}; 5use rustc_hash::{FxHashMap, FxHashSet};
6use failure::{format_err, bail}; 6use failure::{format_err, bail};
7use thread_worker::{WorkerHandle, Worker};
7 8
8use crate::{ 9use crate::Result;
9 Result,
10 thread_watcher::{ThreadWatcher, Worker},
11};
12 10
13/// `CargoWorksapce` represents the logical structure of, well, a Cargo 11/// `CargoWorksapce` represents the logical structure of, well, a Cargo
14/// workspace. It pretty closely mirrors `cargo metadata` output. 12/// workspace. It pretty closely mirrors `cargo metadata` output.
@@ -199,8 +197,8 @@ impl TargetKind {
199 } 197 }
200} 198}
201 199
202pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) { 200pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHandle) {
203 Worker::<PathBuf, Result<CargoWorkspace>>::spawn( 201 thread_worker::spawn::<PathBuf, Result<CargoWorkspace>, _>(
204 "workspace loader", 202 "workspace loader",
205 1, 203 1,
206 |input_receiver, output_sender| { 204 |input_receiver, output_sender| {
diff --git a/crates/ra_lsp_server/src/vfs.rs b/crates/ra_lsp_server/src/vfs.rs
index 00ab3e6c3..fcf7693d8 100644
--- a/crates/ra_lsp_server/src/vfs.rs
+++ b/crates/ra_lsp_server/src/vfs.rs
@@ -4,8 +4,7 @@ use std::{
4}; 4};
5 5
6use walkdir::WalkDir; 6use walkdir::WalkDir;
7 7use thread_worker::{WorkerHandle, Worker};
8use crate::thread_watcher::{ThreadWatcher, Worker};
9 8
10#[derive(Debug)] 9#[derive(Debug)]
11pub struct FileEvent { 10pub struct FileEvent {
@@ -18,8 +17,8 @@ pub enum FileEventKind {
18 Add(String), 17 Add(String),
19} 18}
20 19
21pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) { 20pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, WorkerHandle) {
22 Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn( 21 thread_worker::spawn::<PathBuf, (PathBuf, Vec<FileEvent>), _>(
23 "roots loader", 22 "roots loader",
24 128, 23 128,
25 |input_receiver, output_sender| { 24 |input_receiver, output_sender| {
diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs
index 4b75be3ee..07a878a26 100644
--- a/crates/ra_lsp_server/tests/heavy_tests/support.rs
+++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs
@@ -17,11 +17,11 @@ use languageserver_types::{
17use serde::Serialize; 17use serde::Serialize;
18use serde_json::{to_string_pretty, Value}; 18use serde_json::{to_string_pretty, Value};
19use tempdir::TempDir; 19use tempdir::TempDir;
20use thread_worker::{WorkerHandle, Worker};
20use test_utils::{parse_fixture, find_mismatch}; 21use test_utils::{parse_fixture, find_mismatch};
21 22
22use ra_lsp_server::{ 23use ra_lsp_server::{
23 main_loop, req, 24 main_loop, req,
24 thread_watcher::{ThreadWatcher, Worker},
25}; 25};
26 26
27pub fn project(fixture: &str) -> Server { 27pub fn project(fixture: &str) -> Server {
@@ -45,13 +45,13 @@ pub struct Server {
45 messages: RefCell<Vec<RawMessage>>, 45 messages: RefCell<Vec<RawMessage>>,
46 dir: TempDir, 46 dir: TempDir,
47 worker: Option<Worker<RawMessage, RawMessage>>, 47 worker: Option<Worker<RawMessage, RawMessage>>,
48 watcher: Option<ThreadWatcher>, 48 watcher: Option<WorkerHandle>,
49} 49}
50 50
51impl Server { 51impl Server {
52 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { 52 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
53 let path = dir.path().to_path_buf(); 53 let path = dir.path().to_path_buf();
54 let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn( 54 let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>(
55 "test server", 55 "test server",
56 128, 56 128,
57 move |mut msg_receiver, mut msg_sender| { 57 move |mut msg_receiver, mut msg_sender| {
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml
new file mode 100644
index 000000000..62d66a1a3
--- /dev/null
+++ b/crates/thread_worker/Cargo.toml
@@ -0,0 +1,11 @@
1[package]
2edition = "2018"
3name = "thread_worker"
4version = "0.1.0"
5authors = ["Aleksey Kladov <[email protected]>"]
6
7[dependencies]
8drop_bomb = "0.1.0"
9crossbeam-channel = "0.2.4"
10log = "0.4.3"
11
diff --git a/crates/ra_lsp_server/src/thread_watcher.rs b/crates/thread_worker/src/lib.rs
index 99825d440..e558559ef 100644
--- a/crates/ra_lsp_server/src/thread_watcher.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -1,28 +1,35 @@
1//! Small utility to correctly spawn crossbeam-channel based worker threads.
2
1use std::thread; 3use std::thread;
2 4
3use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; 5use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
4use drop_bomb::DropBomb; 6use drop_bomb::DropBomb;
5use failure::format_err;
6
7use crate::Result;
8 7
9pub struct Worker<I, O> { 8pub struct Worker<I, O> {
10 pub inp: Sender<I>, 9 pub inp: Sender<I>,
11 pub out: Receiver<O>, 10 pub out: Receiver<O>,
12} 11}
13 12
14impl<I, O> Worker<I, O> { 13pub struct WorkerHandle {
15 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) 14 name: &'static str,
16 where 15 thread: thread::JoinHandle<()>,
17 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, 16 bomb: DropBomb,
18 I: Send + 'static, 17}
19 O: Send + 'static,
20 {
21 let (worker, inp_r, out_s) = worker_chan(buf);
22 let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
23 (worker, watcher)
24 }
25 18
19pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle)
20where
21 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
22 I: Send + 'static,
23 O: Send + 'static,
24{
25 let (worker, inp_r, out_s) = worker_chan(buf);
26 let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s));
27 (worker, watcher)
28}
29
30impl<I, O> Worker<I, O> {
31 /// Stops the worker. Returns the message receiver to fetch results which
32 /// have become ready before the worker is stopped.
26 pub fn stop(self) -> Receiver<O> { 33 pub fn stop(self) -> Receiver<O> {
27 self.out 34 self.out
28 } 35 }
@@ -32,30 +39,21 @@ impl<I, O> Worker<I, O> {
32 } 39 }
33} 40}
34 41
35pub struct ThreadWatcher { 42impl WorkerHandle {
36 name: &'static str, 43 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle {
37 thread: thread::JoinHandle<()>,
38 bomb: DropBomb,
39}
40
41impl ThreadWatcher {
42 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
43 let thread = thread::spawn(f); 44 let thread = thread::spawn(f);
44 ThreadWatcher { 45 WorkerHandle {
45 name, 46 name,
46 thread, 47 thread,
47 bomb: DropBomb::new(format!("ThreadWatcher {} was not stopped", name)), 48 bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)),
48 } 49 }
49 } 50 }
50 51
51 pub fn stop(mut self) -> Result<()> { 52 pub fn stop(mut self) -> thread::Result<()> {
52 log::info!("waiting for {} to finish ...", self.name); 53 log::info!("waiting for {} to finish ...", self.name);
53 let name = self.name; 54 let name = self.name;
54 self.bomb.defuse(); 55 self.bomb.defuse();
55 let res = self 56 let res = self.thread.join();
56 .thread
57 .join()
58 .map_err(|_| format_err!("ThreadWatcher {} died", name));
59 match &res { 57 match &res {
60 Ok(()) => log::info!("... {} terminated with ok", name), 58 Ok(()) => log::info!("... {} terminated with ok", name),
61 Err(_) => log::error!("... {} terminated with err", name), 59 Err(_) => log::error!("... {} terminated with err", name),