aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_cargo_watch/src/lib.rs
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2020-01-15 15:37:28 +0000
committerGitHub <[email protected]>2020-01-15 15:37:28 +0000
commitc0661ce7444223b0fff1f5d54adb41022ab788cb (patch)
tree24aa00d87f5a90d10d0ec37178331ec1c3f9428a /crates/ra_cargo_watch/src/lib.rs
parent80136367682ced76fb1a74403435c742685620c8 (diff)
parent7a8c6351bf50c1dcfb111be9f91da3c1f9cf2ec3 (diff)
Merge #2853
2853: Manage `cargo check` state updates in `main_loop` to reduce lock contention r=matklad a=kiljacken State is now updated exclusively from `main_loop` so several threads theoretically can't compete for the lock. Updates to the state are requested via the existing task channel. Also updates some naming to make slightly more sense. Based upon an idea/suggestion from @matklad on Zulip: > I think I've noticed at leas something suspicious! > > In WorldSnapshot, we store an Arc<RwLock<CheckWatcherSharedState>>. We read lock this lock in handle_diagnostics. > > Additionally, we .write this lock from the watcher thread in CheckWatcherState::run. > > I think in general this is less then ideal, b/c diagnostics request can be blocked on another thread. I think it makes sense to architect this in a way which does not block. > > For that, we stop sharing the state between ServerWorld and CheckWatcherState. Instead, the watcher thread sends new diagnostics via a channel, and we accomodate thouse diagnostics intot he server state in the main loop. > > So, instead of: > ```rust > struct Server { > diagnostics: Arc<Mutex<Vec<Diagnostics>>>, > } > > struct Watcher { > diagnostics: Arc<Mutex<Vec<Diagnostics>>>, > } > ``` > we'll have something like this: > ```rust > struct Server { > // this bit now *owns* diagnostics > diagnostisc: Vec<Diagnostics> > } > > struct Watcher { > diagnostics_sink: Sender<Vec<Diagnostics>>, > } > ``` > I am not sure this is the cuprit of slowness on widnows, but I think we should fix it, because it's very useful when all changes to the server's state can occur only via the main loop. > > Note how VFS is set up in a similar way: instead of modifing some global hash map with files, VFS sends a message to the main looop that hey, I have these new files for you. The main loop than incorporates the changes itself. > > Note that I think we'll still need some locks here, to share the state between ServerWorld and WorldSnapshot, but we won't actually be changing anyting mid-snapshot Co-authored-by: Emil Lauridsen <[email protected]>
Diffstat (limited to 'crates/ra_cargo_watch/src/lib.rs')
-rw-r--r--crates/ra_cargo_watch/src/lib.rs80
1 files changed, 38 insertions, 42 deletions
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs
index 20fa5a924..7f4c9280c 100644
--- a/crates/ra_cargo_watch/src/lib.rs
+++ b/crates/ra_cargo_watch/src/lib.rs
@@ -38,7 +38,7 @@ pub struct CheckOptions {
38#[derive(Debug)] 38#[derive(Debug)]
39pub struct CheckWatcher { 39pub struct CheckWatcher {
40 pub task_recv: Receiver<CheckTask>, 40 pub task_recv: Receiver<CheckTask>,
41 pub shared: Arc<RwLock<CheckWatcherSharedState>>, 41 pub state: Arc<RwLock<CheckState>>,
42 cmd_send: Option<Sender<CheckCommand>>, 42 cmd_send: Option<Sender<CheckCommand>>,
43 handle: Option<JoinHandle<()>>, 43 handle: Option<JoinHandle<()>>,
44} 44}
@@ -46,22 +46,21 @@ pub struct CheckWatcher {
46impl CheckWatcher { 46impl CheckWatcher {
47 pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { 47 pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher {
48 let options = options.clone(); 48 let options = options.clone();
49 let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); 49 let state = Arc::new(RwLock::new(CheckState::new()));
50 50
51 let (task_send, task_recv) = unbounded::<CheckTask>(); 51 let (task_send, task_recv) = unbounded::<CheckTask>();
52 let (cmd_send, cmd_recv) = unbounded::<CheckCommand>(); 52 let (cmd_send, cmd_recv) = unbounded::<CheckCommand>();
53 let shared_ = shared.clone();
54 let handle = std::thread::spawn(move || { 53 let handle = std::thread::spawn(move || {
55 let mut check = CheckWatcherState::new(options, workspace_root, shared_); 54 let mut check = CheckWatcherThread::new(options, workspace_root);
56 check.run(&task_send, &cmd_recv); 55 check.run(&task_send, &cmd_recv);
57 }); 56 });
58 CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared } 57 CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state }
59 } 58 }
60 59
61 /// Returns a CheckWatcher that doesn't actually do anything 60 /// Returns a CheckWatcher that doesn't actually do anything
62 pub fn dummy() -> CheckWatcher { 61 pub fn dummy() -> CheckWatcher {
63 let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); 62 let state = Arc::new(RwLock::new(CheckState::new()));
64 CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared } 63 CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state }
65 } 64 }
66 65
67 /// Schedule a re-start of the cargo check worker. 66 /// Schedule a re-start of the cargo check worker.
@@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher {
89} 88}
90 89
91#[derive(Debug)] 90#[derive(Debug)]
92pub struct CheckWatcherSharedState { 91pub struct CheckState {
93 diagnostic_collection: HashMap<Url, Vec<Diagnostic>>, 92 diagnostic_collection: HashMap<Url, Vec<Diagnostic>>,
94 suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>, 93 suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>,
95} 94}
96 95
97impl CheckWatcherSharedState { 96impl CheckState {
98 fn new() -> CheckWatcherSharedState { 97 fn new() -> CheckState {
99 CheckWatcherSharedState { 98 CheckState {
100 diagnostic_collection: HashMap::new(), 99 diagnostic_collection: HashMap::new(),
101 suggested_fix_collection: HashMap::new(), 100 suggested_fix_collection: HashMap::new(),
102 } 101 }
@@ -104,15 +103,11 @@ impl CheckWatcherSharedState {
104 103
105 /// Clear the cached diagnostics, and schedule updating diagnostics by the 104 /// Clear the cached diagnostics, and schedule updating diagnostics by the
106 /// server, to clear stale results. 105 /// server, to clear stale results.
107 pub fn clear(&mut self, task_send: &Sender<CheckTask>) { 106 pub fn clear(&mut self) -> Vec<Url> {
108 let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect(); 107 let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect();
109
110 self.diagnostic_collection.clear(); 108 self.diagnostic_collection.clear();
111 self.suggested_fix_collection.clear(); 109 self.suggested_fix_collection.clear();
112 110 cleared_files
113 for uri in cleared_files {
114 task_send.send(CheckTask::Update(uri.clone())).unwrap();
115 }
116 } 111 }
117 112
118 pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { 113 pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> {
@@ -123,6 +118,13 @@ impl CheckWatcherSharedState {
123 self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) 118 self.suggested_fix_collection.get(uri).map(|d| d.as_slice())
124 } 119 }
125 120
121 pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) {
122 for fix in diagnostic.suggested_fixes {
123 self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic);
124 }
125 self.add_diagnostic(file_uri, diagnostic.diagnostic);
126 }
127
126 fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { 128 fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) {
127 let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); 129 let diagnostics = self.diagnostic_collection.entry(file_uri).or_default();
128 130
@@ -158,8 +160,11 @@ impl CheckWatcherSharedState {
158 160
159#[derive(Debug)] 161#[derive(Debug)]
160pub enum CheckTask { 162pub enum CheckTask {
161 /// Request a update of the given files diagnostics 163 /// Request a clearing of all cached diagnostics from the check watcher
162 Update(Url), 164 ClearDiagnostics,
165
166 /// Request adding a diagnostic with fixes included to a file
167 AddDiagnostic(Url, DiagnosticWithFixes),
163 168
164 /// Request check progress notification to client 169 /// Request check progress notification to client
165 Status(WorkDoneProgress), 170 Status(WorkDoneProgress),
@@ -170,26 +175,20 @@ pub enum CheckCommand {
170 Update, 175 Update,
171} 176}
172 177
173struct CheckWatcherState { 178struct CheckWatcherThread {
174 options: CheckOptions, 179 options: CheckOptions,
175 workspace_root: PathBuf, 180 workspace_root: PathBuf,
176 watcher: WatchThread, 181 watcher: WatchThread,
177 last_update_req: Option<Instant>, 182 last_update_req: Option<Instant>,
178 shared: Arc<RwLock<CheckWatcherSharedState>>,
179} 183}
180 184
181impl CheckWatcherState { 185impl CheckWatcherThread {
182 fn new( 186 fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherThread {
183 options: CheckOptions, 187 CheckWatcherThread {
184 workspace_root: PathBuf,
185 shared: Arc<RwLock<CheckWatcherSharedState>>,
186 ) -> CheckWatcherState {
187 CheckWatcherState {
188 options, 188 options,
189 workspace_root, 189 workspace_root,
190 watcher: WatchThread::dummy(), 190 watcher: WatchThread::dummy(),
191 last_update_req: None, 191 last_update_req: None,
192 shared,
193 } 192 }
194 } 193 }
195 194
@@ -215,7 +214,7 @@ impl CheckWatcherState {
215 214
216 if self.should_recheck() { 215 if self.should_recheck() {
217 self.last_update_req.take(); 216 self.last_update_req.take();
218 self.shared.write().clear(task_send); 217 task_send.send(CheckTask::ClearDiagnostics).unwrap();
219 218
220 // By replacing the watcher, we drop the previous one which 219 // By replacing the watcher, we drop the previous one which
221 // causes it to shut down automatically. 220 // causes it to shut down automatically.
@@ -240,7 +239,7 @@ impl CheckWatcherState {
240 } 239 }
241 } 240 }
242 241
243 fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) { 242 fn handle_message(&self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
244 match msg { 243 match msg {
245 CheckEvent::Begin => { 244 CheckEvent::Begin => {
246 task_send 245 task_send
@@ -279,18 +278,9 @@ impl CheckWatcherState {
279 }; 278 };
280 279
281 let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; 280 let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result;
282 let file_uri = location.uri.clone();
283 281
284 if !suggested_fixes.is_empty() { 282 let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes };
285 for suggested_fix in suggested_fixes { 283 task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap();
286 self.shared
287 .write()
288 .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic);
289 }
290 }
291 self.shared.write().add_diagnostic(file_uri, diagnostic);
292
293 task_send.send(CheckTask::Update(location.uri)).unwrap();
294 } 284 }
295 285
296 CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} 286 CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {}
@@ -299,6 +289,12 @@ impl CheckWatcherState {
299 } 289 }
300} 290}
301 291
292#[derive(Debug)]
293pub struct DiagnosticWithFixes {
294 diagnostic: Diagnostic,
295 suggested_fixes: Vec<SuggestedFix>,
296}
297
302/// WatchThread exists to wrap around the communication needed to be able to 298/// WatchThread exists to wrap around the communication needed to be able to
303/// run `cargo check` without blocking. Currently the Rust standard library 299/// run `cargo check` without blocking. Currently the Rust standard library
304/// doesn't provide a way to read sub-process output without blocking, so we 300/// doesn't provide a way to read sub-process output without blocking, so we