From 277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c Mon Sep 17 00:00:00 2001 From: Bernardo Date: Mon, 21 Jan 2019 18:59:54 +0100 Subject: move watcher to io module --- crates/ra_vfs/src/io/mod.rs | 212 ++++++++++++++++++++++++++++++++++++++++ crates/ra_vfs/src/io/watcher.rs | 128 ++++++++++++++++++++++++ 2 files changed, 340 insertions(+) create mode 100644 crates/ra_vfs/src/io/mod.rs create mode 100644 crates/ra_vfs/src/io/watcher.rs (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs new file mode 100644 index 000000000..6d5af7690 --- /dev/null +++ b/crates/ra_vfs/src/io/mod.rs @@ -0,0 +1,212 @@ +use std::{ + fmt, fs, + path::{Path, PathBuf}, + sync::Arc, + thread, +}; + +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::Mutex; +use relative_path::RelativePathBuf; +use thread_worker::WorkerHandle; +use walkdir::{DirEntry, WalkDir}; + +mod watcher; +use watcher::Watcher; +pub use watcher::WatcherChange; + +use crate::VfsRoot; + +pub(crate) enum Task { + AddRoot { + root: VfsRoot, + path: PathBuf, + filter: Box bool + Send>, + }, + /// this variant should only be created by the watcher + HandleChange(WatcherChange), + LoadChange(WatcherChange), + Watch { + dir: PathBuf, + filter: Box bool + Send>, + }, +} + +#[derive(Debug)] +pub struct AddRootResult { + pub(crate) root: VfsRoot, + pub(crate) files: Vec<(RelativePathBuf, String)>, +} + +#[derive(Debug)] +pub enum WatcherChangeData { + Create { path: PathBuf, text: String }, + Write { path: PathBuf, text: String }, + Remove { path: PathBuf }, +} + +pub enum TaskResult { + AddRoot(AddRootResult), + HandleChange(WatcherChange), + LoadChange(WatcherChangeData), + NoOp, +} + +impl fmt::Debug for TaskResult { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("TaskResult { ... }") + } +} + +pub(crate) struct Worker { + worker: thread_worker::Worker, + worker_handle: WorkerHandle, + watcher: Arc>>, +} + +impl Worker { + pub(crate) fn start() -> Worker { + let watcher = Arc::new(Mutex::new(None)); + let watcher_clone = watcher.clone(); + let (worker, worker_handle) = + thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|t| handle_task(t, &watcher_clone)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() + }); + match Watcher::start(worker.inp.clone()) { + Ok(w) => { + watcher.lock().replace(w); + } + Err(e) => log::error!("could not start watcher: {}", e), + }; + Worker { + worker, + worker_handle, + watcher, + } + } + + pub(crate) fn sender(&self) -> &Sender { + &self.worker.inp + } + + pub(crate) fn receiver(&self) -> &Receiver { + &self.worker.out + } + + pub(crate) fn shutdown(self) -> thread::Result<()> { + if let Some(watcher) = self.watcher.lock().take() { + let _ = watcher.shutdown(); + } + let _ = self.worker.shutdown(); + self.worker_handle.shutdown() + } +} + +fn watch( + watcher: &Arc>>, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_existing: bool, +) { + let mut watcher = watcher.lock(); + let watcher = match *watcher { + Some(ref mut w) => w, + None => { + // watcher dropped or couldn't start + return; + } + }; + watcher.watch_recursive(dir, filter_entry, emit_for_existing) +} + +fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { + match task { + Task::AddRoot { root, path, filter } => { + watch(watcher, &path, &*filter, false); + log::debug!("loading {} ...", path.as_path().display()); + let files = load_root(path.as_path(), &*filter); + log::debug!("... loaded {}", path.as_path().display()); + TaskResult::AddRoot(AddRootResult { root, files }) + } + Task::HandleChange(change) => { + // forward as is because Vfs has to decide if we should load it + TaskResult::HandleChange(change) + } + Task::LoadChange(change) => { + log::debug!("loading {:?} ...", change); + match load_change(change) { + Some(data) => TaskResult::LoadChange(data), + None => TaskResult::NoOp, + } + } + Task::Watch { dir, filter } => { + watch(watcher, &dir, &*filter, true); + TaskResult::NoOp + } + } +} + +fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { + let mut res = Vec::new(); + for entry in WalkDir::new(root).into_iter().filter_entry(filter) { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + if !entry.file_type().is_file() { + continue; + } + let path = entry.path(); + let text = match fs::read_to_string(path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); + res.push((path.to_owned(), text)) + } + res +} + +fn load_change(change: WatcherChange) -> Option { + let data = match change { + WatcherChange::Create(path) => { + if path.is_dir() { + return None; + } + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Create { path, text } + } + WatcherChange::Write(path) => { + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Write { path, text } + } + WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, + WatcherChange::Rescan => { + // this should be handled by Vfs::handle_task + return None; + } + }; + Some(data) +} diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs new file mode 100644 index 000000000..e33298477 --- /dev/null +++ b/crates/ra_vfs/src/io/watcher.rs @@ -0,0 +1,128 @@ +use crate::io; +use crossbeam_channel::Sender; +use drop_bomb::DropBomb; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use std::{ + path::{Path, PathBuf}, + sync::mpsc, + thread, + time::Duration, +}; +use walkdir::{DirEntry, WalkDir}; + +#[derive(Debug)] +pub enum WatcherChange { + Create(PathBuf), + Write(PathBuf), + Remove(PathBuf), + Rescan, +} + +fn handle_change_event( + ev: DebouncedEvent, + sender: &Sender, +) -> Result<(), Box> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; + } + DebouncedEvent::Create(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; + } + DebouncedEvent::Write(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; + } + DebouncedEvent::Remove(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; + } + DebouncedEvent::Rename(src, dst) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; + sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) +} + +const WATCHER_DELAY: Duration = Duration::from_millis(250); + +pub(crate) struct Watcher { + watcher: RecommendedWatcher, + thread: thread::JoinHandle<()>, + bomb: DropBomb, + sender: Sender, +} + +impl Watcher { + pub(crate) fn start( + output_sender: Sender, + ) -> Result> { + let (input_sender, input_receiver) = mpsc::channel(); + let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; + let sender = output_sender.clone(); + let thread = thread::spawn(move || { + input_receiver + .into_iter() + // forward relevant events only + .try_for_each(|change| handle_change_event(change, &output_sender)) + .unwrap() + }); + Ok(Watcher { + watcher, + thread, + sender, + bomb: DropBomb::new(format!("Watcher was not shutdown")), + }) + } + + pub fn watch_recursive( + &mut self, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_contents: bool, + ) { + for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { + match res { + Ok(entry) => { + if entry.path().is_dir() { + match self.watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } + } + if emit_for_contents && entry.depth() > 0 { + // emit as create because we haven't seen it yet + if let Err(e) = + self.sender + .send(io::Task::HandleChange(WatcherChange::Create( + entry.path().to_path_buf(), + ))) + { + log::warn!("watcher error: {}", e) + } + } + } + Err(e) => log::warn!("watcher error: {}", e), + } + } + } + + pub fn shutdown(mut self) -> thread::Result<()> { + self.bomb.defuse(); + drop(self.watcher); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res + } +} -- cgit v1.2.3