Diffstat (limited to 'crates/ra_cargo_watch/src/lib.rs')
-rw-r--r--  crates/ra_cargo_watch/src/lib.rs  394
1 file changed, 394 insertions(+), 0 deletions(-)
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs
new file mode 100644
index 000000000..9bc0fd405
--- /dev/null
+++ b/crates/ra_cargo_watch/src/lib.rs
@@ -0,0 +1,394 @@
//! cargo_check provides the functionality needed to run `cargo check` or
//! another compatible command (e.g. clippy) in a background thread and provide
//! LSP diagnostics based on the output of the command.
use cargo_metadata::Message;
use crossbeam_channel::{never, select, unbounded, Receiver, RecvError, Sender};
use lsp_types::{
    Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd,
    WorkDoneProgressReport,
};
use parking_lot::RwLock;
use std::{
    collections::HashMap,
    path::PathBuf,
    process::{Command, Stdio},
    sync::Arc,
    thread::JoinHandle,
    time::Instant,
};

mod conv;

use crate::conv::{map_rust_diagnostic_to_lsp, MappedRustDiagnostic, SuggestedFix};

pub use crate::conv::url_from_path_with_drive_lowercasing;

#[derive(Clone, Debug)]
pub struct CheckOptions {
    pub enable: bool,
    pub args: Vec<String>,
    pub command: String,
    pub all_targets: bool,
}
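
// Illustrative only: one way a client might fill in CheckOptions. The values
// below are assumptions for the sketch, not defaults defined by this crate.
//
//     let options = CheckOptions {
//         enable: true,
//         args: vec!["--all-features".to_string()],
//         command: "clippy".to_string(), // or "check"
//         all_targets: true,
//     };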

/// CheckWatcher wraps the shared state and communication machinery used for
/// running `cargo check` (or another compatible command) and providing
/// diagnostics based on the output.
/// The spawned thread is shut down when this struct is dropped.
#[derive(Debug)]
pub struct CheckWatcher {
    pub task_recv: Receiver<CheckTask>,
    pub shared: Arc<RwLock<CheckWatcherSharedState>>,
    cmd_send: Option<Sender<CheckCommand>>,
    handle: Option<JoinHandle<()>>,
}

impl CheckWatcher {
    pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher {
        let options = options.clone();
        let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new()));

        let (task_send, task_recv) = unbounded::<CheckTask>();
        let (cmd_send, cmd_recv) = unbounded::<CheckCommand>();
        let shared_ = shared.clone();
        let handle = std::thread::spawn(move || {
            let mut check = CheckWatcherState::new(options, workspace_root, shared_);
            check.run(&task_send, &cmd_recv);
        });
        CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared }
    }

    /// Schedule a restart of the cargo check worker.
    pub fn update(&self) {
        if let Some(cmd_send) = &self.cmd_send {
            cmd_send.send(CheckCommand::Update).unwrap();
        }
    }
}
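
// A minimal usage sketch (hypothetical caller code, not part of this file):
// spawn the watcher, request re-checks on save, and drain `task_recv` from
// the server's main loop.
//
//     let watcher = CheckWatcher::new(&options, workspace_root);
//     watcher.update(); // e.g. whenever a file is saved
//     while let Ok(task) = watcher.task_recv.recv() {
//         match task {
//             CheckTask::Update(uri) => { /* republish diagnostics for `uri` */ }
//             CheckTask::Status(progress) => { /* forward as a progress notification */ }
//         }
//     }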

#[derive(Debug)]
pub struct CheckWatcherSharedState {
    diagnostic_collection: HashMap<Url, Vec<Diagnostic>>,
    suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>,
}

impl CheckWatcherSharedState {
    fn new() -> CheckWatcherSharedState {
        CheckWatcherSharedState {
            diagnostic_collection: HashMap::new(),
            suggested_fix_collection: HashMap::new(),
        }
    }

    /// Clear the cached diagnostics, and schedule the server to re-publish
    /// diagnostics for the affected files, so that stale results are removed.
    pub fn clear(&mut self, task_send: &Sender<CheckTask>) {
        let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect();

        self.diagnostic_collection.clear();
        self.suggested_fix_collection.clear();

        for uri in cleared_files {
            task_send.send(CheckTask::Update(uri.clone())).unwrap();
        }
    }

    pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> {
        self.diagnostic_collection.get(uri).map(|d| d.as_slice())
    }

    pub fn fixes_for(&self, uri: &Url) -> Option<&[SuggestedFix]> {
        self.suggested_fix_collection.get(uri).map(|d| d.as_slice())
    }

    fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) {
        let diagnostics = self.diagnostic_collection.entry(file_uri).or_default();

        // If we're building multiple targets, it's possible we've already seen this diagnostic
        let is_duplicate = diagnostics.iter().any(|d| are_diagnostics_equal(d, &diagnostic));
        if is_duplicate {
            return;
        }

        diagnostics.push(diagnostic);
    }

    fn add_suggested_fix_for_diagnostic(
        &mut self,
        mut suggested_fix: SuggestedFix,
        diagnostic: &Diagnostic,
    ) {
        let file_uri = suggested_fix.location.uri.clone();
        let file_suggestions = self.suggested_fix_collection.entry(file_uri).or_default();

        let existing_suggestion: Option<&mut SuggestedFix> =
            file_suggestions.iter_mut().find(|s| s == &&suggested_fix);
        if let Some(existing_suggestion) = existing_suggestion {
            // The existing suggestion also applies to this new diagnostic
            existing_suggestion.diagnostics.push(diagnostic.clone());
        } else {
            // We haven't seen this suggestion before
            suggested_fix.diagnostics.push(diagnostic.clone());
            file_suggestions.push(suggested_fix);
        }
    }
}
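
// Reader side, sketched under the assumption that the server holds the
// `Arc<RwLock<...>>` exposed as `CheckWatcher::shared` (hypothetical caller
// code):
//
//     let state = watcher.shared.read();
//     if let Some(diagnostics) = state.diagnostics_for(&uri) {
//         // publish `diagnostics` via textDocument/publishDiagnostics
//     }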

#[derive(Debug)]
pub enum CheckTask {
    /// Request an update of the given file's diagnostics
    Update(Url),

    /// Request that a check progress notification be sent to the client
    Status(WorkDoneProgress),
}

pub enum CheckCommand {
    /// Request a restart of the check thread
    Update,
}

struct CheckWatcherState {
    options: CheckOptions,
    workspace_root: PathBuf,
    watcher: WatchThread,
    last_update_req: Option<Instant>,
    shared: Arc<RwLock<CheckWatcherSharedState>>,
}

impl CheckWatcherState {
    fn new(
        options: CheckOptions,
        workspace_root: PathBuf,
        shared: Arc<RwLock<CheckWatcherSharedState>>,
    ) -> CheckWatcherState {
        let watcher = WatchThread::new(&options, &workspace_root);
        CheckWatcherState { options, workspace_root, watcher, last_update_req: None, shared }
    }

    fn run(&mut self, task_send: &Sender<CheckTask>, cmd_recv: &Receiver<CheckCommand>) {
        loop {
            select! {
                recv(&cmd_recv) -> cmd => match cmd {
                    Ok(cmd) => self.handle_command(cmd),
                    Err(RecvError) => {
                        // Command channel has closed, so shut down
                        break;
                    },
                },
                recv(self.watcher.message_recv) -> msg => match msg {
                    Ok(msg) => self.handle_message(msg, task_send),
                    Err(RecvError) => {
                        // Watcher finished, replace it with a never channel to
                        // avoid busy-waiting.
                        self.watcher.message_recv = never();
                    },
                }
            };

            if self.should_recheck() {
                self.last_update_req.take();
                self.shared.write().clear(task_send);

                // By replacing the watcher, we drop the previous one, which
                // causes it to shut down automatically.
                self.watcher = WatchThread::new(&self.options, &self.workspace_root);
            }
        }
    }

    fn should_recheck(&mut self) -> bool {
        if let Some(_last_update_req) = &self.last_update_req {
            // We currently only request an update on save, as we need
            // up-to-date source on disk for cargo check to do its magic, so we
            // don't really need to debounce the requests at this point.
            return true;
        }
        false
    }

    fn handle_command(&mut self, cmd: CheckCommand) {
        match cmd {
            CheckCommand::Update => self.last_update_req = Some(Instant::now()),
        }
    }

    fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
        match msg {
            CheckEvent::Begin => {
                task_send
                    .send(CheckTask::Status(WorkDoneProgress::Begin(WorkDoneProgressBegin {
                        title: "Running 'cargo check'".to_string(),
                        cancellable: Some(false),
                        message: None,
                        percentage: None,
                    })))
                    .unwrap();
            }

            CheckEvent::End => {
                task_send
                    .send(CheckTask::Status(WorkDoneProgress::End(WorkDoneProgressEnd {
                        message: None,
                    })))
                    .unwrap();
            }

            CheckEvent::Msg(Message::CompilerArtifact(msg)) => {
                task_send
                    .send(CheckTask::Status(WorkDoneProgress::Report(WorkDoneProgressReport {
                        cancellable: Some(false),
                        message: Some(msg.target.name),
                        percentage: None,
                    })))
                    .unwrap();
            }

            CheckEvent::Msg(Message::CompilerMessage(msg)) => {
                let map_result =
                    match map_rust_diagnostic_to_lsp(&msg.message, &self.workspace_root) {
                        Some(map_result) => map_result,
                        None => return,
                    };

                let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result;
                let file_uri = location.uri.clone();

                if !suggested_fixes.is_empty() {
                    for suggested_fix in suggested_fixes {
                        self.shared
                            .write()
                            .add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic);
                    }
                }
                self.shared.write().add_diagnostic(file_uri, diagnostic);

                task_send.send(CheckTask::Update(location.uri)).unwrap();
            }

            CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {}
            CheckEvent::Msg(Message::Unknown) => {}
        }
    }
}

/// WatchThread wraps the communication needed to run `cargo check` without
/// blocking. Currently the Rust standard library doesn't provide a way to
/// read sub-process output without blocking, so we have to handle the
/// sub-process output in a dedicated thread and pass messages back over a
/// channel.
/// The correct way to dispose of the thread is to drop it; the sub-process
/// will then be killed, and the thread will be joined.
struct WatchThread {
    handle: Option<JoinHandle<()>>,
    message_recv: Receiver<CheckEvent>,
}

enum CheckEvent {
    Begin,
    Msg(cargo_metadata::Message),
    End,
}

impl WatchThread {
    fn new(options: &CheckOptions, workspace_root: &PathBuf) -> WatchThread {
        let mut args: Vec<String> = vec![
            options.command.clone(),
            "--message-format=json".to_string(),
            "--manifest-path".to_string(),
            format!("{}/Cargo.toml", workspace_root.to_string_lossy()),
        ];
        if options.all_targets {
            args.push("--all-targets".to_string());
        }
        args.extend(options.args.iter().cloned());

        let (message_send, message_recv) = unbounded();
        let enabled = options.enable;
        let handle = std::thread::spawn(move || {
            if !enabled {
                return;
            }

            let mut command = Command::new("cargo")
                .args(&args)
                .stdout(Stdio::piped())
                .stderr(Stdio::null())
                .spawn()
                .expect("couldn't launch cargo");

            // If the send fails here, the sends in the loop below will fail
            // too, break out of the loop, and continue the shutdown
            let _ = message_send.send(CheckEvent::Begin);

            for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) {
                let message = match message {
                    Ok(message) => message,
                    Err(err) => {
                        log::error!("Invalid json from cargo check, ignoring: {}", err);
                        continue;
                    }
                };

                match message_send.send(CheckEvent::Msg(message)) {
                    Ok(()) => {}
                    Err(_err) => {
                        // The send channel was closed, so we want to shut down
                        break;
                    }
                }
            }

            // We can ignore any error here, as we are already in the process
            // of shutting down.
            let _ = message_send.send(CheckEvent::End);

            // It is okay to ignore the result, as it only errors if the process is already dead
            let _ = command.kill();

            // Again, we don't care about the exit status, so just ignore the result
            let _ = command.wait();
        });
        WatchThread { handle: Some(handle), message_recv }
    }
}
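
// For illustration: with `command: "check"` and `all_targets: true`, the
// spawned process is equivalent to running
//
//     cargo check --message-format=json --manifest-path <root>/Cargo.toml --all-targets
//
// with any user-supplied `options.args` appended (`<root>` stands for the
// workspace root passed to `WatchThread::new`).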

impl std::ops::Drop for WatchThread {
    fn drop(&mut self) {
        if let Some(handle) = self.handle.take() {
            // Replace our receiver with a dummy one, so we can drop and close
            // the one actually communicating with the thread
            let recv = std::mem::replace(&mut self.message_recv, never());

            // Dropping the original receiver initiates the thread's
            // sub-process shutdown
            drop(recv);

            // Join the thread, it should finish shortly. We don't really care
            // whether it panicked, so it is safe to ignore the result
            let _ = handle.join();
        }
    }
}
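
// The duplicate this comparison catches: with `--all-targets`, the same
// warning can be emitted once while checking the lib target and again for the
// test target; both map to LSP diagnostics with identical source, severity,
// range and message, so `add_diagnostic` keeps only the first.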
fn are_diagnostics_equal(left: &Diagnostic, right: &Diagnostic) -> bool {
    left.source == right.source
        && left.severity == right.severity
        && left.range == right.range
        && left.message == right.message
}