diff options
author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2020-01-15 15:37:28 +0000 |
---|---|---|
committer | GitHub <[email protected]> | 2020-01-15 15:37:28 +0000 |
commit | c0661ce7444223b0fff1f5d54adb41022ab788cb (patch) | |
tree | 24aa00d87f5a90d10d0ec37178331ec1c3f9428a /crates/ra_cargo_watch/src/lib.rs | |
parent | 80136367682ced76fb1a74403435c742685620c8 (diff) | |
parent | 7a8c6351bf50c1dcfb111be9f91da3c1f9cf2ec3 (diff) |
Merge #2853
2853: Manage `cargo check` state updates in `main_loop` to reduce lock contention r=matklad a=kiljacken
State is now updated exclusively from `main_loop` so several threads theoretically can't compete for the lock. Updates to the state are requested via the existing task channel.
Also updates some naming to make slightly more sense.
Based upon an idea/suggestion from @matklad on Zulip:
> I think I've noticed at leas something suspicious!
>
> In WorldSnapshot, we store an Arc<RwLock<CheckWatcherSharedState>>. We read lock this lock in handle_diagnostics.
>
> Additionally, we .write this lock from the watcher thread in CheckWatcherState::run.
>
> I think in general this is less then ideal, b/c diagnostics request can be blocked on another thread. I think it makes sense to architect this in a way which does not block.
>
> For that, we stop sharing the state between ServerWorld and CheckWatcherState. Instead, the watcher thread sends new diagnostics via a channel, and we accomodate thouse diagnostics intot he server state in the main loop.
>
> So, instead of:
> ```rust
> struct Server {
> diagnostics: Arc<Mutex<Vec<Diagnostics>>>,
> }
>
> struct Watcher {
> diagnostics: Arc<Mutex<Vec<Diagnostics>>>,
> }
> ```
> we'll have something like this:
> ```rust
> struct Server {
> // this bit now *owns* diagnostics
> diagnostisc: Vec<Diagnostics>
> }
>
> struct Watcher {
> diagnostics_sink: Sender<Vec<Diagnostics>>,
> }
> ```
> I am not sure this is the cuprit of slowness on widnows, but I think we should fix it, because it's very useful when all changes to the server's state can occur only via the main loop.
>
> Note how VFS is set up in a similar way: instead of modifing some global hash map with files, VFS sends a message to the main looop that hey, I have these new files for you. The main loop than incorporates the changes itself.
>
> Note that I think we'll still need some locks here, to share the state between ServerWorld and WorldSnapshot, but we won't actually be changing anyting mid-snapshot
Co-authored-by: Emil Lauridsen <[email protected]>
Diffstat (limited to 'crates/ra_cargo_watch/src/lib.rs')
-rw-r--r-- | crates/ra_cargo_watch/src/lib.rs | 80 |
1 files changed, 38 insertions, 42 deletions
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs index 20fa5a924..7f4c9280c 100644 --- a/crates/ra_cargo_watch/src/lib.rs +++ b/crates/ra_cargo_watch/src/lib.rs | |||
@@ -38,7 +38,7 @@ pub struct CheckOptions { | |||
38 | #[derive(Debug)] | 38 | #[derive(Debug)] |
39 | pub struct CheckWatcher { | 39 | pub struct CheckWatcher { |
40 | pub task_recv: Receiver<CheckTask>, | 40 | pub task_recv: Receiver<CheckTask>, |
41 | pub shared: Arc<RwLock<CheckWatcherSharedState>>, | 41 | pub state: Arc<RwLock<CheckState>>, |
42 | cmd_send: Option<Sender<CheckCommand>>, | 42 | cmd_send: Option<Sender<CheckCommand>>, |
43 | handle: Option<JoinHandle<()>>, | 43 | handle: Option<JoinHandle<()>>, |
44 | } | 44 | } |
@@ -46,22 +46,21 @@ pub struct CheckWatcher { | |||
46 | impl CheckWatcher { | 46 | impl CheckWatcher { |
47 | pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { | 47 | pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { |
48 | let options = options.clone(); | 48 | let options = options.clone(); |
49 | let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); | 49 | let state = Arc::new(RwLock::new(CheckState::new())); |
50 | 50 | ||
51 | let (task_send, task_recv) = unbounded::<CheckTask>(); | 51 | let (task_send, task_recv) = unbounded::<CheckTask>(); |
52 | let (cmd_send, cmd_recv) = unbounded::<CheckCommand>(); | 52 | let (cmd_send, cmd_recv) = unbounded::<CheckCommand>(); |
53 | let shared_ = shared.clone(); | ||
54 | let handle = std::thread::spawn(move || { | 53 | let handle = std::thread::spawn(move || { |
55 | let mut check = CheckWatcherState::new(options, workspace_root, shared_); | 54 | let mut check = CheckWatcherThread::new(options, workspace_root); |
56 | check.run(&task_send, &cmd_recv); | 55 | check.run(&task_send, &cmd_recv); |
57 | }); | 56 | }); |
58 | CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared } | 57 | CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state } |
59 | } | 58 | } |
60 | 59 | ||
61 | /// Returns a CheckWatcher that doesn't actually do anything | 60 | /// Returns a CheckWatcher that doesn't actually do anything |
62 | pub fn dummy() -> CheckWatcher { | 61 | pub fn dummy() -> CheckWatcher { |
63 | let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); | 62 | let state = Arc::new(RwLock::new(CheckState::new())); |
64 | CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared } | 63 | CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state } |
65 | } | 64 | } |
66 | 65 | ||
67 | /// Schedule a re-start of the cargo check worker. | 66 | /// Schedule a re-start of the cargo check worker. |
@@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher { | |||
89 | } | 88 | } |
90 | 89 | ||
91 | #[derive(Debug)] | 90 | #[derive(Debug)] |
92 | pub struct CheckWatcherSharedState { | 91 | pub struct CheckState { |
93 | diagnostic_collection: HashMap<Url, Vec<Diagnostic>>, | 92 | diagnostic_collection: HashMap<Url, Vec<Diagnostic>>, |
94 | suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>, | 93 | suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>, |
95 | } | 94 | } |
96 | 95 | ||
97 | impl CheckWatcherSharedState { | 96 | impl CheckState { |
98 | fn new() -> CheckWatcherSharedState { | 97 | fn new() -> CheckState { |
99 | CheckWatcherSharedState { | 98 | CheckState { |
100 | diagnostic_collection: HashMap::new(), | 99 | diagnostic_collection: HashMap::new(), |
101 | suggested_fix_collection: HashMap::new(), | 100 | suggested_fix_collection: HashMap::new(), |
102 | } | 101 | } |
@@ -104,15 +103,11 @@ impl CheckWatcherSharedState { | |||
104 | 103 | ||
105 | /// Clear the cached diagnostics, and schedule updating diagnostics by the | 104 | /// Clear the cached diagnostics, and schedule updating diagnostics by the |
106 | /// server, to clear stale results. | 105 | /// server, to clear stale results. |
107 | pub fn clear(&mut self, task_send: &Sender<CheckTask>) { | 106 | pub fn clear(&mut self) -> Vec<Url> { |
108 | let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect(); | 107 | let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect(); |
109 | |||
110 | self.diagnostic_collection.clear(); | 108 | self.diagnostic_collection.clear(); |
111 | self.suggested_fix_collection.clear(); | 109 | self.suggested_fix_collection.clear(); |
112 | 110 | cleared_files | |
113 | for uri in cleared_files { | ||
114 | task_send.send(CheckTask::Update(uri.clone())).unwrap(); | ||
115 | } | ||
116 | } | 111 | } |
117 | 112 | ||
118 | pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { | 113 | pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { |
@@ -123,6 +118,13 @@ impl CheckWatcherSharedState { | |||
123 | self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) | 118 | self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) |
124 | } | 119 | } |
125 | 120 | ||
121 | pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) { | ||
122 | for fix in diagnostic.suggested_fixes { | ||
123 | self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic); | ||
124 | } | ||
125 | self.add_diagnostic(file_uri, diagnostic.diagnostic); | ||
126 | } | ||
127 | |||
126 | fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { | 128 | fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { |
127 | let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); | 129 | let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); |
128 | 130 | ||
@@ -158,8 +160,11 @@ impl CheckWatcherSharedState { | |||
158 | 160 | ||
159 | #[derive(Debug)] | 161 | #[derive(Debug)] |
160 | pub enum CheckTask { | 162 | pub enum CheckTask { |
161 | /// Request a update of the given files diagnostics | 163 | /// Request a clearing of all cached diagnostics from the check watcher |
162 | Update(Url), | 164 | ClearDiagnostics, |
165 | |||
166 | /// Request adding a diagnostic with fixes included to a file | ||
167 | AddDiagnostic(Url, DiagnosticWithFixes), | ||
163 | 168 | ||
164 | /// Request check progress notification to client | 169 | /// Request check progress notification to client |
165 | Status(WorkDoneProgress), | 170 | Status(WorkDoneProgress), |
@@ -170,26 +175,20 @@ pub enum CheckCommand { | |||
170 | Update, | 175 | Update, |
171 | } | 176 | } |
172 | 177 | ||
173 | struct CheckWatcherState { | 178 | struct CheckWatcherThread { |
174 | options: CheckOptions, | 179 | options: CheckOptions, |
175 | workspace_root: PathBuf, | 180 | workspace_root: PathBuf, |
176 | watcher: WatchThread, | 181 | watcher: WatchThread, |
177 | last_update_req: Option<Instant>, | 182 | last_update_req: Option<Instant>, |
178 | shared: Arc<RwLock<CheckWatcherSharedState>>, | ||
179 | } | 183 | } |
180 | 184 | ||
181 | impl CheckWatcherState { | 185 | impl CheckWatcherThread { |
182 | fn new( | 186 | fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherThread { |
183 | options: CheckOptions, | 187 | CheckWatcherThread { |
184 | workspace_root: PathBuf, | ||
185 | shared: Arc<RwLock<CheckWatcherSharedState>>, | ||
186 | ) -> CheckWatcherState { | ||
187 | CheckWatcherState { | ||
188 | options, | 188 | options, |
189 | workspace_root, | 189 | workspace_root, |
190 | watcher: WatchThread::dummy(), | 190 | watcher: WatchThread::dummy(), |
191 | last_update_req: None, | 191 | last_update_req: None, |
192 | shared, | ||
193 | } | 192 | } |
194 | } | 193 | } |
195 | 194 | ||
@@ -215,7 +214,7 @@ impl CheckWatcherState { | |||
215 | 214 | ||
216 | if self.should_recheck() { | 215 | if self.should_recheck() { |
217 | self.last_update_req.take(); | 216 | self.last_update_req.take(); |
218 | self.shared.write().clear(task_send); | 217 | task_send.send(CheckTask::ClearDiagnostics).unwrap(); |
219 | 218 | ||
220 | // By replacing the watcher, we drop the previous one which | 219 | // By replacing the watcher, we drop the previous one which |
221 | // causes it to shut down automatically. | 220 | // causes it to shut down automatically. |
@@ -240,7 +239,7 @@ impl CheckWatcherState { | |||
240 | } | 239 | } |
241 | } | 240 | } |
242 | 241 | ||
243 | fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) { | 242 | fn handle_message(&self, msg: CheckEvent, task_send: &Sender<CheckTask>) { |
244 | match msg { | 243 | match msg { |
245 | CheckEvent::Begin => { | 244 | CheckEvent::Begin => { |
246 | task_send | 245 | task_send |
@@ -279,18 +278,9 @@ impl CheckWatcherState { | |||
279 | }; | 278 | }; |
280 | 279 | ||
281 | let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; | 280 | let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; |
282 | let file_uri = location.uri.clone(); | ||
283 | 281 | ||
284 | if !suggested_fixes.is_empty() { | 282 | let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes }; |
285 | for suggested_fix in suggested_fixes { | 283 | task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap(); |
286 | self.shared | ||
287 | .write() | ||
288 | .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic); | ||
289 | } | ||
290 | } | ||
291 | self.shared.write().add_diagnostic(file_uri, diagnostic); | ||
292 | |||
293 | task_send.send(CheckTask::Update(location.uri)).unwrap(); | ||
294 | } | 284 | } |
295 | 285 | ||
296 | CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} | 286 | CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} |
@@ -299,6 +289,12 @@ impl CheckWatcherState { | |||
299 | } | 289 | } |
300 | } | 290 | } |
301 | 291 | ||
292 | #[derive(Debug)] | ||
293 | pub struct DiagnosticWithFixes { | ||
294 | diagnostic: Diagnostic, | ||
295 | suggested_fixes: Vec<SuggestedFix>, | ||
296 | } | ||
297 | |||
302 | /// WatchThread exists to wrap around the communication needed to be able to | 298 | /// WatchThread exists to wrap around the communication needed to be able to |
303 | /// run `cargo check` without blocking. Currently the Rust standard library | 299 | /// run `cargo check` without blocking. Currently the Rust standard library |
304 | /// doesn't provide a way to read sub-process output without blocking, so we | 300 | /// doesn't provide a way to read sub-process output without blocking, so we |