From 478ba65f8da6ffd4a3fea09c6a4a1f0fb92e1c85 Mon Sep 17 00:00:00 2001 From: Emil Lauridsen Date: Wed, 15 Jan 2020 15:50:49 +0100 Subject: Manage check state updates in main_loop to reduce lock contention --- crates/ra_cargo_watch/src/lib.rs | 74 +++++++++++++++++------------------ crates/ra_lsp_server/src/main_loop.rs | 28 ++++++++++++- crates/ra_lsp_server/src/world.rs | 6 +-- 3 files changed, 64 insertions(+), 44 deletions(-) (limited to 'crates') diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs index 20fa5a924..76fdc0031 100644 --- a/crates/ra_cargo_watch/src/lib.rs +++ b/crates/ra_cargo_watch/src/lib.rs @@ -38,7 +38,7 @@ pub struct CheckOptions { #[derive(Debug)] pub struct CheckWatcher { pub task_recv: Receiver, - pub shared: Arc>, + pub state: Arc>, cmd_send: Option>, handle: Option>, } @@ -46,22 +46,21 @@ pub struct CheckWatcher { impl CheckWatcher { pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { let options = options.clone(); - let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); + let state = Arc::new(RwLock::new(CheckState::new())); let (task_send, task_recv) = unbounded::(); let (cmd_send, cmd_recv) = unbounded::(); - let shared_ = shared.clone(); let handle = std::thread::spawn(move || { - let mut check = CheckWatcherState::new(options, workspace_root, shared_); + let mut check = CheckWatcherState::new(options, workspace_root); check.run(&task_send, &cmd_recv); }); - CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared } + CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state } } /// Returns a CheckWatcher that doesn't actually do anything pub fn dummy() -> CheckWatcher { - let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); - CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared } + let state = Arc::new(RwLock::new(CheckState::new())); + CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state } } /// Schedule a re-start of the cargo check worker. @@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher { } #[derive(Debug)] -pub struct CheckWatcherSharedState { +pub struct CheckState { diagnostic_collection: HashMap>, suggested_fix_collection: HashMap>, } -impl CheckWatcherSharedState { - fn new() -> CheckWatcherSharedState { - CheckWatcherSharedState { +impl CheckState { + fn new() -> CheckState { + CheckState { diagnostic_collection: HashMap::new(), suggested_fix_collection: HashMap::new(), } @@ -104,15 +103,11 @@ impl CheckWatcherSharedState { /// Clear the cached diagnostics, and schedule updating diagnostics by the /// server, to clear stale results. - pub fn clear(&mut self, task_send: &Sender) { + pub fn clear(&mut self) -> Vec { let cleared_files: Vec = self.diagnostic_collection.keys().cloned().collect(); - self.diagnostic_collection.clear(); self.suggested_fix_collection.clear(); - - for uri in cleared_files { - task_send.send(CheckTask::Update(uri.clone())).unwrap(); - } + cleared_files } pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { @@ -123,6 +118,13 @@ impl CheckWatcherSharedState { self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) } + pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) { + for fix in diagnostic.suggested_fixes { + self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic); + } + self.add_diagnostic(file_uri, diagnostic.diagnostic); + } + fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); @@ -158,8 +160,11 @@ impl CheckWatcherSharedState { #[derive(Debug)] pub enum CheckTask { - /// Request a update of the given files diagnostics - Update(Url), + /// Request a clearing of all cached diagnostics from the check watcher + ClearDiagnostics, + + /// Request adding a diagnostic with fixes included to a file + AddDiagnostic(Url, DiagnosticWithFixes), /// Request check progress notification to client Status(WorkDoneProgress), @@ -175,21 +180,15 @@ struct CheckWatcherState { workspace_root: PathBuf, watcher: WatchThread, last_update_req: Option, - shared: Arc>, } impl CheckWatcherState { - fn new( - options: CheckOptions, - workspace_root: PathBuf, - shared: Arc>, - ) -> CheckWatcherState { + fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherState { CheckWatcherState { options, workspace_root, watcher: WatchThread::dummy(), last_update_req: None, - shared, } } @@ -215,7 +214,7 @@ impl CheckWatcherState { if self.should_recheck() { self.last_update_req.take(); - self.shared.write().clear(task_send); + task_send.send(CheckTask::ClearDiagnostics).unwrap(); // By replacing the watcher, we drop the previous one which // causes it to shut down automatically. @@ -240,7 +239,7 @@ impl CheckWatcherState { } } - fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender) { + fn handle_message(&self, msg: CheckEvent, task_send: &Sender) { match msg { CheckEvent::Begin => { task_send @@ -279,18 +278,9 @@ impl CheckWatcherState { }; let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; - let file_uri = location.uri.clone(); - if !suggested_fixes.is_empty() { - for suggested_fix in suggested_fixes { - self.shared - .write() - .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic); - } - } - self.shared.write().add_diagnostic(file_uri, diagnostic); - - task_send.send(CheckTask::Update(location.uri)).unwrap(); + let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes }; + task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap(); } CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} @@ -299,6 +289,12 @@ impl CheckWatcherState { } } +#[derive(Debug)] +pub struct DiagnosticWithFixes { + diagnostic: Diagnostic, + suggested_fixes: Vec, +} + /// WatchThread exists to wrap around the communication needed to be able to /// run `cargo check` without blocking. Currently the Rust standard library /// doesn't provide a way to read sub-process output without blocking, so we diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 84012b99d..e087c4f7e 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -337,11 +337,34 @@ fn loop_turn( loop_state.in_flight_libraries -= 1; } Event::CheckWatcher(task) => match task { - CheckTask::Update(uri) => { + CheckTask::ClearDiagnostics => { + let cleared_files = world_state.check_watcher.state.write().clear(); + + // Send updated diagnostics for each cleared file + for url in cleared_files { + let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; + if let Some(file_id) = world_state.vfs.read().path2file(&path) { + let params = handlers::publish_diagnostics( + &world_state.snapshot(), + FileId(file_id.0), + )?; + let not = notification_new::(params); + task_sender.send(Task::Notify(not)).unwrap(); + } + } + } + + CheckTask::AddDiagnostic(url, diagnostic) => { + world_state + .check_watcher + .state + .write() + .add_diagnostic_with_fixes(url.clone(), diagnostic); + // We manually send a diagnostic update when the watcher asks // us to, to avoid the issue of having to change the file to // receive updated diagnostics. - let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?; + let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; if let Some(file_id) = world_state.vfs.read().path2file(&path) { let params = handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?; @@ -349,6 +372,7 @@ fn loop_turn( task_sender.send(Task::Notify(not)).unwrap(); } } + CheckTask::Status(progress) => { let params = req::ProgressParams { token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()), diff --git a/crates/ra_lsp_server/src/world.rs b/crates/ra_lsp_server/src/world.rs index c0175c726..7a3030a51 100644 --- a/crates/ra_lsp_server/src/world.rs +++ b/crates/ra_lsp_server/src/world.rs @@ -13,7 +13,7 @@ use lsp_server::ErrorCode; use lsp_types::Url; use parking_lot::RwLock; use ra_cargo_watch::{ - url_from_path_with_drive_lowercasing, CheckOptions, CheckWatcher, CheckWatcherSharedState, + url_from_path_with_drive_lowercasing, CheckOptions, CheckState, CheckWatcher, }; use ra_ide::{ Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData, @@ -64,7 +64,7 @@ pub struct WorldSnapshot { pub analysis: Analysis, pub vfs: Arc>, pub latest_requests: Arc>, - pub check_watcher: Arc>, + pub check_watcher: Arc>, } impl WorldState { @@ -220,7 +220,7 @@ impl WorldState { analysis: self.analysis_host.analysis(), vfs: Arc::clone(&self.vfs), latest_requests: Arc::clone(&self.latest_requests), - check_watcher: self.check_watcher.shared.clone(), + check_watcher: self.check_watcher.state.clone(), } } -- cgit v1.2.3 From ade657cb668d843606f87601885927515e890954 Mon Sep 17 00:00:00 2001 From: Emil Lauridsen Date: Wed, 15 Jan 2020 15:53:08 +0100 Subject: Tweak naming slightly --- crates/ra_cargo_watch/src/lib.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'crates') diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs index 76fdc0031..7f4c9280c 100644 --- a/crates/ra_cargo_watch/src/lib.rs +++ b/crates/ra_cargo_watch/src/lib.rs @@ -51,7 +51,7 @@ impl CheckWatcher { let (task_send, task_recv) = unbounded::(); let (cmd_send, cmd_recv) = unbounded::(); let handle = std::thread::spawn(move || { - let mut check = CheckWatcherState::new(options, workspace_root); + let mut check = CheckWatcherThread::new(options, workspace_root); check.run(&task_send, &cmd_recv); }); CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state } @@ -175,16 +175,16 @@ pub enum CheckCommand { Update, } -struct CheckWatcherState { +struct CheckWatcherThread { options: CheckOptions, workspace_root: PathBuf, watcher: WatchThread, last_update_req: Option, } -impl CheckWatcherState { - fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherState { - CheckWatcherState { +impl CheckWatcherThread { + fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherThread { + CheckWatcherThread { options, workspace_root, watcher: WatchThread::dummy(), -- cgit v1.2.3 From 7a8c6351bf50c1dcfb111be9f91da3c1f9cf2ec3 Mon Sep 17 00:00:00 2001 From: Emil Lauridsen Date: Wed, 15 Jan 2020 16:33:58 +0100 Subject: Extract check task handling into function --- crates/ra_lsp_server/src/main_loop.rs | 103 ++++++++++++++++++---------------- 1 file changed, 56 insertions(+), 47 deletions(-) (limited to 'crates') diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index e087c4f7e..7822be2e2 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -9,7 +9,7 @@ use std::{error::Error, fmt, panic, path::PathBuf, sync::Arc, time::Instant}; use crossbeam_channel::{select, unbounded, RecvError, Sender}; use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; -use lsp_types::{ClientCapabilities, NumberOrString}; +use lsp_types::{ClientCapabilities, NumberOrString, Url}; use ra_cargo_watch::{CheckOptions, CheckTask}; use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; use ra_prof::profile; @@ -336,52 +336,7 @@ fn loop_turn( world_state.maybe_collect_garbage(); loop_state.in_flight_libraries -= 1; } - Event::CheckWatcher(task) => match task { - CheckTask::ClearDiagnostics => { - let cleared_files = world_state.check_watcher.state.write().clear(); - - // Send updated diagnostics for each cleared file - for url in cleared_files { - let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; - if let Some(file_id) = world_state.vfs.read().path2file(&path) { - let params = handlers::publish_diagnostics( - &world_state.snapshot(), - FileId(file_id.0), - )?; - let not = notification_new::(params); - task_sender.send(Task::Notify(not)).unwrap(); - } - } - } - - CheckTask::AddDiagnostic(url, diagnostic) => { - world_state - .check_watcher - .state - .write() - .add_diagnostic_with_fixes(url.clone(), diagnostic); - - // We manually send a diagnostic update when the watcher asks - // us to, to avoid the issue of having to change the file to - // receive updated diagnostics. - let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; - if let Some(file_id) = world_state.vfs.read().path2file(&path) { - let params = - handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?; - let not = notification_new::(params); - task_sender.send(Task::Notify(not)).unwrap(); - } - } - - CheckTask::Status(progress) => { - let params = req::ProgressParams { - token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()), - value: req::ProgressParamsValue::WorkDone(progress), - }; - let not = notification_new::(params); - task_sender.send(Task::Notify(not)).unwrap(); - } - }, + Event::CheckWatcher(task) => on_check_task(task, world_state, task_sender)?, Event::Msg(msg) => match msg { Message::Request(req) => on_request( world_state, @@ -629,6 +584,60 @@ fn on_notification( Ok(()) } +fn on_check_task( + task: CheckTask, + world_state: &WorldState, + task_sender: &Sender, +) -> Result<()> { + match task { + CheckTask::ClearDiagnostics => { + let cleared_files = world_state.check_watcher.state.write().clear(); + + // Send updated diagnostics for each cleared file + for url in cleared_files { + publish_diagnostics_for_url(&url, world_state, task_sender)?; + } + } + + CheckTask::AddDiagnostic(url, diagnostic) => { + world_state + .check_watcher + .state + .write() + .add_diagnostic_with_fixes(url.clone(), diagnostic); + + // We manually send a diagnostic update when the watcher asks + // us to, to avoid the issue of having to change the file to + // receive updated diagnostics. + publish_diagnostics_for_url(&url, world_state, task_sender)?; + } + + CheckTask::Status(progress) => { + let params = req::ProgressParams { + token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()), + value: req::ProgressParamsValue::WorkDone(progress), + }; + let not = notification_new::(params); + task_sender.send(Task::Notify(not)).unwrap(); + } + } + Ok(()) +} + +fn publish_diagnostics_for_url( + url: &Url, + world_state: &WorldState, + task_sender: &Sender, +) -> Result<()> { + let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; + if let Some(file_id) = world_state.vfs.read().path2file(&path) { + let params = handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?; + let not = notification_new::(params); + task_sender.send(Task::Notify(not)).unwrap(); + } + Ok(()) +} + struct PoolDispatcher<'a> { req: Option, pool: &'a ThreadPool, -- cgit v1.2.3