From d63e1cebff771621b90bdce25ac013eecb415e1e Mon Sep 17 00:00:00 2001 From: Bernardo Date: Fri, 25 Jan 2019 18:39:35 +0100 Subject: use `Roots` in watcher --- crates/ra_vfs/src/io/watcher.rs | 215 ++++++++++++++++++++++++++-------------- 1 file changed, 141 insertions(+), 74 deletions(-) (limited to 'crates/ra_vfs/src/io/watcher.rs') diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index 68bb6b692..1d7ce2136 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -1,118 +1,72 @@ -use crate::{io, RootFilter}; +use crate::{io, RootFilter, Roots, VfsRoot}; use crossbeam_channel::Sender; use drop_bomb::DropBomb; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use parking_lot::Mutex; use std::{ + fs, path::{Path, PathBuf}, - sync::mpsc, + sync::{mpsc, Arc}, thread, time::Duration, }; use walkdir::WalkDir; #[derive(Debug)] -pub enum WatcherChange { - Create(PathBuf), - Write(PathBuf), - Remove(PathBuf), - Rescan, -} - -fn handle_change_event( - ev: DebouncedEvent, - sender: &Sender, -) -> Result<(), Box> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; - } - DebouncedEvent::Create(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; - } - DebouncedEvent::Write(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; - } - DebouncedEvent::Remove(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; - } - DebouncedEvent::Rename(src, dst) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; - sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } - Ok(()) +enum ChangeKind { + Create, + Write, + Remove, } const WATCHER_DELAY: Duration = Duration::from_millis(250); pub(crate) struct Watcher { - watcher: RecommendedWatcher, thread: thread::JoinHandle<()>, bomb: DropBomb, - sender: Sender, + watcher: Arc>>, } impl Watcher { pub(crate) fn start( - output_sender: Sender, + roots: Arc, + output_sender: Sender, ) -> Result> { let (input_sender, input_receiver) = mpsc::channel(); - let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; + let watcher = Arc::new(Mutex::new(Some(notify::watcher( + input_sender, + WATCHER_DELAY, + )?))); let sender = output_sender.clone(); + let watcher_clone = watcher.clone(); let thread = thread::spawn(move || { + let worker = WatcherWorker { + roots, + watcher: watcher_clone, + sender, + }; input_receiver .into_iter() // forward relevant events only - .try_for_each(|change| handle_change_event(change, &output_sender)) + .try_for_each(|change| worker.handle_debounced_event(change)) .unwrap() }); Ok(Watcher { - watcher, thread, - sender, + watcher, bomb: DropBomb::new(format!("Watcher was not shutdown")), }) } - pub fn watch_recursive(&mut self, dir: &Path, filter: &RootFilter, emit_for_contents: bool) { - for res in WalkDir::new(dir) + pub fn watch_root(&mut self, filter: &RootFilter) { + for res in WalkDir::new(&filter.root) .into_iter() - .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + .filter_entry(filter.entry_filter()) { match res { Ok(entry) => { if entry.path().is_dir() { - match self - .watcher - .watch(entry.path(), RecursiveMode::NonRecursive) - { - Ok(()) => log::debug!("watching \"{}\"", entry.path().display()), - Err(e) => { - log::warn!("could not watch \"{}\": {}", entry.path().display(), e) - } - } - } else { - if emit_for_contents && entry.depth() > 0 { - // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching - // emit as create because we haven't seen it yet - if let Err(e) = - self.sender - .send(io::Task::HandleChange(WatcherChange::Create( - entry.path().to_path_buf(), - ))) - { - log::warn!("watcher error: {}", e) - } - } + watch_one(self.watcher.as_ref(), entry.path()); } } Err(e) => log::warn!("watcher error: {}", e), @@ -122,7 +76,7 @@ impl Watcher { pub fn shutdown(mut self) -> thread::Result<()> { self.bomb.defuse(); - drop(self.watcher); + drop(self.watcher.lock().take()); let res = self.thread.join(); match &res { Ok(()) => log::info!("... Watcher terminated with ok"), @@ -131,3 +85,116 @@ impl Watcher { res } } + +struct WatcherWorker { + watcher: Arc>>, + roots: Arc, + sender: Sender, +} + +impl WatcherWorker { + fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + self.handle_change(path, ChangeKind::Create); + } + DebouncedEvent::Write(path) => { + self.handle_change(path, ChangeKind::Write); + } + DebouncedEvent::Remove(path) => { + self.handle_change(path, ChangeKind::Remove); + } + DebouncedEvent::Rename(src, dst) => { + self.handle_change(src, ChangeKind::Remove); + self.handle_change(dst, ChangeKind::Create); + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) + } + + fn handle_change(&self, path: PathBuf, kind: ChangeKind) { + if let Err(e) = self.try_handle_change(path, kind) { + log::warn!("watcher error: {}", e) + } + } + + fn try_handle_change( + &self, + path: PathBuf, + kind: ChangeKind, + ) -> Result<(), Box> { + let (root, rel_path) = match self.roots.find(&path) { + Some(x) => x, + None => return Ok(()), + }; + match kind { + ChangeKind::Create => { + if path.is_dir() { + self.watch_recursive(&path, root); + } else { + let text = fs::read_to_string(&path)?; + self.sender.send(io::TaskResult::AddSingleFile { + root, + path: rel_path, + text, + })? + } + } + ChangeKind::Write => { + let text = fs::read_to_string(&path)?; + self.sender.send(io::TaskResult::ChangeSingleFile { + root, + path: rel_path, + text, + })? + } + ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile { + root, + path: rel_path, + })?, + } + Ok(()) + } + + fn watch_recursive(&self, dir: &Path, root: VfsRoot) { + let filter = &self.roots[root]; + for res in WalkDir::new(dir) + .into_iter() + .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + { + match res { + Ok(entry) => { + if entry.path().is_dir() { + watch_one(self.watcher.as_ref(), entry.path()); + } else { + // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching + // emit as create because we haven't seen it yet + self.handle_change(entry.path().to_path_buf(), ChangeKind::Create); + } + } + Err(e) => log::warn!("watcher error: {}", e), + } + } + } +} + +fn watch_one(watcher: &Mutex>, dir: &Path) { + if let Some(watcher) = watcher.lock().as_mut() { + match watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } + } +} -- cgit v1.2.3