From 277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c Mon Sep 17 00:00:00 2001 From: Bernardo Date: Mon, 21 Jan 2019 18:59:54 +0100 Subject: move watcher to io module --- crates/ra_vfs/src/io.rs | 210 --------------------------------------- crates/ra_vfs/src/io/mod.rs | 212 ++++++++++++++++++++++++++++++++++++++++ crates/ra_vfs/src/io/watcher.rs | 128 ++++++++++++++++++++++++ crates/ra_vfs/src/lib.rs | 36 +++---- crates/ra_vfs/src/watcher.rs | 128 ------------------------ 5 files changed, 356 insertions(+), 358 deletions(-) delete mode 100644 crates/ra_vfs/src/io.rs create mode 100644 crates/ra_vfs/src/io/mod.rs create mode 100644 crates/ra_vfs/src/io/watcher.rs delete mode 100644 crates/ra_vfs/src/watcher.rs diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs deleted file mode 100644 index a74222c02..000000000 --- a/crates/ra_vfs/src/io.rs +++ /dev/null @@ -1,210 +0,0 @@ -use std::{ - fmt, fs, - path::{Path, PathBuf}, - sync::Arc, - thread, -}; - -use crossbeam_channel::{Receiver, Sender}; -use parking_lot::Mutex; -use relative_path::RelativePathBuf; -use thread_worker::WorkerHandle; -use walkdir::{DirEntry, WalkDir}; - -use crate::{ - watcher::{Watcher, WatcherChange}, - VfsRoot, -}; - -pub(crate) enum Task { - AddRoot { - root: VfsRoot, - path: PathBuf, - filter: Box bool + Send>, - }, - HandleChange(WatcherChange), - LoadChange(WatcherChange), - Watch { - dir: PathBuf, - filter: Box bool + Send>, - }, -} - -#[derive(Debug)] -pub struct AddRootResult { - pub(crate) root: VfsRoot, - pub(crate) files: Vec<(RelativePathBuf, String)>, -} - -#[derive(Debug)] -pub enum WatcherChangeData { - Create { path: PathBuf, text: String }, - Write { path: PathBuf, text: String }, - Remove { path: PathBuf }, -} - -pub enum TaskResult { - AddRoot(AddRootResult), - HandleChange(WatcherChange), - LoadChange(WatcherChangeData), - NoOp, -} - -impl fmt::Debug for TaskResult { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("TaskResult { ... }") - } -} - -pub(crate) struct Worker { - worker: thread_worker::Worker, - worker_handle: WorkerHandle, - watcher: Arc>>, -} - -impl Worker { - pub(crate) fn start() -> Worker { - let watcher = Arc::new(Mutex::new(None)); - let watcher_clone = watcher.clone(); - let (worker, worker_handle) = - thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { - input_receiver - .into_iter() - .map(|t| handle_task(t, &watcher_clone)) - .try_for_each(|it| output_sender.send(it)) - .unwrap() - }); - match Watcher::start(worker.inp.clone()) { - Ok(w) => { - watcher.lock().replace(w); - } - Err(e) => log::error!("could not start watcher: {}", e), - }; - Worker { - worker, - worker_handle, - watcher, - } - } - - pub(crate) fn sender(&self) -> &Sender { - &self.worker.inp - } - - pub(crate) fn receiver(&self) -> &Receiver { - &self.worker.out - } - - pub(crate) fn shutdown(self) -> thread::Result<()> { - if let Some(watcher) = self.watcher.lock().take() { - let _ = watcher.shutdown(); - } - let _ = self.worker.shutdown(); - self.worker_handle.shutdown() - } -} - -fn watch( - watcher: &Arc>>, - dir: &Path, - filter_entry: impl Fn(&DirEntry) -> bool, - emit_for_existing: bool, -) { - let mut watcher = watcher.lock(); - let watcher = match *watcher { - Some(ref mut w) => w, - None => { - // watcher dropped or couldn't start - return; - } - }; - watcher.watch_recursive(dir, filter_entry, emit_for_existing) -} - -fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { - match task { - Task::AddRoot { root, path, filter } => { - watch(watcher, &path, &*filter, false); - log::debug!("loading {} ...", path.as_path().display()); - let files = load_root(path.as_path(), &*filter); - log::debug!("... loaded {}", path.as_path().display()); - TaskResult::AddRoot(AddRootResult { root, files }) - } - Task::HandleChange(change) => { - // forward as is because Vfs has to decide if we should load it - TaskResult::HandleChange(change) - } - Task::LoadChange(change) => { - log::debug!("loading {:?} ...", change); - match load_change(change) { - Some(data) => TaskResult::LoadChange(data), - None => TaskResult::NoOp, - } - } - Task::Watch { dir, filter } => { - watch(watcher, &dir, &*filter, true); - TaskResult::NoOp - } - } -} - -fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { - let mut res = Vec::new(); - for entry in WalkDir::new(root).into_iter().filter_entry(filter) { - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; - } - }; - if !entry.file_type().is_file() { - continue; - } - let path = entry.path(); - let text = match fs::read_to_string(path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; - } - }; - let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); - res.push((path.to_owned(), text)) - } - res -} - -fn load_change(change: WatcherChange) -> Option { - let data = match change { - WatcherChange::Create(path) => { - if path.is_dir() { - return None; - } - let text = match fs::read_to_string(&path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error \"{}\": {}", path.display(), e); - return None; - } - }; - WatcherChangeData::Create { path, text } - } - WatcherChange::Write(path) => { - let text = match fs::read_to_string(&path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error \"{}\": {}", path.display(), e); - return None; - } - }; - WatcherChangeData::Write { path, text } - } - WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, - WatcherChange::Rescan => { - // this should be handled by Vfs::handle_task - return None; - } - }; - Some(data) -} diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs new file mode 100644 index 000000000..6d5af7690 --- /dev/null +++ b/crates/ra_vfs/src/io/mod.rs @@ -0,0 +1,212 @@ +use std::{ + fmt, fs, + path::{Path, PathBuf}, + sync::Arc, + thread, +}; + +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::Mutex; +use relative_path::RelativePathBuf; +use thread_worker::WorkerHandle; +use walkdir::{DirEntry, WalkDir}; + +mod watcher; +use watcher::Watcher; +pub use watcher::WatcherChange; + +use crate::VfsRoot; + +pub(crate) enum Task { + AddRoot { + root: VfsRoot, + path: PathBuf, + filter: Box bool + Send>, + }, + /// this variant should only be created by the watcher + HandleChange(WatcherChange), + LoadChange(WatcherChange), + Watch { + dir: PathBuf, + filter: Box bool + Send>, + }, +} + +#[derive(Debug)] +pub struct AddRootResult { + pub(crate) root: VfsRoot, + pub(crate) files: Vec<(RelativePathBuf, String)>, +} + +#[derive(Debug)] +pub enum WatcherChangeData { + Create { path: PathBuf, text: String }, + Write { path: PathBuf, text: String }, + Remove { path: PathBuf }, +} + +pub enum TaskResult { + AddRoot(AddRootResult), + HandleChange(WatcherChange), + LoadChange(WatcherChangeData), + NoOp, +} + +impl fmt::Debug for TaskResult { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("TaskResult { ... }") + } +} + +pub(crate) struct Worker { + worker: thread_worker::Worker, + worker_handle: WorkerHandle, + watcher: Arc>>, +} + +impl Worker { + pub(crate) fn start() -> Worker { + let watcher = Arc::new(Mutex::new(None)); + let watcher_clone = watcher.clone(); + let (worker, worker_handle) = + thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|t| handle_task(t, &watcher_clone)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() + }); + match Watcher::start(worker.inp.clone()) { + Ok(w) => { + watcher.lock().replace(w); + } + Err(e) => log::error!("could not start watcher: {}", e), + }; + Worker { + worker, + worker_handle, + watcher, + } + } + + pub(crate) fn sender(&self) -> &Sender { + &self.worker.inp + } + + pub(crate) fn receiver(&self) -> &Receiver { + &self.worker.out + } + + pub(crate) fn shutdown(self) -> thread::Result<()> { + if let Some(watcher) = self.watcher.lock().take() { + let _ = watcher.shutdown(); + } + let _ = self.worker.shutdown(); + self.worker_handle.shutdown() + } +} + +fn watch( + watcher: &Arc>>, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_existing: bool, +) { + let mut watcher = watcher.lock(); + let watcher = match *watcher { + Some(ref mut w) => w, + None => { + // watcher dropped or couldn't start + return; + } + }; + watcher.watch_recursive(dir, filter_entry, emit_for_existing) +} + +fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { + match task { + Task::AddRoot { root, path, filter } => { + watch(watcher, &path, &*filter, false); + log::debug!("loading {} ...", path.as_path().display()); + let files = load_root(path.as_path(), &*filter); + log::debug!("... loaded {}", path.as_path().display()); + TaskResult::AddRoot(AddRootResult { root, files }) + } + Task::HandleChange(change) => { + // forward as is because Vfs has to decide if we should load it + TaskResult::HandleChange(change) + } + Task::LoadChange(change) => { + log::debug!("loading {:?} ...", change); + match load_change(change) { + Some(data) => TaskResult::LoadChange(data), + None => TaskResult::NoOp, + } + } + Task::Watch { dir, filter } => { + watch(watcher, &dir, &*filter, true); + TaskResult::NoOp + } + } +} + +fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { + let mut res = Vec::new(); + for entry in WalkDir::new(root).into_iter().filter_entry(filter) { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + if !entry.file_type().is_file() { + continue; + } + let path = entry.path(); + let text = match fs::read_to_string(path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); + res.push((path.to_owned(), text)) + } + res +} + +fn load_change(change: WatcherChange) -> Option { + let data = match change { + WatcherChange::Create(path) => { + if path.is_dir() { + return None; + } + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Create { path, text } + } + WatcherChange::Write(path) => { + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Write { path, text } + } + WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, + WatcherChange::Rescan => { + // this should be handled by Vfs::handle_task + return None; + } + }; + Some(data) +} diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs new file mode 100644 index 000000000..e33298477 --- /dev/null +++ b/crates/ra_vfs/src/io/watcher.rs @@ -0,0 +1,128 @@ +use crate::io; +use crossbeam_channel::Sender; +use drop_bomb::DropBomb; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use std::{ + path::{Path, PathBuf}, + sync::mpsc, + thread, + time::Duration, +}; +use walkdir::{DirEntry, WalkDir}; + +#[derive(Debug)] +pub enum WatcherChange { + Create(PathBuf), + Write(PathBuf), + Remove(PathBuf), + Rescan, +} + +fn handle_change_event( + ev: DebouncedEvent, + sender: &Sender, +) -> Result<(), Box> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; + } + DebouncedEvent::Create(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; + } + DebouncedEvent::Write(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; + } + DebouncedEvent::Remove(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; + } + DebouncedEvent::Rename(src, dst) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; + sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) +} + +const WATCHER_DELAY: Duration = Duration::from_millis(250); + +pub(crate) struct Watcher { + watcher: RecommendedWatcher, + thread: thread::JoinHandle<()>, + bomb: DropBomb, + sender: Sender, +} + +impl Watcher { + pub(crate) fn start( + output_sender: Sender, + ) -> Result> { + let (input_sender, input_receiver) = mpsc::channel(); + let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; + let sender = output_sender.clone(); + let thread = thread::spawn(move || { + input_receiver + .into_iter() + // forward relevant events only + .try_for_each(|change| handle_change_event(change, &output_sender)) + .unwrap() + }); + Ok(Watcher { + watcher, + thread, + sender, + bomb: DropBomb::new(format!("Watcher was not shutdown")), + }) + } + + pub fn watch_recursive( + &mut self, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_contents: bool, + ) { + for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { + match res { + Ok(entry) => { + if entry.path().is_dir() { + match self.watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } + } + if emit_for_contents && entry.depth() > 0 { + // emit as create because we haven't seen it yet + if let Err(e) = + self.sender + .send(io::Task::HandleChange(WatcherChange::Create( + entry.path().to_path_buf(), + ))) + { + log::warn!("watcher error: {}", e) + } + } + } + Err(e) => log::warn!("watcher error: {}", e), + } + } + } + + pub fn shutdown(mut self) -> thread::Result<()> { + self.bomb.defuse(); + drop(self.watcher); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res + } +} diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 196180890..5db0d8646 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -14,7 +14,6 @@ //! which are watched for changes. Typically, there will be a root for each //! Cargo package. mod io; -mod watcher; use std::{ cmp::Reverse, @@ -32,7 +31,7 @@ use rustc_hash::{FxHashMap, FxHashSet}; use walkdir::DirEntry; pub use crate::io::TaskResult as VfsTask; -pub use crate::watcher::WatcherChange; +use io::{Task, TaskResult, WatcherChange, WatcherChangeData, Worker}; /// `RootFilter` is a predicate that checks if a file can belong to a root. If /// several filters match a file (nested dirs), the most nested one wins. @@ -100,7 +99,7 @@ pub struct Vfs { files: Arena, root2files: FxHashMap>, pending_changes: Vec, - worker: io::Worker, + worker: Worker, } impl fmt::Debug for Vfs { @@ -204,7 +203,7 @@ impl Vfs { pub fn handle_task(&mut self, task: io::TaskResult) { match task { - io::TaskResult::AddRoot(task) => { + TaskResult::AddRoot(task) => { let mut files = Vec::new(); // While we were scanning the root in the backgound, a file might have // been open in the editor, so we need to account for that. @@ -229,38 +228,35 @@ impl Vfs { }; self.pending_changes.push(change); } - io::TaskResult::HandleChange(change) => match &change { - watcher::WatcherChange::Create(path) if path.is_dir() => { + TaskResult::HandleChange(change) => match &change { + WatcherChange::Create(path) if path.is_dir() => { if let Some((root, _path, _file)) = self.find_root(&path) { let root_filter = self.roots[root].clone(); let filter = move |entry: &DirEntry| root_filter.can_contain(entry.path()).is_some(); self.worker .sender() - .send(io::Task::Watch { + .send(Task::Watch { dir: path.to_path_buf(), filter: Box::new(filter), }) .unwrap() } } - watcher::WatcherChange::Create(path) - | watcher::WatcherChange::Remove(path) - | watcher::WatcherChange::Write(path) => { + WatcherChange::Create(path) + | WatcherChange::Remove(path) + | WatcherChange::Write(path) => { if self.should_handle_change(&path) { - self.worker - .sender() - .send(io::Task::LoadChange(change)) - .unwrap() + self.worker.sender().send(Task::LoadChange(change)).unwrap() } } - watcher::WatcherChange::Rescan => { + WatcherChange::Rescan => { // TODO we should reload all files } }, - io::TaskResult::LoadChange(change) => match change { - io::WatcherChangeData::Create { path, text } - | io::WatcherChangeData::Write { path, text } => { + TaskResult::LoadChange(change) => match change { + WatcherChangeData::Create { path, text } + | WatcherChangeData::Write { path, text } => { if let Some((root, path, file)) = self.find_root(&path) { if let Some(file) = file { self.do_change_file(file, text, false); @@ -269,7 +265,7 @@ impl Vfs { } } } - io::WatcherChangeData::Remove { path } => { + WatcherChangeData::Remove { path } => { if let Some((root, path, file)) = self.find_root(&path) { if let Some(file) = file { self.do_remove_file(root, path, file, false); @@ -277,7 +273,7 @@ impl Vfs { } } }, - io::TaskResult::NoOp => {} + TaskResult::NoOp => {} } } diff --git a/crates/ra_vfs/src/watcher.rs b/crates/ra_vfs/src/watcher.rs deleted file mode 100644 index 606935891..000000000 --- a/crates/ra_vfs/src/watcher.rs +++ /dev/null @@ -1,128 +0,0 @@ -use crate::io; -use crossbeam_channel::Sender; -use drop_bomb::DropBomb; -use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; -use std::{ - path::{Path, PathBuf}, - sync::mpsc, - thread, - time::Duration, -}; -use walkdir::{DirEntry, WalkDir}; - -pub(crate) struct Watcher { - watcher: RecommendedWatcher, - thread: thread::JoinHandle<()>, - bomb: DropBomb, - sender: Sender, -} - -#[derive(Debug)] -pub enum WatcherChange { - Create(PathBuf), - Write(PathBuf), - Remove(PathBuf), - Rescan, -} - -fn handle_change_event( - ev: DebouncedEvent, - sender: &Sender, -) -> Result<(), Box> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; - } - DebouncedEvent::Create(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; - } - DebouncedEvent::Write(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; - } - DebouncedEvent::Remove(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; - } - DebouncedEvent::Rename(src, dst) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; - sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } - Ok(()) -} - -const WATCHER_DELAY: Duration = Duration::from_millis(250); - -impl Watcher { - pub(crate) fn start( - output_sender: Sender, - ) -> Result> { - let (input_sender, input_receiver) = mpsc::channel(); - let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; - let sender = output_sender.clone(); - let thread = thread::spawn(move || { - input_receiver - .into_iter() - // forward relevant events only - .try_for_each(|change| handle_change_event(change, &output_sender)) - .unwrap() - }); - Ok(Watcher { - watcher, - thread, - sender, - bomb: DropBomb::new(format!("Watcher was not shutdown")), - }) - } - - pub fn watch_recursive( - &mut self, - dir: &Path, - filter_entry: impl Fn(&DirEntry) -> bool, - emit_for_contents: bool, - ) { - for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { - match res { - Ok(entry) => { - if entry.path().is_dir() { - match self.watcher.watch(dir, RecursiveMode::NonRecursive) { - Ok(()) => log::debug!("watching \"{}\"", dir.display()), - Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), - } - } - if emit_for_contents && entry.depth() > 0 { - // emit as create because we haven't seen it yet - if let Err(e) = - self.sender - .send(io::Task::HandleChange(WatcherChange::Create( - entry.path().to_path_buf(), - ))) - { - log::warn!("watcher error: {}", e) - } - } - } - Err(e) => log::warn!("watcher error: {}", e), - } - } - } - - pub fn shutdown(mut self) -> thread::Result<()> { - self.bomb.defuse(); - drop(self.watcher); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... Watcher terminated with ok"), - Err(_) => log::error!("... Watcher terminated with err"), - } - res - } -} -- cgit v1.2.3