From 1d5eaefe8a8e4f8b267d51ee8ece866741586ada Mon Sep 17 00:00:00 2001 From: Bernardo Date: Sun, 6 Jan 2019 15:05:12 +0100 Subject: initial Watcher impl --- crates/ra_vfs/Cargo.toml | 2 + crates/ra_vfs/src/lib.rs | 59 ++++++++++++++++++++------- crates/ra_vfs/src/watcher.rs | 96 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 143 insertions(+), 14 deletions(-) create mode 100644 crates/ra_vfs/src/watcher.rs (limited to 'crates/ra_vfs') diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml index e637063c9..f7a972e91 100644 --- a/crates/ra_vfs/Cargo.toml +++ b/crates/ra_vfs/Cargo.toml @@ -10,6 +10,8 @@ relative-path = "0.4.0" rustc-hash = "1.0" crossbeam-channel = "0.3.5" log = "0.4.6" +notify = "4" +drop_bomb = "0.1.0" thread_worker = { path = "../thread_worker" } ra_arena = { path = "../ra_arena" } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index cdea18d73..5336822b3 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -14,26 +14,26 @@ //! which are watched for changes. Typically, there will be a root for each //! Cargo package. mod io; +mod watcher; use std::{ - fmt, - mem, - thread, cmp::Reverse, - path::{Path, PathBuf}, ffi::OsStr, + fmt, fs, mem, + path::{Path, PathBuf}, sync::Arc, - fs, + thread, }; -use rustc_hash::{FxHashMap, FxHashSet}; -use relative_path::RelativePathBuf; use crossbeam_channel::Receiver; -use walkdir::DirEntry; +use ra_arena::{impl_arena_id, Arena, RawId}; +use relative_path::RelativePathBuf; +use rustc_hash::{FxHashMap, FxHashSet}; use thread_worker::WorkerHandle; -use ra_arena::{Arena, RawId, impl_arena_id}; +use walkdir::DirEntry; pub use crate::io::TaskResult as VfsTask; +pub use crate::watcher::{Watcher, WatcherChange}; /// `RootFilter` is a predicate that checks if a file can belong to a root. If /// several filters match a file (nested dirs), the most nested one wins. @@ -85,6 +85,7 @@ pub struct Vfs { pending_changes: Vec, worker: io::Worker, worker_handle: WorkerHandle, + watcher: Watcher, } impl fmt::Debug for Vfs { @@ -97,12 +98,15 @@ impl Vfs { pub fn new(mut roots: Vec) -> (Vfs, Vec) { let (worker, worker_handle) = io::start(); + let watcher = Watcher::new().unwrap(); // TODO return Result? + let mut res = Vfs { roots: Arena::default(), files: Arena::default(), root2files: FxHashMap::default(), worker, worker_handle, + watcher, pending_changes: Vec::new(), }; @@ -129,6 +133,7 @@ impl Vfs { filter: Box::new(filter), }; res.worker.inp.send(task).unwrap(); + res.watcher.watch(path).unwrap(); } let roots = res.roots.iter().map(|(id, _)| id).collect(); (res, roots) @@ -183,6 +188,10 @@ impl Vfs { &self.worker.out } + pub fn change_receiver(&self) -> &Receiver { + &self.watcher.change_receiver() + } + pub fn handle_task(&mut self, task: io::TaskResult) { let mut files = Vec::new(); // While we were scanning the root in the backgound, a file might have @@ -209,22 +218,41 @@ impl Vfs { self.pending_changes.push(change); } - pub fn add_file_overlay(&mut self, path: &Path, text: String) -> Option { + pub fn handle_change(&mut self, change: WatcherChange) { + match change { + WatcherChange::Create(path) => { + self.add_file_overlay(&path, None); + } + WatcherChange::Remove(path) => { + self.remove_file_overlay(&path); + } + WatcherChange::Rename(src, dst) => { + self.remove_file_overlay(&src); + self.add_file_overlay(&dst, None); + } + WatcherChange::Write(path) => { + self.change_file_overlay(&path, None); + } + } + } + + pub fn add_file_overlay(&mut self, path: &Path, text: Option) -> Option { let mut res = None; - if let Some((root, path, file)) = self.find_root(path) { + if let Some((root, rel_path, file)) = self.find_root(path) { + let text = text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default()); let text = Arc::new(text); let change = if let Some(file) = file { res = Some(file); self.change_file(file, Arc::clone(&text)); VfsChange::ChangeFile { file, text } } else { - let file = self.add_file(root, path.clone(), Arc::clone(&text)); + let file = self.add_file(root, rel_path.clone(), Arc::clone(&text)); res = Some(file); VfsChange::AddFile { file, text, root, - path, + path: rel_path, } }; self.pending_changes.push(change); @@ -232,8 +260,10 @@ impl Vfs { res } - pub fn change_file_overlay(&mut self, path: &Path, new_text: String) { + pub fn change_file_overlay(&mut self, path: &Path, new_text: Option) { if let Some((_root, _path, file)) = self.find_root(path) { + let new_text = + new_text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default()); let file = file.expect("can't change a file which wasn't added"); let text = Arc::new(new_text); self.change_file(file, Arc::clone(&text)); @@ -267,6 +297,7 @@ impl Vfs { /// Sutdown the VFS and terminate the background watching thread. pub fn shutdown(self) -> thread::Result<()> { + let _ = self.watcher.shutdown(); let _ = self.worker.shutdown(); self.worker_handle.shutdown() } diff --git a/crates/ra_vfs/src/watcher.rs b/crates/ra_vfs/src/watcher.rs new file mode 100644 index 000000000..cc05f949e --- /dev/null +++ b/crates/ra_vfs/src/watcher.rs @@ -0,0 +1,96 @@ +use std::{ + path::{Path, PathBuf}, + sync::mpsc, + thread, + time::Duration, +}; + +use crossbeam_channel::Receiver; +use drop_bomb::DropBomb; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; + +pub struct Watcher { + receiver: Receiver, + watcher: RecommendedWatcher, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +#[derive(Debug)] +pub enum WatcherChange { + Create(PathBuf), + Write(PathBuf), + Remove(PathBuf), + Rename(PathBuf, PathBuf), +} + +impl WatcherChange { + fn from_debounced_event(ev: DebouncedEvent) -> Option { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) + | DebouncedEvent::Rescan => { + // ignore + None + } + DebouncedEvent::Create(path) => Some(WatcherChange::Create(path)), + DebouncedEvent::Write(path) => Some(WatcherChange::Write(path)), + DebouncedEvent::Remove(path) => Some(WatcherChange::Remove(path)), + DebouncedEvent::Rename(src, dst) => Some(WatcherChange::Rename(src, dst)), + DebouncedEvent::Error(err, path) => { + // TODO + log::warn!("watch error {}, {:?}", err, path); + None + } + } + } +} + +impl Watcher { + pub fn new() -> Result> { + let (input_sender, input_receiver) = mpsc::channel(); + let watcher = notify::watcher(input_sender, Duration::from_millis(250))?; + let (output_sender, output_receiver) = crossbeam_channel::unbounded(); + let thread = thread::spawn(move || loop { + match input_receiver.recv() { + Ok(ev) => { + // forward relevant events only + if let Some(change) = WatcherChange::from_debounced_event(ev) { + output_sender.send(change).unwrap(); + } + } + Err(err) => { + log::debug!("Watcher stopped ({})", err); + break; + } + } + }); + Ok(Watcher { + receiver: output_receiver, + watcher, + thread, + bomb: DropBomb::new(format!("Watcher was not shutdown")), + }) + } + + pub fn watch(&mut self, root: impl AsRef) -> Result<(), Box> { + self.watcher.watch(root, RecursiveMode::Recursive)?; + Ok(()) + } + + pub fn change_receiver(&self) -> &Receiver { + &self.receiver + } + + pub fn shutdown(mut self) -> thread::Result<()> { + self.bomb.defuse(); + drop(self.watcher); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res + } +} -- cgit v1.2.3