From 1d5eaefe8a8e4f8b267d51ee8ece866741586ada Mon Sep 17 00:00:00 2001 From: Bernardo Date: Sun, 6 Jan 2019 15:05:12 +0100 Subject: initial Watcher impl --- crates/ra_lsp_server/src/main_loop.rs | 19 +++++-- crates/ra_vfs/Cargo.toml | 2 + crates/ra_vfs/src/lib.rs | 59 ++++++++++++++++----- crates/ra_vfs/src/watcher.rs | 96 +++++++++++++++++++++++++++++++++++ 4 files changed, 159 insertions(+), 17 deletions(-) create mode 100644 crates/ra_vfs/src/watcher.rs (limited to 'crates') diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index ddd20a41f..e5a0603d1 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -11,7 +11,7 @@ use gen_lsp_server::{ }; use lsp_types::NumberOrString; use ra_ide_api::{Canceled, FileId, LibraryData}; -use ra_vfs::VfsTask; +use ra_vfs::{VfsTask, WatcherChange}; use rustc_hash::FxHashSet; use serde::{de::DeserializeOwned, Serialize}; use threadpool::ThreadPool; @@ -113,6 +113,7 @@ enum Event { Msg(RawMessage), Task(Task), Vfs(VfsTask), + Watcher(WatcherChange), Lib(LibraryData), } @@ -149,6 +150,7 @@ impl fmt::Debug for Event { Event::Task(it) => fmt::Debug::fmt(it, f), Event::Vfs(it) => fmt::Debug::fmt(it, f), Event::Lib(it) => fmt::Debug::fmt(it, f), + Event::Watcher(it) => fmt::Debug::fmt(it, f), } } } @@ -183,6 +185,10 @@ fn main_loop_inner( Ok(task) => Event::Vfs(task), Err(RecvError) => bail!("vfs died"), }, + recv(state.vfs.read().change_receiver()) -> change => match change { + Ok(change) => Event::Watcher(change), + Err(RecvError) => bail!("vfs watcher died"), + }, recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) }; log::info!("loop_turn = {:?}", event); @@ -194,6 +200,10 @@ fn main_loop_inner( state.vfs.write().handle_task(task); state_changed = true; } + Event::Watcher(change) => { + state.vfs.write().handle_change(change); + state_changed = true; + } Event::Lib(lib) => { feedback(internal_mode, "library loaded", msg_sender); state.add_lib(lib); @@ -365,7 +375,7 @@ fn on_notification( if let Some(file_id) = state .vfs .write() - .add_file_overlay(&path, params.text_document.text) + .add_file_overlay(&path, Some(params.text_document.text)) { subs.add_sub(FileId(file_id.0.into())); } @@ -384,7 +394,10 @@ fn on_notification( .pop() .ok_or_else(|| format_err!("empty changes"))? .text; - state.vfs.write().change_file_overlay(path.as_path(), text); + state + .vfs + .write() + .change_file_overlay(path.as_path(), Some(text)); return Ok(()); } Err(not) => not, diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml index e637063c9..f7a972e91 100644 --- a/crates/ra_vfs/Cargo.toml +++ b/crates/ra_vfs/Cargo.toml @@ -10,6 +10,8 @@ relative-path = "0.4.0" rustc-hash = "1.0" crossbeam-channel = "0.3.5" log = "0.4.6" +notify = "4" +drop_bomb = "0.1.0" thread_worker = { path = "../thread_worker" } ra_arena = { path = "../ra_arena" } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index cdea18d73..5336822b3 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -14,26 +14,26 @@ //! which are watched for changes. Typically, there will be a root for each //! Cargo package. mod io; +mod watcher; use std::{ - fmt, - mem, - thread, cmp::Reverse, - path::{Path, PathBuf}, ffi::OsStr, + fmt, fs, mem, + path::{Path, PathBuf}, sync::Arc, - fs, + thread, }; -use rustc_hash::{FxHashMap, FxHashSet}; -use relative_path::RelativePathBuf; use crossbeam_channel::Receiver; -use walkdir::DirEntry; +use ra_arena::{impl_arena_id, Arena, RawId}; +use relative_path::RelativePathBuf; +use rustc_hash::{FxHashMap, FxHashSet}; use thread_worker::WorkerHandle; -use ra_arena::{Arena, RawId, impl_arena_id}; +use walkdir::DirEntry; pub use crate::io::TaskResult as VfsTask; +pub use crate::watcher::{Watcher, WatcherChange}; /// `RootFilter` is a predicate that checks if a file can belong to a root. If /// several filters match a file (nested dirs), the most nested one wins. @@ -85,6 +85,7 @@ pub struct Vfs { pending_changes: Vec, worker: io::Worker, worker_handle: WorkerHandle, + watcher: Watcher, } impl fmt::Debug for Vfs { @@ -97,12 +98,15 @@ impl Vfs { pub fn new(mut roots: Vec) -> (Vfs, Vec) { let (worker, worker_handle) = io::start(); + let watcher = Watcher::new().unwrap(); // TODO return Result? + let mut res = Vfs { roots: Arena::default(), files: Arena::default(), root2files: FxHashMap::default(), worker, worker_handle, + watcher, pending_changes: Vec::new(), }; @@ -129,6 +133,7 @@ impl Vfs { filter: Box::new(filter), }; res.worker.inp.send(task).unwrap(); + res.watcher.watch(path).unwrap(); } let roots = res.roots.iter().map(|(id, _)| id).collect(); (res, roots) @@ -183,6 +188,10 @@ impl Vfs { &self.worker.out } + pub fn change_receiver(&self) -> &Receiver { + &self.watcher.change_receiver() + } + pub fn handle_task(&mut self, task: io::TaskResult) { let mut files = Vec::new(); // While we were scanning the root in the backgound, a file might have @@ -209,22 +218,41 @@ impl Vfs { self.pending_changes.push(change); } - pub fn add_file_overlay(&mut self, path: &Path, text: String) -> Option { + pub fn handle_change(&mut self, change: WatcherChange) { + match change { + WatcherChange::Create(path) => { + self.add_file_overlay(&path, None); + } + WatcherChange::Remove(path) => { + self.remove_file_overlay(&path); + } + WatcherChange::Rename(src, dst) => { + self.remove_file_overlay(&src); + self.add_file_overlay(&dst, None); + } + WatcherChange::Write(path) => { + self.change_file_overlay(&path, None); + } + } + } + + pub fn add_file_overlay(&mut self, path: &Path, text: Option) -> Option { let mut res = None; - if let Some((root, path, file)) = self.find_root(path) { + if let Some((root, rel_path, file)) = self.find_root(path) { + let text = text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default()); let text = Arc::new(text); let change = if let Some(file) = file { res = Some(file); self.change_file(file, Arc::clone(&text)); VfsChange::ChangeFile { file, text } } else { - let file = self.add_file(root, path.clone(), Arc::clone(&text)); + let file = self.add_file(root, rel_path.clone(), Arc::clone(&text)); res = Some(file); VfsChange::AddFile { file, text, root, - path, + path: rel_path, } }; self.pending_changes.push(change); @@ -232,8 +260,10 @@ impl Vfs { res } - pub fn change_file_overlay(&mut self, path: &Path, new_text: String) { + pub fn change_file_overlay(&mut self, path: &Path, new_text: Option) { if let Some((_root, _path, file)) = self.find_root(path) { + let new_text = + new_text.unwrap_or_else(|| fs::read_to_string(&path).unwrap_or_default()); let file = file.expect("can't change a file which wasn't added"); let text = Arc::new(new_text); self.change_file(file, Arc::clone(&text)); @@ -267,6 +297,7 @@ impl Vfs { /// Sutdown the VFS and terminate the background watching thread. pub fn shutdown(self) -> thread::Result<()> { + let _ = self.watcher.shutdown(); let _ = self.worker.shutdown(); self.worker_handle.shutdown() } diff --git a/crates/ra_vfs/src/watcher.rs b/crates/ra_vfs/src/watcher.rs new file mode 100644 index 000000000..cc05f949e --- /dev/null +++ b/crates/ra_vfs/src/watcher.rs @@ -0,0 +1,96 @@ +use std::{ + path::{Path, PathBuf}, + sync::mpsc, + thread, + time::Duration, +}; + +use crossbeam_channel::Receiver; +use drop_bomb::DropBomb; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; + +pub struct Watcher { + receiver: Receiver, + watcher: RecommendedWatcher, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +#[derive(Debug)] +pub enum WatcherChange { + Create(PathBuf), + Write(PathBuf), + Remove(PathBuf), + Rename(PathBuf, PathBuf), +} + +impl WatcherChange { + fn from_debounced_event(ev: DebouncedEvent) -> Option { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) + | DebouncedEvent::Rescan => { + // ignore + None + } + DebouncedEvent::Create(path) => Some(WatcherChange::Create(path)), + DebouncedEvent::Write(path) => Some(WatcherChange::Write(path)), + DebouncedEvent::Remove(path) => Some(WatcherChange::Remove(path)), + DebouncedEvent::Rename(src, dst) => Some(WatcherChange::Rename(src, dst)), + DebouncedEvent::Error(err, path) => { + // TODO + log::warn!("watch error {}, {:?}", err, path); + None + } + } + } +} + +impl Watcher { + pub fn new() -> Result> { + let (input_sender, input_receiver) = mpsc::channel(); + let watcher = notify::watcher(input_sender, Duration::from_millis(250))?; + let (output_sender, output_receiver) = crossbeam_channel::unbounded(); + let thread = thread::spawn(move || loop { + match input_receiver.recv() { + Ok(ev) => { + // forward relevant events only + if let Some(change) = WatcherChange::from_debounced_event(ev) { + output_sender.send(change).unwrap(); + } + } + Err(err) => { + log::debug!("Watcher stopped ({})", err); + break; + } + } + }); + Ok(Watcher { + receiver: output_receiver, + watcher, + thread, + bomb: DropBomb::new(format!("Watcher was not shutdown")), + }) + } + + pub fn watch(&mut self, root: impl AsRef) -> Result<(), Box> { + self.watcher.watch(root, RecursiveMode::Recursive)?; + Ok(()) + } + + pub fn change_receiver(&self) -> &Receiver { + &self.receiver + } + + pub fn shutdown(mut self) -> thread::Result<()> { + self.bomb.defuse(); + drop(self.watcher); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res + } +} -- cgit v1.2.3