aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2020-01-15 15:37:28 +0000
committerGitHub <[email protected]>2020-01-15 15:37:28 +0000
commitc0661ce7444223b0fff1f5d54adb41022ab788cb (patch)
tree24aa00d87f5a90d10d0ec37178331ec1c3f9428a /crates
parent80136367682ced76fb1a74403435c742685620c8 (diff)
parent7a8c6351bf50c1dcfb111be9f91da3c1f9cf2ec3 (diff)
Merge #2853
2853: Manage `cargo check` state updates in `main_loop` to reduce lock contention r=matklad a=kiljacken State is now updated exclusively from `main_loop` so several threads theoretically can't compete for the lock. Updates to the state are requested via the existing task channel. Also updates some naming to make slightly more sense. Based upon an idea/suggestion from @matklad on Zulip: > I think I've noticed at leas something suspicious! > > In WorldSnapshot, we store an Arc<RwLock<CheckWatcherSharedState>>. We read lock this lock in handle_diagnostics. > > Additionally, we .write this lock from the watcher thread in CheckWatcherState::run. > > I think in general this is less then ideal, b/c diagnostics request can be blocked on another thread. I think it makes sense to architect this in a way which does not block. > > For that, we stop sharing the state between ServerWorld and CheckWatcherState. Instead, the watcher thread sends new diagnostics via a channel, and we accomodate thouse diagnostics intot he server state in the main loop. > > So, instead of: > ```rust > struct Server { > diagnostics: Arc<Mutex<Vec<Diagnostics>>>, > } > > struct Watcher { > diagnostics: Arc<Mutex<Vec<Diagnostics>>>, > } > ``` > we'll have something like this: > ```rust > struct Server { > // this bit now *owns* diagnostics > diagnostisc: Vec<Diagnostics> > } > > struct Watcher { > diagnostics_sink: Sender<Vec<Diagnostics>>, > } > ``` > I am not sure this is the cuprit of slowness on widnows, but I think we should fix it, because it's very useful when all changes to the server's state can occur only via the main loop. > > Note how VFS is set up in a similar way: instead of modifing some global hash map with files, VFS sends a message to the main looop that hey, I have these new files for you. The main loop than incorporates the changes itself. > > Note that I think we'll still need some locks here, to share the state between ServerWorld and WorldSnapshot, but we won't actually be changing anyting mid-snapshot Co-authored-by: Emil Lauridsen <[email protected]>
Diffstat (limited to 'crates')
-rw-r--r--crates/ra_cargo_watch/src/lib.rs80
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs79
-rw-r--r--crates/ra_lsp_server/src/world.rs6
3 files changed, 97 insertions, 68 deletions
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs
index 20fa5a924..7f4c9280c 100644
--- a/crates/ra_cargo_watch/src/lib.rs
+++ b/crates/ra_cargo_watch/src/lib.rs
@@ -38,7 +38,7 @@ pub struct CheckOptions {
38#[derive(Debug)] 38#[derive(Debug)]
39pub struct CheckWatcher { 39pub struct CheckWatcher {
40 pub task_recv: Receiver<CheckTask>, 40 pub task_recv: Receiver<CheckTask>,
41 pub shared: Arc<RwLock<CheckWatcherSharedState>>, 41 pub state: Arc<RwLock<CheckState>>,
42 cmd_send: Option<Sender<CheckCommand>>, 42 cmd_send: Option<Sender<CheckCommand>>,
43 handle: Option<JoinHandle<()>>, 43 handle: Option<JoinHandle<()>>,
44} 44}
@@ -46,22 +46,21 @@ pub struct CheckWatcher {
46impl CheckWatcher { 46impl CheckWatcher {
47 pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { 47 pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher {
48 let options = options.clone(); 48 let options = options.clone();
49 let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); 49 let state = Arc::new(RwLock::new(CheckState::new()));
50 50
51 let (task_send, task_recv) = unbounded::<CheckTask>(); 51 let (task_send, task_recv) = unbounded::<CheckTask>();
52 let (cmd_send, cmd_recv) = unbounded::<CheckCommand>(); 52 let (cmd_send, cmd_recv) = unbounded::<CheckCommand>();
53 let shared_ = shared.clone();
54 let handle = std::thread::spawn(move || { 53 let handle = std::thread::spawn(move || {
55 let mut check = CheckWatcherState::new(options, workspace_root, shared_); 54 let mut check = CheckWatcherThread::new(options, workspace_root);
56 check.run(&task_send, &cmd_recv); 55 check.run(&task_send, &cmd_recv);
57 }); 56 });
58 CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared } 57 CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state }
59 } 58 }
60 59
61 /// Returns a CheckWatcher that doesn't actually do anything 60 /// Returns a CheckWatcher that doesn't actually do anything
62 pub fn dummy() -> CheckWatcher { 61 pub fn dummy() -> CheckWatcher {
63 let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); 62 let state = Arc::new(RwLock::new(CheckState::new()));
64 CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared } 63 CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state }
65 } 64 }
66 65
67 /// Schedule a re-start of the cargo check worker. 66 /// Schedule a re-start of the cargo check worker.
@@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher {
89} 88}
90 89
91#[derive(Debug)] 90#[derive(Debug)]
92pub struct CheckWatcherSharedState { 91pub struct CheckState {
93 diagnostic_collection: HashMap<Url, Vec<Diagnostic>>, 92 diagnostic_collection: HashMap<Url, Vec<Diagnostic>>,
94 suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>, 93 suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>,
95} 94}
96 95
97impl CheckWatcherSharedState { 96impl CheckState {
98 fn new() -> CheckWatcherSharedState { 97 fn new() -> CheckState {
99 CheckWatcherSharedState { 98 CheckState {
100 diagnostic_collection: HashMap::new(), 99 diagnostic_collection: HashMap::new(),
101 suggested_fix_collection: HashMap::new(), 100 suggested_fix_collection: HashMap::new(),
102 } 101 }
@@ -104,15 +103,11 @@ impl CheckWatcherSharedState {
104 103
105 /// Clear the cached diagnostics, and schedule updating diagnostics by the 104 /// Clear the cached diagnostics, and schedule updating diagnostics by the
106 /// server, to clear stale results. 105 /// server, to clear stale results.
107 pub fn clear(&mut self, task_send: &Sender<CheckTask>) { 106 pub fn clear(&mut self) -> Vec<Url> {
108 let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect(); 107 let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect();
109
110 self.diagnostic_collection.clear(); 108 self.diagnostic_collection.clear();
111 self.suggested_fix_collection.clear(); 109 self.suggested_fix_collection.clear();
112 110 cleared_files
113 for uri in cleared_files {
114 task_send.send(CheckTask::Update(uri.clone())).unwrap();
115 }
116 } 111 }
117 112
118 pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { 113 pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> {
@@ -123,6 +118,13 @@ impl CheckWatcherSharedState {
123 self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) 118 self.suggested_fix_collection.get(uri).map(|d| d.as_slice())
124 } 119 }
125 120
121 pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) {
122 for fix in diagnostic.suggested_fixes {
123 self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic);
124 }
125 self.add_diagnostic(file_uri, diagnostic.diagnostic);
126 }
127
126 fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { 128 fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) {
127 let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); 129 let diagnostics = self.diagnostic_collection.entry(file_uri).or_default();
128 130
@@ -158,8 +160,11 @@ impl CheckWatcherSharedState {
158 160
159#[derive(Debug)] 161#[derive(Debug)]
160pub enum CheckTask { 162pub enum CheckTask {
161 /// Request a update of the given files diagnostics 163 /// Request a clearing of all cached diagnostics from the check watcher
162 Update(Url), 164 ClearDiagnostics,
165
166 /// Request adding a diagnostic with fixes included to a file
167 AddDiagnostic(Url, DiagnosticWithFixes),
163 168
164 /// Request check progress notification to client 169 /// Request check progress notification to client
165 Status(WorkDoneProgress), 170 Status(WorkDoneProgress),
@@ -170,26 +175,20 @@ pub enum CheckCommand {
170 Update, 175 Update,
171} 176}
172 177
173struct CheckWatcherState { 178struct CheckWatcherThread {
174 options: CheckOptions, 179 options: CheckOptions,
175 workspace_root: PathBuf, 180 workspace_root: PathBuf,
176 watcher: WatchThread, 181 watcher: WatchThread,
177 last_update_req: Option<Instant>, 182 last_update_req: Option<Instant>,
178 shared: Arc<RwLock<CheckWatcherSharedState>>,
179} 183}
180 184
181impl CheckWatcherState { 185impl CheckWatcherThread {
182 fn new( 186 fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherThread {
183 options: CheckOptions, 187 CheckWatcherThread {
184 workspace_root: PathBuf,
185 shared: Arc<RwLock<CheckWatcherSharedState>>,
186 ) -> CheckWatcherState {
187 CheckWatcherState {
188 options, 188 options,
189 workspace_root, 189 workspace_root,
190 watcher: WatchThread::dummy(), 190 watcher: WatchThread::dummy(),
191 last_update_req: None, 191 last_update_req: None,
192 shared,
193 } 192 }
194 } 193 }
195 194
@@ -215,7 +214,7 @@ impl CheckWatcherState {
215 214
216 if self.should_recheck() { 215 if self.should_recheck() {
217 self.last_update_req.take(); 216 self.last_update_req.take();
218 self.shared.write().clear(task_send); 217 task_send.send(CheckTask::ClearDiagnostics).unwrap();
219 218
220 // By replacing the watcher, we drop the previous one which 219 // By replacing the watcher, we drop the previous one which
221 // causes it to shut down automatically. 220 // causes it to shut down automatically.
@@ -240,7 +239,7 @@ impl CheckWatcherState {
240 } 239 }
241 } 240 }
242 241
243 fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) { 242 fn handle_message(&self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
244 match msg { 243 match msg {
245 CheckEvent::Begin => { 244 CheckEvent::Begin => {
246 task_send 245 task_send
@@ -279,18 +278,9 @@ impl CheckWatcherState {
279 }; 278 };
280 279
281 let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; 280 let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result;
282 let file_uri = location.uri.clone();
283 281
284 if !suggested_fixes.is_empty() { 282 let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes };
285 for suggested_fix in suggested_fixes { 283 task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap();
286 self.shared
287 .write()
288 .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic);
289 }
290 }
291 self.shared.write().add_diagnostic(file_uri, diagnostic);
292
293 task_send.send(CheckTask::Update(location.uri)).unwrap();
294 } 284 }
295 285
296 CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} 286 CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {}
@@ -299,6 +289,12 @@ impl CheckWatcherState {
299 } 289 }
300} 290}
301 291
292#[derive(Debug)]
293pub struct DiagnosticWithFixes {
294 diagnostic: Diagnostic,
295 suggested_fixes: Vec<SuggestedFix>,
296}
297
302/// WatchThread exists to wrap around the communication needed to be able to 298/// WatchThread exists to wrap around the communication needed to be able to
303/// run `cargo check` without blocking. Currently the Rust standard library 299/// run `cargo check` without blocking. Currently the Rust standard library
304/// doesn't provide a way to read sub-process output without blocking, so we 300/// doesn't provide a way to read sub-process output without blocking, so we
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index 84012b99d..7822be2e2 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -9,7 +9,7 @@ use std::{error::Error, fmt, panic, path::PathBuf, sync::Arc, time::Instant};
9 9
10use crossbeam_channel::{select, unbounded, RecvError, Sender}; 10use crossbeam_channel::{select, unbounded, RecvError, Sender};
11use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; 11use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
12use lsp_types::{ClientCapabilities, NumberOrString}; 12use lsp_types::{ClientCapabilities, NumberOrString, Url};
13use ra_cargo_watch::{CheckOptions, CheckTask}; 13use ra_cargo_watch::{CheckOptions, CheckTask};
14use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; 14use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
15use ra_prof::profile; 15use ra_prof::profile;
@@ -336,28 +336,7 @@ fn loop_turn(
336 world_state.maybe_collect_garbage(); 336 world_state.maybe_collect_garbage();
337 loop_state.in_flight_libraries -= 1; 337 loop_state.in_flight_libraries -= 1;
338 } 338 }
339 Event::CheckWatcher(task) => match task { 339 Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?,
340 CheckTask::Update(uri) => {
341 // We manually send a diagnostic update when the watcher asks
342 // us to, to avoid the issue of having to change the file to
343 // receive updated diagnostics.
344 let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
345 if let Some(file_id) = world_state.vfs.read().path2file(&path) {
346 let params =
347 handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
348 let not = notification_new::<req::PublishDiagnostics>(params);
349 task_sender.send(Task::Notify(not)).unwrap();
350 }
351 }
352 CheckTask::Status(progress) => {
353 let params = req::ProgressParams {
354 token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
355 value: req::ProgressParamsValue::WorkDone(progress),
356 };
357 let not = notification_new::<req::Progress>(params);
358 task_sender.send(Task::Notify(not)).unwrap();
359 }
360 },
361 Event::Msg(msg) => match msg { 340 Event::Msg(msg) => match msg {
362 Message::Request(req) => on_request( 341 Message::Request(req) => on_request(
363 world_state, 342 world_state,
@@ -605,6 +584,60 @@ fn on_notification(
605 Ok(()) 584 Ok(())
606} 585}
607 586
587fn on_check_task(
588 task: CheckTask,
589 world_state: &WorldState,
590 task_sender: &Sender<Task>,
591) -> Result<()> {
592 match task {
593 CheckTask::ClearDiagnostics => {
594 let cleared_files = world_state.check_watcher.state.write().clear();
595
596 // Send updated diagnostics for each cleared file
597 for url in cleared_files {
598 publish_diagnostics_for_url(&url, world_state, task_sender)?;
599 }
600 }
601
602 CheckTask::AddDiagnostic(url, diagnostic) => {
603 world_state
604 .check_watcher
605 .state
606 .write()
607 .add_diagnostic_with_fixes(url.clone(), diagnostic);
608
609 // We manually send a diagnostic update when the watcher asks
610 // us to, to avoid the issue of having to change the file to
611 // receive updated diagnostics.
612 publish_diagnostics_for_url(&url, world_state, task_sender)?;
613 }
614
615 CheckTask::Status(progress) => {
616 let params = req::ProgressParams {
617 token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),
618 value: req::ProgressParamsValue::WorkDone(progress),
619 };
620 let not = notification_new::<req::Progress>(params);
621 task_sender.send(Task::Notify(not)).unwrap();
622 }
623 }
624 Ok(())
625}
626
627fn publish_diagnostics_for_url(
628 url: &Url,
629 world_state: &WorldState,
630 task_sender: &Sender<Task>,
631) -> Result<()> {
632 let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
633 if let Some(file_id) = world_state.vfs.read().path2file(&path) {
634 let params = handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
635 let not = notification_new::<req::PublishDiagnostics>(params);
636 task_sender.send(Task::Notify(not)).unwrap();
637 }
638 Ok(())
639}
640
608struct PoolDispatcher<'a> { 641struct PoolDispatcher<'a> {
609 req: Option<Request>, 642 req: Option<Request>,
610 pool: &'a ThreadPool, 643 pool: &'a ThreadPool,
diff --git a/crates/ra_lsp_server/src/world.rs b/crates/ra_lsp_server/src/world.rs
index c0175c726..7a3030a51 100644
--- a/crates/ra_lsp_server/src/world.rs
+++ b/crates/ra_lsp_server/src/world.rs
@@ -13,7 +13,7 @@ use lsp_server::ErrorCode;
13use lsp_types::Url; 13use lsp_types::Url;
14use parking_lot::RwLock; 14use parking_lot::RwLock;
15use ra_cargo_watch::{ 15use ra_cargo_watch::{
16 url_from_path_with_drive_lowercasing, CheckOptions, CheckWatcher, CheckWatcherSharedState, 16 url_from_path_with_drive_lowercasing, CheckOptions, CheckState, CheckWatcher,
17}; 17};
18use ra_ide::{ 18use ra_ide::{
19 Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData, 19 Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData,
@@ -64,7 +64,7 @@ pub struct WorldSnapshot {
64 pub analysis: Analysis, 64 pub analysis: Analysis,
65 pub vfs: Arc<RwLock<Vfs>>, 65 pub vfs: Arc<RwLock<Vfs>>,
66 pub latest_requests: Arc<RwLock<LatestRequests>>, 66 pub latest_requests: Arc<RwLock<LatestRequests>>,
67 pub check_watcher: Arc<RwLock<CheckWatcherSharedState>>, 67 pub check_watcher: Arc<RwLock<CheckState>>,
68} 68}
69 69
70impl WorldState { 70impl WorldState {
@@ -220,7 +220,7 @@ impl WorldState {
220 analysis: self.analysis_host.analysis(), 220 analysis: self.analysis_host.analysis(),
221 vfs: Arc::clone(&self.vfs), 221 vfs: Arc::clone(&self.vfs),
222 latest_requests: Arc::clone(&self.latest_requests), 222 latest_requests: Arc::clone(&self.latest_requests),
223 check_watcher: self.check_watcher.shared.clone(), 223 check_watcher: self.check_watcher.state.clone(),
224 } 224 }
225 } 225 }
226 226