From 012ea3fac62df26abefa6d64b81570ed58118dea Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 17:01:58 +0300 Subject: handle all the reads on the "main" watcher thread --- crates/ra_vfs/src/io.rs | 123 +++++++++++++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 48 deletions(-) (limited to 'crates') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 279fa5da8..98b107b35 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -5,7 +5,7 @@ use std::{ sync::{mpsc, Arc}, time::Duration, }; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; @@ -61,9 +61,25 @@ pub(crate) struct Worker { impl Worker { pub(crate) fn start(roots: Arc) -> Worker { - let (worker, worker_handle) = - thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { + // This is a pretty elaborate setup of threads & channels! It is + // explained by the following concerns: + + // * we need to burn a thread translating from notify's mpsc to + // crossbeam_channel. + // * we want to read all files from a single thread, to gurantee that + // we always get fresher versions and never go back in time. + // * we want to tear down everything neatly during shutdown. + let (worker, worker_handle) = thread_worker::spawn( + "vfs", + 128, + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + move |input_receiver, output_sender| { + // These are `std` channels notify will send events to let (notify_sender, notify_receiver) = mpsc::channel(); + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); let watcher = notify::watcher(notify_sender, WATCHER_DELAY) .map_err(|e| log::error!("failed to spawn notify {}", e)) .ok(); @@ -72,18 +88,30 @@ impl Worker { watcher: Arc::new(Mutex::new(watcher)), sender: output_sender, }; - let thread = thread::spawn({ - let ctx = ctx.clone(); - move || { - let _ = notify_receiver - .into_iter() - // forward relevant events only - .try_for_each(|change| ctx.handle_debounced_event(change)); - } - }); - let res1 = input_receiver.into_iter().try_for_each(|t| match t { - Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)), + let thread = thread::spawn(move || { + let _ = notify_receiver + .into_iter() + // forward relevant events only + .for_each(|event| convert_notify_event(event, &watcher_sender)); }); + + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => break, + Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)), + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(), + }, + } + } drop(ctx.watcher.lock().take()); drop(ctx); let res2 = thread.join(); @@ -91,9 +119,9 @@ impl Worker { Ok(()) => log::info!("... Watcher terminated with ok"), Err(_) => log::error!("... Watcher terminated with err"), } - res1.unwrap(); res2.unwrap(); - }); + }, + ); Worker { worker, worker_handle, @@ -114,7 +142,7 @@ impl Worker { } } -fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) -> Result<()> { +fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) { let mut guard = woker.watcher.lock(); log::debug!("loading {} ...", config.root.as_path().display()); let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) @@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) -> Res .collect(); woker .sender - .send(TaskResult::BulkLoadRoot { root, files })?; + .send(TaskResult::BulkLoadRoot { root, files }) + .unwrap(); log::debug!("... loaded {}", config.root.as_path().display()); - Ok(()) } #[derive(Clone)] @@ -139,38 +167,37 @@ struct WatcherCtx { sender: Sender, } -impl WatcherCtx { - fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - // TODO rescan all roots - } - DebouncedEvent::Create(path) => { - self.handle_change(path, ChangeKind::Create)?; - } - DebouncedEvent::Write(path) => { - self.handle_change(path, ChangeKind::Write)?; - } - DebouncedEvent::Remove(path) => { - self.handle_change(path, ChangeKind::Remove)?; - } - DebouncedEvent::Rename(src, dst) => { - self.handle_change(src, ChangeKind::Remove)?; - self.handle_change(dst, ChangeKind::Create)?; - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } +fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { + match event { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + sender.send((path, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Write(path) => { + sender.send((path, ChangeKind::Write)).unwrap(); + } + DebouncedEvent::Remove(path) => { + sender.send((path, ChangeKind::Remove)).unwrap(); + } + DebouncedEvent::Rename(src, dst) => { + sender.send((src, ChangeKind::Remove)).unwrap(); + sender.send((dst, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); } - Ok(()) } +} +impl WatcherCtx { fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { let (root, rel_path) = match self.roots.find(&path) { None => return Ok(()), -- cgit v1.2.3