From bf352cd2511775a331d77dee261b64bd8359dacb Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 14 Feb 2019 20:43:45 +0300 Subject: automatically wait for worker threads closes #817 --- crates/ra_vfs/src/io.rs | 106 +++++++++++++++++++++------------------------ crates/ra_vfs/src/lib.rs | 6 --- crates/ra_vfs/tests/vfs.rs | 1 - 3 files changed, 50 insertions(+), 63 deletions(-) (limited to 'crates/ra_vfs') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 3952b200b..1b70cd8df 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -1,13 +1,11 @@ use std::{ fs, - thread, path::{Path, PathBuf}, sync::{mpsc, Arc}, time::Duration, }; use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; -use thread_worker::WorkerHandle; use walkdir::WalkDir; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; @@ -49,8 +47,7 @@ enum ChangeKind { const WATCHER_DELAY: Duration = Duration::from_millis(250); pub(crate) struct Worker { - worker: thread_worker::Worker, - worker_handle: WorkerHandle, + thread_worker: thread_worker::Worker, } impl Worker { @@ -62,82 +59,79 @@ impl Worker { // * we want to read all files from a single thread, to guarantee that // we always get fresher versions and never go back in time. // * we want to tear down everything neatly during shutdown. - let (worker, worker_handle) = thread_worker::spawn( + let thread_worker = thread_worker::Worker::spawn( "vfs", 128, // This are the channels we use to communicate with outside world. // If `input_receiver` is closed we need to tear ourselves down. // `output_sender` should not be closed unless the parent died. move |input_receiver, output_sender| { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); + // Make sure that the destruction order is + // + // * notify_sender + // * _thread + // * watcher_sender + // + // this is required to avoid deadlocks. + // These are the corresponding crossbeam channels let (watcher_sender, watcher_receiver) = unbounded(); + let _thread; + { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - let thread = thread::spawn(move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to transform between two channels + _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break + }, + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } }, - Ok(Task::AddRoot { root, config }) => { - watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); - } - }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } - }, + } } } - // Stopped the watcher - drop(watcher.take()); // Drain pending events: we are not interested in them anyways! watcher_receiver.into_iter().for_each(|_| ()); - - let res = thread.join(); - match &res { - Ok(()) => log::info!("... Watcher terminated with ok"), - Err(_) => log::error!("... Watcher terminated with err"), - } - res.unwrap(); }, ); - Worker { worker, worker_handle } + Worker { thread_worker } } pub(crate) fn sender(&self) -> &Sender { - &self.worker.inp + &self.thread_worker.sender() } pub(crate) fn receiver(&self) -> &Receiver { - &self.worker.out - } - - pub(crate) fn shutdown(self) -> thread::Result<()> { - let _ = self.worker.shutdown(); - self.worker_handle.shutdown() + &self.thread_worker.receiver() } } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index f07657db6..1fb255365 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -22,7 +22,6 @@ use std::{ fmt, fs, mem, path::{Path, PathBuf}, sync::Arc, - thread, }; use crossbeam_channel::Receiver; @@ -337,11 +336,6 @@ impl Vfs { mem::replace(&mut self.pending_changes, Vec::new()) } - /// Shutdown the VFS and terminate the background watching thread. - pub fn shutdown(self) -> thread::Result<()> { - self.worker.shutdown() - } - fn add_file( &mut self, root: VfsRoot, diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs index 649ef96c9..c76e6ea26 100644 --- a/crates/ra_vfs/tests/vfs.rs +++ b/crates/ra_vfs/tests/vfs.rs @@ -158,6 +158,5 @@ fn test_vfs_works() -> std::io::Result<()> { Err(RecvTimeoutError::Timeout) ); - vfs.shutdown().unwrap(); Ok(()) } -- cgit v1.2.3 From e0b8942c56378b7966af39058f27b11a0d02890f Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 14 Feb 2019 21:14:47 +0300 Subject: simplify --- crates/ra_vfs/src/io.rs | 155 +++++++++++++++++++++-------------------------- crates/ra_vfs/src/lib.rs | 2 +- 2 files changed, 71 insertions(+), 86 deletions(-) (limited to 'crates/ra_vfs') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 1b70cd8df..f64b4c532 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -4,7 +4,7 @@ use std::{ sync::{mpsc, Arc}, time::Duration, }; -use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; +use crossbeam_channel::{Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use walkdir::WalkDir; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; @@ -46,93 +46,78 @@ enum ChangeKind { const WATCHER_DELAY: Duration = Duration::from_millis(250); -pub(crate) struct Worker { - thread_worker: thread_worker::Worker, -} - -impl Worker { - pub(crate) fn start(roots: Arc) -> Worker { - // This is a pretty elaborate setup of threads & channels! It is - // explained by the following concerns: - // * we need to burn a thread translating from notify's mpsc to - // crossbeam_channel. - // * we want to read all files from a single thread, to guarantee that - // we always get fresher versions and never go back in time. - // * we want to tear down everything neatly during shutdown. - let thread_worker = thread_worker::Worker::spawn( - "vfs", - 128, - // This are the channels we use to communicate with outside world. - // If `input_receiver` is closed we need to tear ourselves down. - // `output_sender` should not be closed unless the parent died. - move |input_receiver, output_sender| { - // Make sure that the destruction order is - // - // * notify_sender - // * _thread - // * watcher_sender - // - // this is required to avoid deadlocks. - - // These are the corresponding crossbeam channels - let (watcher_sender, watcher_receiver) = unbounded(); - let _thread; - { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); - - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); - - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break - }, - Ok(Task::AddRoot { root, config }) => { - watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); - } - }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } +pub(crate) type Worker = thread_worker::Worker; +pub(crate) fn start(roots: Arc) -> Worker { + // This is a pretty elaborate setup of threads & channels! It is + // explained by the following concerns: + // * we need to burn a thread translating from notify's mpsc to + // crossbeam_channel. + // * we want to read all files from a single thread, to guarantee that + // we always get fresher versions and never go back in time. + // * we want to tear down everything neatly during shutdown. + Worker::spawn( + "vfs", + 128, + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + move |input_receiver, output_sender| { + // Make sure that the destruction order is + // + // * notify_sender + // * _thread + // * watcher_sender + // + // this is required to avoid deadlocks. + + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); + let _thread; + { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); + + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to transform between two channels + _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); + + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break }, - } + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } + }, } } - // Drain pending events: we are not interested in them anyways! - watcher_receiver.into_iter().for_each(|_| ()); - }, - ); - - Worker { thread_worker } - } - - pub(crate) fn sender(&self) -> &Sender { - &self.thread_worker.sender() - } - - pub(crate) fn receiver(&self) -> &Receiver { - &self.thread_worker.receiver() - } + } + // Drain pending events: we are not interested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + }, + ) } fn watch_root( diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 1fb255365..cfdc1275f 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -159,7 +159,7 @@ impl fmt::Debug for Vfs { impl Vfs { pub fn new(roots: Vec) -> (Vfs, Vec) { let roots = Arc::new(Roots::new(roots)); - let worker = io::Worker::start(Arc::clone(&roots)); + let worker = io::start(Arc::clone(&roots)); let mut root2files = ArenaMap::default(); for (root, config) in roots.iter() { -- cgit v1.2.3