aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_cargo_watch
diff options
context:
space:
mode:
authorEmil Lauridsen <[email protected]>2019-12-27 10:31:25 +0000
committerEmil Lauridsen <[email protected]>2019-12-27 10:32:05 +0000
commita2d10694ccfcd12dad8796fc86966ea10ca3fc01 (patch)
tree9f2f2f22f0cce670cb92e49758837ccc0a5ac956 /crates/ra_cargo_watch
parent428a6ff5b8bad2c80a3522599195bf2a393f744e (diff)
Consistent, hopefully robust, shutdown/cancelation story for cargo check subprocess
Diffstat (limited to 'crates/ra_cargo_watch')
-rw-r--r--crates/ra_cargo_watch/src/lib.rs66
1 files changed, 50 insertions, 16 deletions
diff --git a/crates/ra_cargo_watch/src/lib.rs b/crates/ra_cargo_watch/src/lib.rs
index c86386610..70afd7f8a 100644
--- a/crates/ra_cargo_watch/src/lib.rs
+++ b/crates/ra_cargo_watch/src/lib.rs
@@ -2,7 +2,7 @@
2//! another compatible command (f.x. clippy) in a background thread and provide 2//! another compatible command (f.x. clippy) in a background thread and provide
3//! LSP diagnostics based on the output of the command. 3//! LSP diagnostics based on the output of the command.
4use cargo_metadata::Message; 4use cargo_metadata::Message;
5use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender, TryRecvError}; 5use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender};
6use lsp_types::{ 6use lsp_types::{
7 Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd, 7 Diagnostic, Url, WorkDoneProgress, WorkDoneProgressBegin, WorkDoneProgressEnd,
8 WorkDoneProgressReport, 8 WorkDoneProgressReport,
@@ -191,7 +191,8 @@ impl CheckWatcherState {
191 self.last_update_req.take(); 191 self.last_update_req.take();
192 self.shared.write().clear(task_send); 192 self.shared.write().clear(task_send);
193 193
194 self.watcher.cancel(); 194 // By replacing the watcher, we drop the previous one which
195 // causes it to shut down automatically.
195 self.watcher = WatchThread::new(&self.options, &self.workspace_root); 196 self.watcher = WatchThread::new(&self.options, &self.workspace_root);
196 } 197 }
197 } 198 }
@@ -277,9 +278,11 @@ impl CheckWatcherState {
277/// doesn't provide a way to read sub-process output without blocking, so we 278/// doesn't provide a way to read sub-process output without blocking, so we
278/// have to wrap sub-processes output handling in a thread and pass messages 279/// have to wrap sub-processes output handling in a thread and pass messages
279/// back over a channel. 280/// back over a channel.
281/// The correct way to dispose of the thread is to drop it, on which the
282/// sub-process will be killed, and the thread will be joined.
280struct WatchThread { 283struct WatchThread {
284 handle: Option<JoinHandle<()>>,
281 message_recv: Receiver<CheckEvent>, 285 message_recv: Receiver<CheckEvent>,
282 cancel_send: Sender<()>,
283} 286}
284 287
285enum CheckEvent { 288enum CheckEvent {
@@ -302,9 +305,8 @@ impl WatchThread {
302 args.extend(options.args.iter().cloned()); 305 args.extend(options.args.iter().cloned());
303 306
304 let (message_send, message_recv) = unbounded(); 307 let (message_send, message_recv) = unbounded();
305 let (cancel_send, cancel_recv) = unbounded();
306 let enabled = options.enable; 308 let enabled = options.enable;
307 std::thread::spawn(move || { 309 let handle = std::thread::spawn(move || {
308 if !enabled { 310 if !enabled {
309 return; 311 return;
310 } 312 }
@@ -316,24 +318,56 @@ impl WatchThread {
316 .spawn() 318 .spawn()
317 .expect("couldn't launch cargo"); 319 .expect("couldn't launch cargo");
318 320
319 message_send.send(CheckEvent::Begin).unwrap(); 321 // If we trigger an error here, we will do so in the loop instead,
322 // which will break out of the loop, and continue the shutdown
323 let _ = message_send.send(CheckEvent::Begin);
324
320 for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) { 325 for message in cargo_metadata::parse_messages(command.stdout.take().unwrap()) {
321 match cancel_recv.try_recv() { 326 let message = match message {
322 Ok(()) | Err(TryRecvError::Disconnected) => { 327 Ok(message) => message,
323 command.kill().expect("couldn't kill command"); 328 Err(err) => {
329 log::error!("Invalid json from cargo check, ignoring: {}", err);
330 continue;
324 } 331 }
325 Err(TryRecvError::Empty) => (), 332 };
326 }
327 333
328 message_send.send(CheckEvent::Msg(message.unwrap())).unwrap(); 334 match message_send.send(CheckEvent::Msg(message)) {
335 Ok(()) => {}
336 Err(_err) => {
337 // The send channel was closed, so we want to shutdown
338 break;
339 }
340 }
329 } 341 }
330 message_send.send(CheckEvent::End).unwrap(); 342
343 // We can ignore any error here, as we are already in the progress
344 // of shutting down.
345 let _ = message_send.send(CheckEvent::End);
346
347 // It is okay to ignore the result, as it only errors if the process is already dead
348 let _ = command.kill();
349
350 // Again, we don't care about the exit status so just ignore the result
351 let _ = command.wait();
331 }); 352 });
332 WatchThread { message_recv, cancel_send } 353 WatchThread { handle: Some(handle), message_recv }
333 } 354 }
355}
334 356
335 fn cancel(&self) { 357impl std::ops::Drop for WatchThread {
336 let _ = self.cancel_send.send(()); 358 fn drop(&mut self) {
359 if let Some(handle) = self.handle.take() {
360 // Replace our reciever with dummy one, so we can drop and close the
361 // one actually communicating with the thread
362 let recv = std::mem::replace(&mut self.message_recv, crossbeam_channel::never());
363
364 // Dropping the original reciever initiates thread sub-process shutdown
365 drop(recv);
366
367 // Join the thread, it should finish shortly. We don't really care
368 // whether it panicked, so it is safe to ignore the result
369 let _ = handle.join();
370 }
337 } 371 }
338} 372}
339 373