aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock1
-rw-r--r--crates/ra_batch/src/lib.rs1
-rw-r--r--crates/ra_ide_api/src/call_info.rs22
-rw-r--r--crates/ra_ide_api/src/completion/complete_postfix.rs21
-rw-r--r--crates/ra_ide_api/src/completion/snapshots/completion_item__postfix_completion_works_for_trivial_path_expression.snap76
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs26
-rw-r--r--crates/ra_lsp_server/src/project_model.rs6
-rw-r--r--crates/ra_lsp_server/tests/heavy_tests/support.rs17
-rw-r--r--crates/ra_vfs/src/io.rs91
-rw-r--r--crates/ra_vfs/src/lib.rs8
-rw-r--r--crates/ra_vfs/tests/vfs.rs1
-rw-r--r--crates/thread_worker/Cargo.toml1
-rw-r--r--crates/thread_worker/src/lib.rs120
13 files changed, 210 insertions, 181 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 65ffff90a..266b88f19 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1651,7 +1651,6 @@ name = "thread_worker"
1651version = "0.1.0" 1651version = "0.1.0"
1652dependencies = [ 1652dependencies = [
1653 "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)", 1653 "crossbeam-channel 0.3.8 (registry+https://github.com/rust-lang/crates.io-index)",
1654 "drop_bomb 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)",
1655 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", 1654 "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)",
1656] 1655]
1657 1656
diff --git a/crates/ra_batch/src/lib.rs b/crates/ra_batch/src/lib.rs
index d08fad908..69d66113e 100644
--- a/crates/ra_batch/src/lib.rs
+++ b/crates/ra_batch/src/lib.rs
@@ -121,7 +121,6 @@ impl BatchDatabase {
121 .collect(); 121 .collect();
122 122
123 let db = BatchDatabase::load(crate_graph, &mut vfs); 123 let db = BatchDatabase::load(crate_graph, &mut vfs);
124 let _ = vfs.shutdown();
125 Ok((db, local_roots)) 124 Ok((db, local_roots))
126 } 125 }
127} 126}
diff --git a/crates/ra_ide_api/src/call_info.rs b/crates/ra_ide_api/src/call_info.rs
index 1b279615c..c5e8d5843 100644
--- a/crates/ra_ide_api/src/call_info.rs
+++ b/crates/ra_ide_api/src/call_info.rs
@@ -1,7 +1,7 @@
1use test_utils::tested_by; 1use test_utils::tested_by;
2use ra_db::SourceDatabase; 2use ra_db::SourceDatabase;
3use ra_syntax::{ 3use ra_syntax::{
4 AstNode, SyntaxNode, TextUnit, TextRange, 4 AstNode, SyntaxNode, TextUnit,
5 SyntaxKind::FN_DEF, 5 SyntaxKind::FN_DEF,
6 ast::{self, ArgListOwner}, 6 ast::{self, ArgListOwner},
7 algo::find_node_at_offset, 7 algo::find_node_at_offset,
@@ -38,28 +38,20 @@ pub(crate) fn call_info(db: &RootDatabase, position: FilePosition) -> Option<Cal
38 } 38 }
39 } else if num_params > 1 { 39 } else if num_params > 1 {
40 // Count how many parameters into the call we are. 40 // Count how many parameters into the call we are.
41 // TODO: This is best effort for now and should be fixed at some point.
42 // It may be better to see where we are in the arg_list and then check
43 // where offset is in that list (or beyond).
44 // Revisit this after we get documentation comments in.
45 if let Some(ref arg_list) = calling_node.arg_list() { 41 if let Some(ref arg_list) = calling_node.arg_list() {
46 let arg_list_range = arg_list.syntax().range(); 42 let arg_list_range = arg_list.syntax().range();
47 if !arg_list_range.contains_inclusive(position.offset) { 43 if !arg_list_range.contains_inclusive(position.offset) {
48 tested_by!(call_info_bad_offset); 44 tested_by!(call_info_bad_offset);
49 return None; 45 return None;
50 } 46 }
51 let start = arg_list_range.start();
52 47
53 let range_search = TextRange::from_to(start, position.offset); 48 let param = arg_list
54 let mut commas: usize = 49 .args()
55 arg_list.syntax().text().slice(range_search).to_string().matches(',').count(); 50 .position(|arg| arg.syntax().range().contains(position.offset))
51 .or(Some(num_params - 1))
52 .unwrap();
56 53
57 // If we have a method call eat the first param since it's just self. 54 call_info.active_parameter = Some(param);
58 if has_self {
59 commas += 1;
60 }
61
62 call_info.active_parameter = Some(commas);
63 } 55 }
64 } 56 }
65 57
diff --git a/crates/ra_ide_api/src/completion/complete_postfix.rs b/crates/ra_ide_api/src/completion/complete_postfix.rs
index 10a3c8db7..cf8c63269 100644
--- a/crates/ra_ide_api/src/completion/complete_postfix.rs
+++ b/crates/ra_ide_api/src/completion/complete_postfix.rs
@@ -15,7 +15,7 @@ use ra_syntax::{
15}; 15};
16use ra_text_edit::TextEditBuilder; 16use ra_text_edit::TextEditBuilder;
17 17
18fn postfix_snippet(ctx: &CompletionContext, label: &str, snippet: &str) -> Builder { 18fn postfix_snippet(ctx: &CompletionContext, label: &str, detail: &str, snippet: &str) -> Builder {
19 let replace_range = ctx.source_range(); 19 let replace_range = ctx.source_range();
20 let receiver_range = ctx.dot_receiver.expect("no receiver available").syntax().range(); 20 let receiver_range = ctx.dot_receiver.expect("no receiver available").syntax().range();
21 let delete_range = TextRange::from_to(receiver_range.start(), replace_range.start()); 21 let delete_range = TextRange::from_to(receiver_range.start(), replace_range.start());
@@ -23,22 +23,33 @@ fn postfix_snippet(ctx: &CompletionContext, label: &str, snippet: &str) -> Build
23 builder.delete(delete_range); 23 builder.delete(delete_range);
24 CompletionItem::new(CompletionKind::Postfix, replace_range, label) 24 CompletionItem::new(CompletionKind::Postfix, replace_range, label)
25 .snippet(snippet) 25 .snippet(snippet)
26 .detail(detail)
26 .text_edit(builder.finish()) 27 .text_edit(builder.finish())
27} 28}
28 29
29pub(super) fn complete_postfix(acc: &mut Completions, ctx: &CompletionContext) { 30pub(super) fn complete_postfix(acc: &mut Completions, ctx: &CompletionContext) {
30 if let Some(dot_receiver) = ctx.dot_receiver { 31 if let Some(dot_receiver) = ctx.dot_receiver {
31 let receiver_text = dot_receiver.syntax().text().to_string(); 32 let receiver_text = dot_receiver.syntax().text().to_string();
32 postfix_snippet(ctx, "not", &format!("!{}", receiver_text)).add_to(acc); 33 postfix_snippet(ctx, "not", "!expr", &format!("!{}", receiver_text)).add_to(acc);
33 postfix_snippet(ctx, "if", &format!("if {} {{$0}}", receiver_text)).add_to(acc); 34 postfix_snippet(ctx, "ref", "&expr", &format!("&{}", receiver_text)).add_to(acc);
35 postfix_snippet(ctx, "refm", "&mut expr", &format!("&mut {}", receiver_text)).add_to(acc);
36 postfix_snippet(ctx, "if", "if expr {}", &format!("if {} {{$0}}", receiver_text))
37 .add_to(acc);
34 postfix_snippet( 38 postfix_snippet(
35 ctx, 39 ctx,
36 "match", 40 "match",
41 "match expr {}",
37 &format!("match {} {{\n${{1:_}} => {{$0\\}},\n}}", receiver_text), 42 &format!("match {} {{\n${{1:_}} => {{$0\\}},\n}}", receiver_text),
38 ) 43 )
39 .add_to(acc); 44 .add_to(acc);
40 postfix_snippet(ctx, "while", &format!("while {} {{\n$0\n}}", receiver_text)).add_to(acc); 45 postfix_snippet(
41 postfix_snippet(ctx, "dbg", &format!("dbg!({})", receiver_text)).add_to(acc); 46 ctx,
47 "while",
48 "while expr {}",
49 &format!("while {} {{\n$0\n}}", receiver_text),
50 )
51 .add_to(acc);
52 postfix_snippet(ctx, "dbg", "dbg!(expr)", &format!("dbg!({})", receiver_text)).add_to(acc);
42 } 53 }
43} 54}
44 55
diff --git a/crates/ra_ide_api/src/completion/snapshots/completion_item__postfix_completion_works_for_trivial_path_expression.snap b/crates/ra_ide_api/src/completion/snapshots/completion_item__postfix_completion_works_for_trivial_path_expression.snap
index bc886ef0b..d3e53e4e8 100644
--- a/crates/ra_ide_api/src/completion/snapshots/completion_item__postfix_completion_works_for_trivial_path_expression.snap
+++ b/crates/ra_ide_api/src/completion/snapshots/completion_item__postfix_completion_works_for_trivial_path_expression.snap
@@ -1,15 +1,17 @@
1--- 1---
2created: "2019-02-03T11:38:42.897384636+00:00" 2created: "2019-02-14T18:33:26.102469493Z"
3creator: [email protected] 3creator: [email protected]
4expression: kind_completions
5source: crates/ra_ide_api/src/completion/completion_item.rs 4source: crates/ra_ide_api/src/completion/completion_item.rs
5expression: kind_completions
6--- 6---
7[ 7[
8 CompletionItem { 8 CompletionItem {
9 completion_kind: Postfix, 9 completion_kind: Postfix,
10 label: "dbg", 10 label: "dbg",
11 kind: None, 11 kind: None,
12 detail: None, 12 detail: Some(
13 "dbg!(expr)"
14 ),
13 documentation: None, 15 documentation: None,
14 lookup: None, 16 lookup: None,
15 insert_text: Some( 17 insert_text: Some(
@@ -32,7 +34,9 @@ source: crates/ra_ide_api/src/completion/completion_item.rs
32 completion_kind: Postfix, 34 completion_kind: Postfix,
33 label: "if", 35 label: "if",
34 kind: None, 36 kind: None,
35 detail: None, 37 detail: Some(
38 "if expr {}"
39 ),
36 documentation: None, 40 documentation: None,
37 lookup: None, 41 lookup: None,
38 insert_text: Some( 42 insert_text: Some(
@@ -55,7 +59,9 @@ source: crates/ra_ide_api/src/completion/completion_item.rs
55 completion_kind: Postfix, 59 completion_kind: Postfix,
56 label: "match", 60 label: "match",
57 kind: None, 61 kind: None,
58 detail: None, 62 detail: Some(
63 "match expr {}"
64 ),
59 documentation: None, 65 documentation: None,
60 lookup: None, 66 lookup: None,
61 insert_text: Some( 67 insert_text: Some(
@@ -78,7 +84,9 @@ source: crates/ra_ide_api/src/completion/completion_item.rs
78 completion_kind: Postfix, 84 completion_kind: Postfix,
79 label: "not", 85 label: "not",
80 kind: None, 86 kind: None,
81 detail: None, 87 detail: Some(
88 "!expr"
89 ),
82 documentation: None, 90 documentation: None,
83 lookup: None, 91 lookup: None,
84 insert_text: Some( 92 insert_text: Some(
@@ -99,9 +107,61 @@ source: crates/ra_ide_api/src/completion/completion_item.rs
99 }, 107 },
100 CompletionItem { 108 CompletionItem {
101 completion_kind: Postfix, 109 completion_kind: Postfix,
110 label: "ref",
111 kind: None,
112 detail: Some(
113 "&expr"
114 ),
115 documentation: None,
116 lookup: None,
117 insert_text: Some(
118 "&bar"
119 ),
120 insert_text_format: Snippet,
121 source_range: [76; 76),
122 text_edit: Some(
123 TextEdit {
124 atoms: [
125 AtomTextEdit {
126 delete: [72; 76),
127 insert: ""
128 }
129 ]
130 }
131 )
132 },
133 CompletionItem {
134 completion_kind: Postfix,
135 label: "refm",
136 kind: None,
137 detail: Some(
138 "&mut expr"
139 ),
140 documentation: None,
141 lookup: None,
142 insert_text: Some(
143 "&mut bar"
144 ),
145 insert_text_format: Snippet,
146 source_range: [76; 76),
147 text_edit: Some(
148 TextEdit {
149 atoms: [
150 AtomTextEdit {
151 delete: [72; 76),
152 insert: ""
153 }
154 ]
155 }
156 )
157 },
158 CompletionItem {
159 completion_kind: Postfix,
102 label: "while", 160 label: "while",
103 kind: None, 161 kind: None,
104 detail: None, 162 detail: Some(
163 "while expr {}"
164 ),
105 documentation: None, 165 documentation: None,
106 lookup: None, 166 lookup: None,
107 insert_text: Some( 167 insert_text: Some(
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index a51299851..06443bb76 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -54,19 +54,20 @@ pub fn main_loop(
54) -> Result<()> { 54) -> Result<()> {
55 let pool = ThreadPool::new(THREADPOOL_SIZE); 55 let pool = ThreadPool::new(THREADPOOL_SIZE);
56 let (task_sender, task_receiver) = unbounded::<Task>(); 56 let (task_sender, task_receiver) = unbounded::<Task>();
57 let (ws_worker, ws_watcher) = workspace_loader();
58 57
59 ws_worker.send(ws_root.clone()).unwrap();
60 // FIXME: support dynamic workspace loading. 58 // FIXME: support dynamic workspace loading.
61 let workspaces = match ws_worker.recv().unwrap() { 59 let workspaces = {
62 Ok(ws) => vec![ws], 60 let ws_worker = workspace_loader();
63 Err(e) => { 61 ws_worker.sender().send(ws_root.clone()).unwrap();
64 log::error!("loading workspace failed: {}", e); 62 match ws_worker.receiver().recv().unwrap() {
65 Vec::new() 63 Ok(ws) => vec![ws],
64 Err(e) => {
65 log::error!("loading workspace failed: {}", e);
66 Vec::new()
67 }
66 } 68 }
67 }; 69 };
68 ws_worker.shutdown(); 70
69 ws_watcher.shutdown().map_err(|_| format_err!("ws watcher died"))?;
70 let mut state = ServerWorldState::new(ws_root.clone(), workspaces); 71 let mut state = ServerWorldState::new(ws_root.clone(), workspaces);
71 72
72 log::info!("server initialized, serving requests"); 73 log::info!("server initialized, serving requests");
@@ -94,12 +95,9 @@ pub fn main_loop(
94 log::info!("...threadpool has finished"); 95 log::info!("...threadpool has finished");
95 96
96 let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); 97 let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
97 let vfs_res = vfs.into_inner().shutdown(); 98 drop(vfs);
98 99
99 main_res?; 100 main_res
100 vfs_res.map_err(|_| format_err!("fs watcher died"))?;
101
102 Ok(())
103} 101}
104 102
105enum Event { 103enum Event {
diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs
index 6800eb138..7d6440fad 100644
--- a/crates/ra_lsp_server/src/project_model.rs
+++ b/crates/ra_lsp_server/src/project_model.rs
@@ -1,6 +1,6 @@
1use std::path::PathBuf; 1use std::path::PathBuf;
2 2
3use thread_worker::{WorkerHandle, Worker}; 3use thread_worker::Worker;
4 4
5use crate::Result; 5use crate::Result;
6 6
@@ -8,8 +8,8 @@ pub use ra_project_model::{
8 ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot, 8 ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot,
9}; 9};
10 10
11pub fn workspace_loader() -> (Worker<PathBuf, Result<ProjectWorkspace>>, WorkerHandle) { 11pub fn workspace_loader() -> Worker<PathBuf, Result<ProjectWorkspace>> {
12 thread_worker::spawn::<PathBuf, Result<ProjectWorkspace>, _>( 12 Worker::<PathBuf, Result<ProjectWorkspace>>::spawn(
13 "workspace loader", 13 "workspace loader",
14 1, 14 1,
15 |input_receiver, output_sender| { 15 |input_receiver, output_sender| {
diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs
index eee85f8c8..11f94b4ab 100644
--- a/crates/ra_lsp_server/tests/heavy_tests/support.rs
+++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs
@@ -17,7 +17,7 @@ use lsp_types::{
17use serde::Serialize; 17use serde::Serialize;
18use serde_json::{to_string_pretty, Value}; 18use serde_json::{to_string_pretty, Value};
19use tempfile::TempDir; 19use tempfile::TempDir;
20use thread_worker::{WorkerHandle, Worker}; 20use thread_worker::Worker;
21use test_utils::{parse_fixture, find_mismatch}; 21use test_utils::{parse_fixture, find_mismatch};
22 22
23use ra_lsp_server::{ 23use ra_lsp_server::{
@@ -45,13 +45,12 @@ pub struct Server {
45 messages: RefCell<Vec<RawMessage>>, 45 messages: RefCell<Vec<RawMessage>>,
46 dir: TempDir, 46 dir: TempDir,
47 worker: Option<Worker<RawMessage, RawMessage>>, 47 worker: Option<Worker<RawMessage, RawMessage>>,
48 watcher: Option<WorkerHandle>,
49} 48}
50 49
51impl Server { 50impl Server {
52 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { 51 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
53 let path = dir.path().to_path_buf(); 52 let path = dir.path().to_path_buf();
54 let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>( 53 let worker = Worker::<RawMessage, RawMessage>::spawn(
55 "test server", 54 "test server",
56 128, 55 128,
57 move |mut msg_receiver, mut msg_sender| { 56 move |mut msg_receiver, mut msg_sender| {
@@ -63,7 +62,6 @@ impl Server {
63 dir, 62 dir,
64 messages: Default::default(), 63 messages: Default::default(),
65 worker: Some(worker), 64 worker: Some(worker),
66 watcher: Some(watcher),
67 }; 65 };
68 66
69 for (path, text) in files { 67 for (path, text) in files {
@@ -117,7 +115,7 @@ impl Server {
117 } 115 }
118 fn send_request_(&self, r: RawRequest) -> Value { 116 fn send_request_(&self, r: RawRequest) -> Value {
119 let id = r.id; 117 let id = r.id;
120 self.worker.as_ref().unwrap().send(RawMessage::Request(r)).unwrap(); 118 self.worker.as_ref().unwrap().sender().send(RawMessage::Request(r)).unwrap();
121 while let Some(msg) = self.recv() { 119 while let Some(msg) = self.recv() {
122 match msg { 120 match msg {
123 RawMessage::Request(req) => panic!("unexpected request: {:?}", req), 121 RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
@@ -157,24 +155,19 @@ impl Server {
157 } 155 }
158 } 156 }
159 fn recv(&self) -> Option<RawMessage> { 157 fn recv(&self) -> Option<RawMessage> {
160 recv_timeout(&self.worker.as_ref().unwrap().out).map(|msg| { 158 recv_timeout(&self.worker.as_ref().unwrap().receiver()).map(|msg| {
161 self.messages.borrow_mut().push(msg.clone()); 159 self.messages.borrow_mut().push(msg.clone());
162 msg 160 msg
163 }) 161 })
164 } 162 }
165 fn send_notification(&self, not: RawNotification) { 163 fn send_notification(&self, not: RawNotification) {
166 self.worker.as_ref().unwrap().send(RawMessage::Notification(not)).unwrap(); 164 self.worker.as_ref().unwrap().sender().send(RawMessage::Notification(not)).unwrap();
167 } 165 }
168} 166}
169 167
170impl Drop for Server { 168impl Drop for Server {
171 fn drop(&mut self) { 169 fn drop(&mut self) {
172 self.send_request::<Shutdown>(()); 170 self.send_request::<Shutdown>(());
173 let receiver = self.worker.take().unwrap().shutdown();
174 while let Some(msg) = recv_timeout(&receiver) {
175 drop(msg);
176 }
177 self.watcher.take().unwrap().shutdown().unwrap();
178 } 171 }
179} 172}
180 173
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 3952b200b..f64b4c532 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,13 +1,11 @@
1use std::{ 1use std::{
2 fs, 2 fs,
3 thread,
4 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
5 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
6 time::Duration, 5 time::Duration,
7}; 6};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; 7use crossbeam_channel::{Sender, unbounded, RecvError, select};
9use relative_path::RelativePathBuf; 8use relative_path::RelativePathBuf;
10use thread_worker::WorkerHandle;
11use walkdir::WalkDir; 9use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
13 11
@@ -48,37 +46,42 @@ enum ChangeKind {
48 46
49const WATCHER_DELAY: Duration = Duration::from_millis(250); 47const WATCHER_DELAY: Duration = Duration::from_millis(250);
50 48
51pub(crate) struct Worker { 49pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
52 worker: thread_worker::Worker<Task, TaskResult>, 50pub(crate) fn start(roots: Arc<Roots>) -> Worker {
53 worker_handle: WorkerHandle, 51 // This is a pretty elaborate setup of threads & channels! It is
54} 52 // explained by the following concerns:
55 53 // * we need to burn a thread translating from notify's mpsc to
56impl Worker { 54 // crossbeam_channel.
57 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 55 // * we want to read all files from a single thread, to guarantee that
58 // This is a pretty elaborate setup of threads & channels! It is 56 // we always get fresher versions and never go back in time.
59 // explained by the following concerns: 57 // * we want to tear down everything neatly during shutdown.
60 // * we need to burn a thread translating from notify's mpsc to 58 Worker::spawn(
61 // crossbeam_channel. 59 "vfs",
62 // * we want to read all files from a single thread, to guarantee that 60 128,
63 // we always get fresher versions and never go back in time. 61 // This are the channels we use to communicate with outside world.
64 // * we want to tear down everything neatly during shutdown. 62 // If `input_receiver` is closed we need to tear ourselves down.
65 let (worker, worker_handle) = thread_worker::spawn( 63 // `output_sender` should not be closed unless the parent died.
66 "vfs", 64 move |input_receiver, output_sender| {
67 128, 65 // Make sure that the destruction order is
68 // This are the channels we use to communicate with outside world. 66 //
69 // If `input_receiver` is closed we need to tear ourselves down. 67 // * notify_sender
70 // `output_sender` should not be closed unless the parent died. 68 // * _thread
71 move |input_receiver, output_sender| { 69 // * watcher_sender
70 //
71 // this is required to avoid deadlocks.
72
73 // These are the corresponding crossbeam channels
74 let (watcher_sender, watcher_receiver) = unbounded();
75 let _thread;
76 {
72 // These are `std` channels notify will send events to 77 // These are `std` channels notify will send events to
73 let (notify_sender, notify_receiver) = mpsc::channel(); 78 let (notify_sender, notify_receiver) = mpsc::channel();
74 // These are the corresponding crossbeam channels
75 let (watcher_sender, watcher_receiver) = unbounded();
76 79
77 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
78 .map_err(|e| log::error!("failed to spawn notify {}", e)) 81 .map_err(|e| log::error!("failed to spawn notify {}", e))
79 .ok(); 82 .ok();
80 // Start a silly thread to transform between two channels 83 // Start a silly thread to transform between two channels
81 let thread = thread::spawn(move || { 84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
82 notify_receiver 85 notify_receiver
83 .into_iter() 86 .into_iter()
84 .for_each(|event| convert_notify_event(event, &watcher_sender)) 87 .for_each(|event| convert_notify_event(event, &watcher_sender))
@@ -110,35 +113,11 @@ impl Worker {
110 }, 113 },
111 } 114 }
112 } 115 }
113 // Stopped the watcher 116 }
114 drop(watcher.take()); 117 // Drain pending events: we are not interested in them anyways!
115 // Drain pending events: we are not interested in them anyways! 118 watcher_receiver.into_iter().for_each(|_| ());
116 watcher_receiver.into_iter().for_each(|_| ()); 119 },
117 120 )
118 let res = thread.join();
119 match &res {
120 Ok(()) => log::info!("... Watcher terminated with ok"),
121 Err(_) => log::error!("... Watcher terminated with err"),
122 }
123 res.unwrap();
124 },
125 );
126
127 Worker { worker, worker_handle }
128 }
129
130 pub(crate) fn sender(&self) -> &Sender<Task> {
131 &self.worker.inp
132 }
133
134 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
135 &self.worker.out
136 }
137
138 pub(crate) fn shutdown(self) -> thread::Result<()> {
139 let _ = self.worker.shutdown();
140 self.worker_handle.shutdown()
141 }
142} 121}
143 122
144fn watch_root( 123fn watch_root(
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index f07657db6..cfdc1275f 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -22,7 +22,6 @@ use std::{
22 fmt, fs, mem, 22 fmt, fs, mem,
23 path::{Path, PathBuf}, 23 path::{Path, PathBuf},
24 sync::Arc, 24 sync::Arc,
25 thread,
26}; 25};
27 26
28use crossbeam_channel::Receiver; 27use crossbeam_channel::Receiver;
@@ -160,7 +159,7 @@ impl fmt::Debug for Vfs {
160impl Vfs { 159impl Vfs {
161 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { 160 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
162 let roots = Arc::new(Roots::new(roots)); 161 let roots = Arc::new(Roots::new(roots));
163 let worker = io::Worker::start(Arc::clone(&roots)); 162 let worker = io::start(Arc::clone(&roots));
164 let mut root2files = ArenaMap::default(); 163 let mut root2files = ArenaMap::default();
165 164
166 for (root, config) in roots.iter() { 165 for (root, config) in roots.iter() {
@@ -337,11 +336,6 @@ impl Vfs {
337 mem::replace(&mut self.pending_changes, Vec::new()) 336 mem::replace(&mut self.pending_changes, Vec::new())
338 } 337 }
339 338
340 /// Shutdown the VFS and terminate the background watching thread.
341 pub fn shutdown(self) -> thread::Result<()> {
342 self.worker.shutdown()
343 }
344
345 fn add_file( 339 fn add_file(
346 &mut self, 340 &mut self,
347 root: VfsRoot, 341 root: VfsRoot,
diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs
index 0ed59bb19..b31e0f288 100644
--- a/crates/ra_vfs/tests/vfs.rs
+++ b/crates/ra_vfs/tests/vfs.rs
@@ -160,6 +160,5 @@ fn test_vfs_works() -> std::io::Result<()> {
160 Err(RecvTimeoutError::Timeout) 160 Err(RecvTimeoutError::Timeout)
161 ); 161 );
162 162
163 vfs.shutdown().unwrap();
164 Ok(()) 163 Ok(())
165} 164}
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml
index 363b4c3b8..a9857d59d 100644
--- a/crates/thread_worker/Cargo.toml
+++ b/crates/thread_worker/Cargo.toml
@@ -5,7 +5,6 @@ version = "0.1.0"
5authors = ["rust-analyzer developers"] 5authors = ["rust-analyzer developers"]
6 6
7[dependencies] 7[dependencies]
8drop_bomb = "0.1.0"
9crossbeam-channel = "0.3.5" 8crossbeam-channel = "0.3.5"
10log = "0.4.3" 9log = "0.4.3"
11 10
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
index a522a0843..d67e44e38 100644
--- a/crates/thread_worker/src/lib.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -2,74 +2,80 @@
2 2
3use std::thread; 3use std::thread;
4 4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; 5use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
6use drop_bomb::DropBomb;
7 6
8pub struct Worker<I, O> { 7/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically.
9 pub inp: Sender<I>, 8pub struct ScopedThread {
10 pub out: Receiver<O>, 9 // Option for drop
10 inner: Option<thread::JoinHandle<()>>,
11} 11}
12 12
13pub struct WorkerHandle { 13impl Drop for ScopedThread {
14 name: &'static str, 14 fn drop(&mut self) {
15 thread: thread::JoinHandle<()>, 15 let inner = self.inner.take().unwrap();
16 bomb: DropBomb, 16 let name = inner.thread().name().unwrap().to_string();
17} 17 log::info!("waiting for {} to finish...", name);
18 let res = inner.join();
19 log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" });
18 20
19pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) 21 // escalate panic, but avoid aborting the process
20where 22 match res {
21 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, 23 Err(e) => {
22 I: Send + 'static, 24 if !thread::panicking() {
23 O: Send + 'static, 25 panic!(e)
24{ 26 }
25 let (worker, inp_r, out_s) = worker_chan(buf); 27 }
26 let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); 28 _ => (),
27 (worker, watcher) 29 }
28}
29
30impl<I, O> Worker<I, O> {
31 /// Stops the worker. Returns the message receiver to fetch results which
32 /// have become ready before the worker is stopped.
33 pub fn shutdown(self) -> Receiver<O> {
34 self.out
35 } 30 }
31}
36 32
37 pub fn send(&self, item: I) -> Result<(), SendError<I>> { 33impl ScopedThread {
38 self.inp.send(item) 34 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread {
39 } 35 let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap();
40 pub fn recv(&self) -> Result<O, RecvError> { 36 ScopedThread { inner: Some(inner) }
41 self.out.recv()
42 } 37 }
43} 38}
44 39
45impl WorkerHandle { 40/// A wrapper around event-processing thread with automatic shutdown semantics.
46 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { 41pub struct Worker<I, O> {
47 let thread = thread::spawn(f); 42 // XXX: field order is significant here.
48 WorkerHandle { 43 //
49 name, 44 // In Rust, fields are dropped in the declaration order, and we rely on this
50 thread, 45 // here. We must close input first, so that the `thread` (who holds the
51 bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), 46 // opposite side of the channel) noticed shutdown. Then, we must join the
52 } 47 // thread, but we must keep out alive so that the thread does not panic.
53 } 48 //
49 // Note that a potential problem here is that we might drop some messages
50 // from receiver on the floor. This is ok for rust-analyzer: we have only a
51 // single client, so, if we are shutting down, nobody is interested in the
52 // unfinished work anyway!
53 sender: Sender<I>,
54 _thread: ScopedThread,
55 receiver: Receiver<O>,
56}
54 57
55 pub fn shutdown(mut self) -> thread::Result<()> { 58impl<I, O> Worker<I, O> {
56 log::info!("waiting for {} to finish ...", self.name); 59 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O>
57 let name = self.name; 60 where
58 self.bomb.defuse(); 61 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
59 let res = self.thread.join(); 62 I: Send + 'static,
60 match &res { 63 O: Send + 'static,
61 Ok(()) => log::info!("... {} terminated with ok", name), 64 {
62 Err(_) => log::error!("... {} terminated with err", name), 65 // Set up worker channels in a deadlock-avoiding way. If one sets both input
63 } 66 // and output buffers to a fixed size, a worker might get stuck.
64 res 67 let (sender, input_receiver) = bounded::<I>(buf);
68 let (output_sender, receiver) = unbounded::<O>();
69 let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender));
70 Worker { sender, _thread, receiver }
65 } 71 }
66} 72}
67 73
68/// Sets up worker channels in a deadlock-avoiding way. 74impl<I, O> Worker<I, O> {
69/// If one sets both input and output buffers to a fixed size, 75 pub fn sender(&self) -> &Sender<I> {
70/// a worker might get stuck. 76 &self.sender
71fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { 77 }
72 let (input_sender, input_receiver) = bounded::<I>(buf); 78 pub fn receiver(&self) -> &Receiver<O> {
73 let (output_sender, output_receiver) = unbounded::<O>(); 79 &self.receiver
74 (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) 80 }
75} 81}