diff options
Diffstat (limited to 'crates/ra_cargo_watch/src/lib.rs')
-rw-r--r-- | crates/ra_cargo_watch/src/lib.rs | 394 |
1 files changed, 394 insertions, 0 deletions
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs new file mode 100644 index 000000000..9bc0fd405 --- /dev/null +++ b/crates/ra_cargo_watch/src/lib.rs | |||
@@ -0,0 +1,394 @@ | |||
1 | //! cargo_check provides the functionality needed to run `cargo check` or | ||
2 | //! another compatible command (f.x. clippy) in a background thread and provide | ||
3 | //! LSP diagnostics based on the output of the command. | ||
4 | use cargo_metadata::Message; | ||
5 | use crossbeam_channel::{never, select, unbounded, Receiver, RecvError, Sender}; | ||
6 | use lsp_types::{ | ||
7 | Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd, | ||
8 | WorkDoneProgressReport, | ||
9 | }; | ||
10 | use parking_lot::RwLock; | ||
11 | use std::{ | ||
12 | collections::HashMap, | ||
13 | path::PathBuf, | ||
14 | process::{Command, Stdio}, | ||
15 | sync::Arc, | ||
16 | thread::JoinHandle, | ||
17 | time::Instant, | ||
18 | }; | ||
19 | |||
20 | mod conv; | ||
21 | |||
22 | use crate::conv::{map_rust_diagnostic_to_lsp, MappedRustDiagnostic, SuggestedFix}; | ||
23 | |||
24 | pub use crate::conv::url_from_path_with_drive_lowercasing; | ||
25 | |||
26 | #[derive(Clone, Debug)] | ||
27 | pub struct CheckOptions { | ||
28 | pub enable: bool, | ||
29 | pub args: Vec<String>, | ||
30 | pub command: String, | ||
31 | pub all_targets: bool, | ||
32 | } | ||
33 | |||
34 | /// CheckWatcher wraps the shared state and communication machinery used for | ||
35 | /// running `cargo check` (or other compatible command) and providing | ||
36 | /// diagnostics based on the output. | ||
37 | /// The spawned thread is shut down when this struct is dropped. | ||
38 | #[derive(Debug)] | ||
39 | pub struct CheckWatcher { | ||
40 | pub task_recv: Receiver<CheckTask>, | ||
41 | pub shared: Arc<RwLock<CheckWatcherSharedState>>, | ||
42 | cmd_send: Option<Sender<CheckCommand>>, | ||
43 | handle: Option<JoinHandle<()>>, | ||
44 | } | ||
45 | |||
46 | impl CheckWatcher { | ||
47 | pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher { | ||
48 | let options = options.clone(); | ||
49 | let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new())); | ||
50 | |||
51 | let (task_send, task_recv) = unbounded::<CheckTask>(); | ||
52 | let (cmd_send, cmd_recv) = unbounded::<CheckCommand>(); | ||
53 | let shared_ = shared.clone(); | ||
54 | let handle = std::thread::spawn(move || { | ||
55 | let mut check = CheckWatcherState::new(options, workspace_root, shared_); | ||
56 | check.run(&task_send, &cmd_recv); | ||
57 | }); | ||
58 | CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared } | ||
59 | } | ||
60 | |||
61 | /// Schedule a re-start of the cargo check worker. | ||
62 | pub fn update(&self) { | ||
63 | if let Some(cmd_send) = &self.cmd_send { | ||
64 | cmd_send.send(CheckCommand::Update).unwrap(); | ||
65 | } | ||
66 | } | ||
67 | } | ||
68 | |||
69 | impl std::ops::Drop for CheckWatcher { | ||
70 | fn drop(&mut self) { | ||
71 | if let Some(handle) = self.handle.take() { | ||
72 | // Take the sender out of the option | ||
73 | let recv = self.cmd_send.take(); | ||
74 | |||
75 | // Dropping the sender finishes the thread loop | ||
76 | drop(recv); | ||
77 | |||
78 | // Join the thread, it should finish shortly. We don't really care | ||
79 | // whether it panicked, so it is safe to ignore the result | ||
80 | let _ = handle.join(); | ||
81 | } | ||
82 | } | ||
83 | } | ||
84 | |||
85 | #[derive(Debug)] | ||
86 | pub struct CheckWatcherSharedState { | ||
87 | diagnostic_collection: HashMap<Url, Vec<Diagnostic>>, | ||
88 | suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>, | ||
89 | } | ||
90 | |||
91 | impl CheckWatcherSharedState { | ||
92 | fn new() -> CheckWatcherSharedState { | ||
93 | CheckWatcherSharedState { | ||
94 | diagnostic_collection: HashMap::new(), | ||
95 | suggested_fix_collection: HashMap::new(), | ||
96 | } | ||
97 | } | ||
98 | |||
99 | /// Clear the cached diagnostics, and schedule updating diagnostics by the | ||
100 | /// server, to clear stale results. | ||
101 | pub fn clear(&mut self, task_send: &Sender<CheckTask>) { | ||
102 | let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect(); | ||
103 | |||
104 | self.diagnostic_collection.clear(); | ||
105 | self.suggested_fix_collection.clear(); | ||
106 | |||
107 | for uri in cleared_files { | ||
108 | task_send.send(CheckTask::Update(uri.clone())).unwrap(); | ||
109 | } | ||
110 | } | ||
111 | |||
112 | pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> { | ||
113 | self.diagnostic_collection.get(uri).map(|d| d.as_slice()) | ||
114 | } | ||
115 | |||
116 | pub fn fixes_for(&self, uri: &Url) -> Option<&[SuggestedFix]> { | ||
117 | self.suggested_fix_collection.get(uri).map(|d| d.as_slice()) | ||
118 | } | ||
119 | |||
120 | fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) { | ||
121 | let diagnostics = self.diagnostic_collection.entry(file_uri).or_default(); | ||
122 | |||
123 | // If we're building multiple targets it's possible we've already seen this diagnostic | ||
124 | let is_duplicate = diagnostics.iter().any(|d| are_diagnostics_equal(d, &diagnostic)); | ||
125 | if is_duplicate { | ||
126 | return; | ||
127 | } | ||
128 | |||
129 | diagnostics.push(diagnostic); | ||
130 | } | ||
131 | |||
132 | fn add_suggested_fix_for_diagnostic( | ||
133 | &mut self, | ||
134 | mut suggested_fix: SuggestedFix, | ||
135 | diagnostic: &Diagnostic, | ||
136 | ) { | ||
137 | let file_uri = suggested_fix.location.uri.clone(); | ||
138 | let file_suggestions = self.suggested_fix_collection.entry(file_uri).or_default(); | ||
139 | |||
140 | let existing_suggestion: Option<&mut SuggestedFix> = | ||
141 | file_suggestions.iter_mut().find(|s| s == &&suggested_fix); | ||
142 | if let Some(existing_suggestion) = existing_suggestion { | ||
143 | // The existing suggestion also applies to this new diagnostic | ||
144 | existing_suggestion.diagnostics.push(diagnostic.clone()); | ||
145 | } else { | ||
146 | // We haven't seen this suggestion before | ||
147 | suggested_fix.diagnostics.push(diagnostic.clone()); | ||
148 | file_suggestions.push(suggested_fix); | ||
149 | } | ||
150 | } | ||
151 | } | ||
152 | |||
153 | #[derive(Debug)] | ||
154 | pub enum CheckTask { | ||
155 | /// Request a update of the given files diagnostics | ||
156 | Update(Url), | ||
157 | |||
158 | /// Request check progress notification to client | ||
159 | Status(WorkDoneProgress), | ||
160 | } | ||
161 | |||
162 | pub enum CheckCommand { | ||
163 | /// Request re-start of check thread | ||
164 | Update, | ||
165 | } | ||
166 | |||
167 | struct CheckWatcherState { | ||
168 | options: CheckOptions, | ||
169 | workspace_root: PathBuf, | ||
170 | watcher: WatchThread, | ||
171 | last_update_req: Option<Instant>, | ||
172 | shared: Arc<RwLock<CheckWatcherSharedState>>, | ||
173 | } | ||
174 | |||
175 | impl CheckWatcherState { | ||
176 | fn new( | ||
177 | options: CheckOptions, | ||
178 | workspace_root: PathBuf, | ||
179 | shared: Arc<RwLock<CheckWatcherSharedState>>, | ||
180 | ) -> CheckWatcherState { | ||
181 | let watcher = WatchThread::new(&options, &workspace_root); | ||
182 | CheckWatcherState { options, workspace_root, watcher, last_update_req: None, shared } | ||
183 | } | ||
184 | |||
185 | fn run(&mut self, task_send: &Sender<CheckTask>, cmd_recv: &Receiver<CheckCommand>) { | ||
186 | loop { | ||
187 | select! { | ||
188 | recv(&cmd_recv) -> cmd => match cmd { | ||
189 | Ok(cmd) => self.handle_command(cmd), | ||
190 | Err(RecvError) => { | ||
191 | // Command channel has closed, so shut down | ||
192 | break; | ||
193 | }, | ||
194 | }, | ||
195 | recv(self.watcher.message_recv) -> msg => match msg { | ||
196 | Ok(msg) => self.handle_message(msg, task_send), | ||
197 | Err(RecvError) => { | ||
198 | // Watcher finished, replace it with a never channel to | ||
199 | // avoid busy-waiting. | ||
200 | std::mem::replace(&mut self.watcher.message_recv, never()); | ||
201 | }, | ||
202 | } | ||
203 | }; | ||
204 | |||
205 | if self.should_recheck() { | ||
206 | self.last_update_req.take(); | ||
207 | self.shared.write().clear(task_send); | ||
208 | |||
209 | // By replacing the watcher, we drop the previous one which | ||
210 | // causes it to shut down automatically. | ||
211 | self.watcher = WatchThread::new(&self.options, &self.workspace_root); | ||
212 | } | ||
213 | } | ||
214 | } | ||
215 | |||
216 | fn should_recheck(&mut self) -> bool { | ||
217 | if let Some(_last_update_req) = &self.last_update_req { | ||
218 | // We currently only request an update on save, as we need up to | ||
219 | // date source on disk for cargo check to do it's magic, so we | ||
220 | // don't really need to debounce the requests at this point. | ||
221 | return true; | ||
222 | } | ||
223 | false | ||
224 | } | ||
225 | |||
226 | fn handle_command(&mut self, cmd: CheckCommand) { | ||
227 | match cmd { | ||
228 | CheckCommand::Update => self.last_update_req = Some(Instant::now()), | ||
229 | } | ||
230 | } | ||
231 | |||
232 | fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) { | ||
233 | match msg { | ||
234 | CheckEvent::Begin => { | ||
235 | task_send | ||
236 | .send(CheckTask::Status(WorkDoneProgress::Begin(WorkDoneProgressBegin { | ||
237 | title: "Running 'cargo check'".to_string(), | ||
238 | cancellable: Some(false), | ||
239 | message: None, | ||
240 | percentage: None, | ||
241 | }))) | ||
242 | .unwrap(); | ||
243 | } | ||
244 | |||
245 | CheckEvent::End => { | ||
246 | task_send | ||
247 | .send(CheckTask::Status(WorkDoneProgress::End(WorkDoneProgressEnd { | ||
248 | message: None, | ||
249 | }))) | ||
250 | .unwrap(); | ||
251 | } | ||
252 | |||
253 | CheckEvent::Msg(Message::CompilerArtifact(msg)) => { | ||
254 | task_send | ||
255 | .send(CheckTask::Status(WorkDoneProgress::Report(WorkDoneProgressReport { | ||
256 | cancellable: Some(false), | ||
257 | message: Some(msg.target.name), | ||
258 | percentage: None, | ||
259 | }))) | ||
260 | .unwrap(); | ||
261 | } | ||
262 | |||
263 | CheckEvent::Msg(Message::CompilerMessage(msg)) => { | ||
264 | let map_result = | ||
265 | match map_rust_diagnostic_to_lsp(&msg.message, &self.workspace_root) { | ||
266 | Some(map_result) => map_result, | ||
267 | None => return, | ||
268 | }; | ||
269 | |||
270 | let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result; | ||
271 | let file_uri = location.uri.clone(); | ||
272 | |||
273 | if !suggested_fixes.is_empty() { | ||
274 | for suggested_fix in suggested_fixes { | ||
275 | self.shared | ||
276 | .write() | ||
277 | .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic); | ||
278 | } | ||
279 | } | ||
280 | self.shared.write().add_diagnostic(file_uri, diagnostic); | ||
281 | |||
282 | task_send.send(CheckTask::Update(location.uri)).unwrap(); | ||
283 | } | ||
284 | |||
285 | CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {} | ||
286 | CheckEvent::Msg(Message::Unknown) => {} | ||
287 | } | ||
288 | } | ||
289 | } | ||
290 | |||
291 | /// WatchThread exists to wrap around the communication needed to be able to | ||
292 | /// run `cargo check` without blocking. Currently the Rust standard library | ||
293 | /// doesn't provide a way to read sub-process output without blocking, so we | ||
294 | /// have to wrap sub-processes output handling in a thread and pass messages | ||
295 | /// back over a channel. | ||
296 | /// The correct way to dispose of the thread is to drop it, on which the | ||
297 | /// sub-process will be killed, and the thread will be joined. | ||
298 | struct WatchThread { | ||
299 | handle: Option<JoinHandle<()>>, | ||
300 | message_recv: Receiver<CheckEvent>, | ||
301 | } | ||
302 | |||
303 | enum CheckEvent { | ||
304 | Begin, | ||
305 | Msg(cargo_metadata::Message), | ||
306 | End, | ||
307 | } | ||
308 | |||
309 | impl WatchThread { | ||
310 | fn new(options: &CheckOptions, workspace_root: &PathBuf) -> WatchThread { | ||
311 | let mut args: Vec<String> = vec![ | ||
312 | options.command.clone(), | ||
313 | "--message-format=json".to_string(), | ||
314 | "--manifest-path".to_string(), | ||
315 | format!("{}/Cargo.toml", workspace_root.to_string_lossy()), | ||
316 | ]; | ||
317 | if options.all_targets { | ||
318 | args.push("--all-targets".to_string()); | ||
319 | } | ||
320 | args.extend(options.args.iter().cloned()); | ||
321 | |||
322 | let (message_send, message_recv) = unbounded(); | ||
323 | let enabled = options.enable; | ||
324 | let handle = std::thread::spawn(move || { | ||
325 | if !enabled { | ||
326 | return; | ||
327 | } | ||
328 | |||
329 | let mut command = Command::new("cargo") | ||
330 | .args(&args) | ||
331 | .stdout(Stdio::piped()) | ||
332 | .stderr(Stdio::null()) | ||
333 | .spawn() | ||
334 | .expect("couldn't launch cargo"); | ||
335 | |||
336 | // If we trigger an error here, we will do so in the loop instead, | ||
337 | // which will break out of the loop, and continue the shutdown | ||
338 | let _ = message_send.send(CheckEvent::Begin); | ||
339 | |||
340 | for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) { | ||
341 | let message = match message { | ||
342 | Ok(message) => message, | ||
343 | Err(err) => { | ||
344 | log::error!("Invalid json from cargo check, ignoring: {}", err); | ||
345 | continue; | ||
346 | } | ||
347 | }; | ||
348 | |||
349 | match message_send.send(CheckEvent::Msg(message)) { | ||
350 | Ok(()) => {} | ||
351 | Err(_err) => { | ||
352 | // The send channel was closed, so we want to shutdown | ||
353 | break; | ||
354 | } | ||
355 | } | ||
356 | } | ||
357 | |||
358 | // We can ignore any error here, as we are already in the progress | ||
359 | // of shutting down. | ||
360 | let _ = message_send.send(CheckEvent::End); | ||
361 | |||
362 | // It is okay to ignore the result, as it only errors if the process is already dead | ||
363 | let _ = command.kill(); | ||
364 | |||
365 | // Again, we don't care about the exit status so just ignore the result | ||
366 | let _ = command.wait(); | ||
367 | }); | ||
368 | WatchThread { handle: Some(handle), message_recv } | ||
369 | } | ||
370 | } | ||
371 | |||
372 | impl std::ops::Drop for WatchThread { | ||
373 | fn drop(&mut self) { | ||
374 | if let Some(handle) = self.handle.take() { | ||
375 | // Replace our reciever with dummy one, so we can drop and close the | ||
376 | // one actually communicating with the thread | ||
377 | let recv = std::mem::replace(&mut self.message_recv, never()); | ||
378 | |||
379 | // Dropping the original reciever initiates thread sub-process shutdown | ||
380 | drop(recv); | ||
381 | |||
382 | // Join the thread, it should finish shortly. We don't really care | ||
383 | // whether it panicked, so it is safe to ignore the result | ||
384 | let _ = handle.join(); | ||
385 | } | ||
386 | } | ||
387 | } | ||
388 | |||
389 | fn are_diagnostics_equal(left: &Diagnostic, right: &Diagnostic) -> bool { | ||
390 | left.source == right.source | ||
391 | && left.severity == right.severity | ||
392 | && left.range == right.range | ||
393 | && left.message == right.message | ||
394 | } | ||