From 277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c Mon Sep 17 00:00:00 2001 From: Bernardo Date: Mon, 21 Jan 2019 18:59:54 +0100 Subject: move watcher to io module --- crates/ra_vfs/src/io/mod.rs | 212 ++++++++++++++++++++++++++++++++++++++++ crates/ra_vfs/src/io/watcher.rs | 128 ++++++++++++++++++++++++ 2 files changed, 340 insertions(+) create mode 100644 crates/ra_vfs/src/io/mod.rs create mode 100644 crates/ra_vfs/src/io/watcher.rs (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs new file mode 100644 index 000000000..6d5af7690 --- /dev/null +++ b/crates/ra_vfs/src/io/mod.rs @@ -0,0 +1,212 @@ +use std::{ + fmt, fs, + path::{Path, PathBuf}, + sync::Arc, + thread, +}; + +use crossbeam_channel::{Receiver, Sender}; +use parking_lot::Mutex; +use relative_path::RelativePathBuf; +use thread_worker::WorkerHandle; +use walkdir::{DirEntry, WalkDir}; + +mod watcher; +use watcher::Watcher; +pub use watcher::WatcherChange; + +use crate::VfsRoot; + +pub(crate) enum Task { + AddRoot { + root: VfsRoot, + path: PathBuf, + filter: Box bool + Send>, + }, + /// this variant should only be created by the watcher + HandleChange(WatcherChange), + LoadChange(WatcherChange), + Watch { + dir: PathBuf, + filter: Box bool + Send>, + }, +} + +#[derive(Debug)] +pub struct AddRootResult { + pub(crate) root: VfsRoot, + pub(crate) files: Vec<(RelativePathBuf, String)>, +} + +#[derive(Debug)] +pub enum WatcherChangeData { + Create { path: PathBuf, text: String }, + Write { path: PathBuf, text: String }, + Remove { path: PathBuf }, +} + +pub enum TaskResult { + AddRoot(AddRootResult), + HandleChange(WatcherChange), + LoadChange(WatcherChangeData), + NoOp, +} + +impl fmt::Debug for TaskResult { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str("TaskResult { ... }") + } +} + +pub(crate) struct Worker { + worker: thread_worker::Worker, + worker_handle: WorkerHandle, + watcher: Arc>>, +} + +impl Worker { + pub(crate) fn start() -> Worker { + let watcher = Arc::new(Mutex::new(None)); + let watcher_clone = watcher.clone(); + let (worker, worker_handle) = + thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|t| handle_task(t, &watcher_clone)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() + }); + match Watcher::start(worker.inp.clone()) { + Ok(w) => { + watcher.lock().replace(w); + } + Err(e) => log::error!("could not start watcher: {}", e), + }; + Worker { + worker, + worker_handle, + watcher, + } + } + + pub(crate) fn sender(&self) -> &Sender { + &self.worker.inp + } + + pub(crate) fn receiver(&self) -> &Receiver { + &self.worker.out + } + + pub(crate) fn shutdown(self) -> thread::Result<()> { + if let Some(watcher) = self.watcher.lock().take() { + let _ = watcher.shutdown(); + } + let _ = self.worker.shutdown(); + self.worker_handle.shutdown() + } +} + +fn watch( + watcher: &Arc>>, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_existing: bool, +) { + let mut watcher = watcher.lock(); + let watcher = match *watcher { + Some(ref mut w) => w, + None => { + // watcher dropped or couldn't start + return; + } + }; + watcher.watch_recursive(dir, filter_entry, emit_for_existing) +} + +fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { + match task { + Task::AddRoot { root, path, filter } => { + watch(watcher, &path, &*filter, false); + log::debug!("loading {} ...", path.as_path().display()); + let files = load_root(path.as_path(), &*filter); + log::debug!("... loaded {}", path.as_path().display()); + TaskResult::AddRoot(AddRootResult { root, files }) + } + Task::HandleChange(change) => { + // forward as is because Vfs has to decide if we should load it + TaskResult::HandleChange(change) + } + Task::LoadChange(change) => { + log::debug!("loading {:?} ...", change); + match load_change(change) { + Some(data) => TaskResult::LoadChange(data), + None => TaskResult::NoOp, + } + } + Task::Watch { dir, filter } => { + watch(watcher, &dir, &*filter, true); + TaskResult::NoOp + } + } +} + +fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { + let mut res = Vec::new(); + for entry in WalkDir::new(root).into_iter().filter_entry(filter) { + let entry = match entry { + Ok(entry) => entry, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + if !entry.file_type().is_file() { + continue; + } + let path = entry.path(); + let text = match fs::read_to_string(path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error: {}", e); + continue; + } + }; + let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); + res.push((path.to_owned(), text)) + } + res +} + +fn load_change(change: WatcherChange) -> Option { + let data = match change { + WatcherChange::Create(path) => { + if path.is_dir() { + return None; + } + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Create { path, text } + } + WatcherChange::Write(path) => { + let text = match fs::read_to_string(&path) { + Ok(text) => text, + Err(e) => { + log::warn!("watcher error \"{}\": {}", path.display(), e); + return None; + } + }; + WatcherChangeData::Write { path, text } + } + WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, + WatcherChange::Rescan => { + // this should be handled by Vfs::handle_task + return None; + } + }; + Some(data) +} diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs new file mode 100644 index 000000000..e33298477 --- /dev/null +++ b/crates/ra_vfs/src/io/watcher.rs @@ -0,0 +1,128 @@ +use crate::io; +use crossbeam_channel::Sender; +use drop_bomb::DropBomb; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use std::{ + path::{Path, PathBuf}, + sync::mpsc, + thread, + time::Duration, +}; +use walkdir::{DirEntry, WalkDir}; + +#[derive(Debug)] +pub enum WatcherChange { + Create(PathBuf), + Write(PathBuf), + Remove(PathBuf), + Rescan, +} + +fn handle_change_event( + ev: DebouncedEvent, + sender: &Sender, +) -> Result<(), Box> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; + } + DebouncedEvent::Create(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; + } + DebouncedEvent::Write(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; + } + DebouncedEvent::Remove(path) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; + } + DebouncedEvent::Rename(src, dst) => { + sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; + sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) +} + +const WATCHER_DELAY: Duration = Duration::from_millis(250); + +pub(crate) struct Watcher { + watcher: RecommendedWatcher, + thread: thread::JoinHandle<()>, + bomb: DropBomb, + sender: Sender, +} + +impl Watcher { + pub(crate) fn start( + output_sender: Sender, + ) -> Result> { + let (input_sender, input_receiver) = mpsc::channel(); + let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; + let sender = output_sender.clone(); + let thread = thread::spawn(move || { + input_receiver + .into_iter() + // forward relevant events only + .try_for_each(|change| handle_change_event(change, &output_sender)) + .unwrap() + }); + Ok(Watcher { + watcher, + thread, + sender, + bomb: DropBomb::new(format!("Watcher was not shutdown")), + }) + } + + pub fn watch_recursive( + &mut self, + dir: &Path, + filter_entry: impl Fn(&DirEntry) -> bool, + emit_for_contents: bool, + ) { + for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { + match res { + Ok(entry) => { + if entry.path().is_dir() { + match self.watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } + } + if emit_for_contents && entry.depth() > 0 { + // emit as create because we haven't seen it yet + if let Err(e) = + self.sender + .send(io::Task::HandleChange(WatcherChange::Create( + entry.path().to_path_buf(), + ))) + { + log::warn!("watcher error: {}", e) + } + } + } + Err(e) => log::warn!("watcher error: {}", e), + } + } + } + + pub fn shutdown(mut self) -> thread::Result<()> { + self.bomb.defuse(); + drop(self.watcher); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), + } + res + } +} -- cgit v1.2.3 From 2a1afad3eda7d8c5635de6e7f524ed943cecc22b Mon Sep 17 00:00:00 2001 From: Bernardo Date: Mon, 21 Jan 2019 19:11:39 +0100 Subject: avoid boxing --- crates/ra_vfs/src/io/mod.rs | 45 ++++++++++++++++++++++++++++++----------- crates/ra_vfs/src/io/watcher.rs | 16 +++++++-------- 2 files changed, 40 insertions(+), 21 deletions(-) (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs index 6d5af7690..daac6c6f2 100644 --- a/crates/ra_vfs/src/io/mod.rs +++ b/crates/ra_vfs/src/io/mod.rs @@ -9,26 +9,27 @@ use crossbeam_channel::{Receiver, Sender}; use parking_lot::Mutex; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; -use walkdir::{DirEntry, WalkDir}; +use walkdir::WalkDir; mod watcher; use watcher::Watcher; pub use watcher::WatcherChange; -use crate::VfsRoot; +use crate::{RootFilter, VfsRoot}; pub(crate) enum Task { AddRoot { root: VfsRoot, path: PathBuf, - filter: Box bool + Send>, + root_filter: Arc, + nested_roots: Vec, }, /// this variant should only be created by the watcher HandleChange(WatcherChange), LoadChange(WatcherChange), Watch { dir: PathBuf, - filter: Box bool + Send>, + root_filter: Arc, }, } @@ -109,7 +110,7 @@ impl Worker { fn watch( watcher: &Arc>>, dir: &Path, - filter_entry: impl Fn(&DirEntry) -> bool, + filter_entry: &RootFilter, emit_for_existing: bool, ) { let mut watcher = watcher.lock(); @@ -125,10 +126,19 @@ fn watch( fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { match task { - Task::AddRoot { root, path, filter } => { - watch(watcher, &path, &*filter, false); + Task::AddRoot { + root, + path, + root_filter, + nested_roots, + } => { + watch(watcher, &path, &*root_filter, false); log::debug!("loading {} ...", path.as_path().display()); - let files = load_root(path.as_path(), &*filter); + let files = load_root( + path.as_path(), + root_filter.as_ref(), + nested_roots.as_slice(), + ); log::debug!("... loaded {}", path.as_path().display()); TaskResult::AddRoot(AddRootResult { root, files }) } @@ -143,16 +153,27 @@ fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult None => TaskResult::NoOp, } } - Task::Watch { dir, filter } => { - watch(watcher, &dir, &*filter, true); + Task::Watch { dir, root_filter } => { + watch(watcher, &dir, root_filter.as_ref(), true); TaskResult::NoOp } } } -fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { +fn load_root( + root: &Path, + root_filter: &RootFilter, + nested_roots: &[PathBuf], +) -> Vec<(RelativePathBuf, String)> { let mut res = Vec::new(); - for entry in WalkDir::new(root).into_iter().filter_entry(filter) { + for entry in WalkDir::new(root).into_iter().filter_entry(|entry| { + if entry.file_type().is_dir() && nested_roots.iter().any(|it| it == entry.path()) { + // do not load files of a nested root + false + } else { + root_filter.can_contain(entry.path()).is_some() + } + }) { let entry = match entry { Ok(entry) => entry, Err(e) => { diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index e33298477..5e9bc8ff3 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -1,4 +1,4 @@ -use crate::io; +use crate::{io, RootFilter}; use crossbeam_channel::Sender; use drop_bomb::DropBomb; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; @@ -8,7 +8,7 @@ use std::{ thread, time::Duration, }; -use walkdir::{DirEntry, WalkDir}; +use walkdir::WalkDir; #[derive(Debug)] pub enum WatcherChange { @@ -83,13 +83,11 @@ impl Watcher { }) } - pub fn watch_recursive( - &mut self, - dir: &Path, - filter_entry: impl Fn(&DirEntry) -> bool, - emit_for_contents: bool, - ) { - for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { + pub fn watch_recursive(&mut self, dir: &Path, filter: &RootFilter, emit_for_contents: bool) { + for res in WalkDir::new(dir) + .into_iter() + .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + { match res { Ok(entry) => { if entry.path().is_dir() { -- cgit v1.2.3 From eeed6cf53b9f6112329cc8a274dcf63bce887c50 Mon Sep 17 00:00:00 2001 From: Bernardo Date: Mon, 21 Jan 2019 21:48:07 +0100 Subject: fix recursive watch --- crates/ra_vfs/src/io/watcher.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index 5e9bc8ff3..b370c5fbc 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -91,9 +91,14 @@ impl Watcher { match res { Ok(entry) => { if entry.path().is_dir() { - match self.watcher.watch(dir, RecursiveMode::NonRecursive) { - Ok(()) => log::debug!("watching \"{}\"", dir.display()), - Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + match self + .watcher + .watch(entry.path(), RecursiveMode::NonRecursive) + { + Ok(()) => log::debug!("watching \"{}\"", entry.path().display()), + Err(e) => { + log::warn!("could not watch \"{}\": {}", entry.path().display(), e) + } } } if emit_for_contents && entry.depth() > 0 { -- cgit v1.2.3 From 0a086508524bed87bb15113437e9c2b1e1be4c42 Mon Sep 17 00:00:00 2001 From: Bernardo Date: Tue, 22 Jan 2019 18:14:31 +0100 Subject: hardcode ".git" and "node_modules" also --- crates/ra_vfs/src/io/mod.rs | 233 -------------------------------------------- 1 file changed, 233 deletions(-) delete mode 100644 crates/ra_vfs/src/io/mod.rs (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs deleted file mode 100644 index daac6c6f2..000000000 --- a/crates/ra_vfs/src/io/mod.rs +++ /dev/null @@ -1,233 +0,0 @@ -use std::{ - fmt, fs, - path::{Path, PathBuf}, - sync::Arc, - thread, -}; - -use crossbeam_channel::{Receiver, Sender}; -use parking_lot::Mutex; -use relative_path::RelativePathBuf; -use thread_worker::WorkerHandle; -use walkdir::WalkDir; - -mod watcher; -use watcher::Watcher; -pub use watcher::WatcherChange; - -use crate::{RootFilter, VfsRoot}; - -pub(crate) enum Task { - AddRoot { - root: VfsRoot, - path: PathBuf, - root_filter: Arc, - nested_roots: Vec, - }, - /// this variant should only be created by the watcher - HandleChange(WatcherChange), - LoadChange(WatcherChange), - Watch { - dir: PathBuf, - root_filter: Arc, - }, -} - -#[derive(Debug)] -pub struct AddRootResult { - pub(crate) root: VfsRoot, - pub(crate) files: Vec<(RelativePathBuf, String)>, -} - -#[derive(Debug)] -pub enum WatcherChangeData { - Create { path: PathBuf, text: String }, - Write { path: PathBuf, text: String }, - Remove { path: PathBuf }, -} - -pub enum TaskResult { - AddRoot(AddRootResult), - HandleChange(WatcherChange), - LoadChange(WatcherChangeData), - NoOp, -} - -impl fmt::Debug for TaskResult { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("TaskResult { ... }") - } -} - -pub(crate) struct Worker { - worker: thread_worker::Worker, - worker_handle: WorkerHandle, - watcher: Arc>>, -} - -impl Worker { - pub(crate) fn start() -> Worker { - let watcher = Arc::new(Mutex::new(None)); - let watcher_clone = watcher.clone(); - let (worker, worker_handle) = - thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { - input_receiver - .into_iter() - .map(|t| handle_task(t, &watcher_clone)) - .try_for_each(|it| output_sender.send(it)) - .unwrap() - }); - match Watcher::start(worker.inp.clone()) { - Ok(w) => { - watcher.lock().replace(w); - } - Err(e) => log::error!("could not start watcher: {}", e), - }; - Worker { - worker, - worker_handle, - watcher, - } - } - - pub(crate) fn sender(&self) -> &Sender { - &self.worker.inp - } - - pub(crate) fn receiver(&self) -> &Receiver { - &self.worker.out - } - - pub(crate) fn shutdown(self) -> thread::Result<()> { - if let Some(watcher) = self.watcher.lock().take() { - let _ = watcher.shutdown(); - } - let _ = self.worker.shutdown(); - self.worker_handle.shutdown() - } -} - -fn watch( - watcher: &Arc>>, - dir: &Path, - filter_entry: &RootFilter, - emit_for_existing: bool, -) { - let mut watcher = watcher.lock(); - let watcher = match *watcher { - Some(ref mut w) => w, - None => { - // watcher dropped or couldn't start - return; - } - }; - watcher.watch_recursive(dir, filter_entry, emit_for_existing) -} - -fn handle_task(task: Task, watcher: &Arc>>) -> TaskResult { - match task { - Task::AddRoot { - root, - path, - root_filter, - nested_roots, - } => { - watch(watcher, &path, &*root_filter, false); - log::debug!("loading {} ...", path.as_path().display()); - let files = load_root( - path.as_path(), - root_filter.as_ref(), - nested_roots.as_slice(), - ); - log::debug!("... loaded {}", path.as_path().display()); - TaskResult::AddRoot(AddRootResult { root, files }) - } - Task::HandleChange(change) => { - // forward as is because Vfs has to decide if we should load it - TaskResult::HandleChange(change) - } - Task::LoadChange(change) => { - log::debug!("loading {:?} ...", change); - match load_change(change) { - Some(data) => TaskResult::LoadChange(data), - None => TaskResult::NoOp, - } - } - Task::Watch { dir, root_filter } => { - watch(watcher, &dir, root_filter.as_ref(), true); - TaskResult::NoOp - } - } -} - -fn load_root( - root: &Path, - root_filter: &RootFilter, - nested_roots: &[PathBuf], -) -> Vec<(RelativePathBuf, String)> { - let mut res = Vec::new(); - for entry in WalkDir::new(root).into_iter().filter_entry(|entry| { - if entry.file_type().is_dir() && nested_roots.iter().any(|it| it == entry.path()) { - // do not load files of a nested root - false - } else { - root_filter.can_contain(entry.path()).is_some() - } - }) { - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; - } - }; - if !entry.file_type().is_file() { - continue; - } - let path = entry.path(); - let text = match fs::read_to_string(path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; - } - }; - let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); - res.push((path.to_owned(), text)) - } - res -} - -fn load_change(change: WatcherChange) -> Option { - let data = match change { - WatcherChange::Create(path) => { - if path.is_dir() { - return None; - } - let text = match fs::read_to_string(&path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error \"{}\": {}", path.display(), e); - return None; - } - }; - WatcherChangeData::Create { path, text } - } - WatcherChange::Write(path) => { - let text = match fs::read_to_string(&path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error \"{}\": {}", path.display(), e); - return None; - } - }; - WatcherChangeData::Write { path, text } - } - WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, - WatcherChange::Rescan => { - // this should be handled by Vfs::handle_task - return None; - } - }; - Some(data) -} -- cgit v1.2.3 From be14ab217ce29542a8b2c84282e822adcc69646c Mon Sep 17 00:00:00 2001 From: Bernardo Date: Tue, 22 Jan 2019 18:38:34 +0100 Subject: better test, avoid duplicated events --- crates/ra_vfs/src/io/watcher.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index b370c5fbc..68bb6b692 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -100,16 +100,18 @@ impl Watcher { log::warn!("could not watch \"{}\": {}", entry.path().display(), e) } } - } - if emit_for_contents && entry.depth() > 0 { - // emit as create because we haven't seen it yet - if let Err(e) = - self.sender - .send(io::Task::HandleChange(WatcherChange::Create( - entry.path().to_path_buf(), - ))) - { - log::warn!("watcher error: {}", e) + } else { + if emit_for_contents && entry.depth() > 0 { + // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching + // emit as create because we haven't seen it yet + if let Err(e) = + self.sender + .send(io::Task::HandleChange(WatcherChange::Create( + entry.path().to_path_buf(), + ))) + { + log::warn!("watcher error: {}", e) + } } } } -- cgit v1.2.3 From d63e1cebff771621b90bdce25ac013eecb415e1e Mon Sep 17 00:00:00 2001 From: Bernardo Date: Fri, 25 Jan 2019 18:39:35 +0100 Subject: use `Roots` in watcher --- crates/ra_vfs/src/io/watcher.rs | 215 ++++++++++++++++++++++++++-------------- 1 file changed, 141 insertions(+), 74 deletions(-) (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index 68bb6b692..1d7ce2136 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -1,118 +1,72 @@ -use crate::{io, RootFilter}; +use crate::{io, RootFilter, Roots, VfsRoot}; use crossbeam_channel::Sender; use drop_bomb::DropBomb; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; +use parking_lot::Mutex; use std::{ + fs, path::{Path, PathBuf}, - sync::mpsc, + sync::{mpsc, Arc}, thread, time::Duration, }; use walkdir::WalkDir; #[derive(Debug)] -pub enum WatcherChange { - Create(PathBuf), - Write(PathBuf), - Remove(PathBuf), - Rescan, -} - -fn handle_change_event( - ev: DebouncedEvent, - sender: &Sender, -) -> Result<(), Box> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; - } - DebouncedEvent::Create(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; - } - DebouncedEvent::Write(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; - } - DebouncedEvent::Remove(path) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; - } - DebouncedEvent::Rename(src, dst) => { - sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; - sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } - Ok(()) +enum ChangeKind { + Create, + Write, + Remove, } const WATCHER_DELAY: Duration = Duration::from_millis(250); pub(crate) struct Watcher { - watcher: RecommendedWatcher, thread: thread::JoinHandle<()>, bomb: DropBomb, - sender: Sender, + watcher: Arc>>, } impl Watcher { pub(crate) fn start( - output_sender: Sender, + roots: Arc, + output_sender: Sender, ) -> Result> { let (input_sender, input_receiver) = mpsc::channel(); - let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; + let watcher = Arc::new(Mutex::new(Some(notify::watcher( + input_sender, + WATCHER_DELAY, + )?))); let sender = output_sender.clone(); + let watcher_clone = watcher.clone(); let thread = thread::spawn(move || { + let worker = WatcherWorker { + roots, + watcher: watcher_clone, + sender, + }; input_receiver .into_iter() // forward relevant events only - .try_for_each(|change| handle_change_event(change, &output_sender)) + .try_for_each(|change| worker.handle_debounced_event(change)) .unwrap() }); Ok(Watcher { - watcher, thread, - sender, + watcher, bomb: DropBomb::new(format!("Watcher was not shutdown")), }) } - pub fn watch_recursive(&mut self, dir: &Path, filter: &RootFilter, emit_for_contents: bool) { - for res in WalkDir::new(dir) + pub fn watch_root(&mut self, filter: &RootFilter) { + for res in WalkDir::new(&filter.root) .into_iter() - .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + .filter_entry(filter.entry_filter()) { match res { Ok(entry) => { if entry.path().is_dir() { - match self - .watcher - .watch(entry.path(), RecursiveMode::NonRecursive) - { - Ok(()) => log::debug!("watching \"{}\"", entry.path().display()), - Err(e) => { - log::warn!("could not watch \"{}\": {}", entry.path().display(), e) - } - } - } else { - if emit_for_contents && entry.depth() > 0 { - // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching - // emit as create because we haven't seen it yet - if let Err(e) = - self.sender - .send(io::Task::HandleChange(WatcherChange::Create( - entry.path().to_path_buf(), - ))) - { - log::warn!("watcher error: {}", e) - } - } + watch_one(self.watcher.as_ref(), entry.path()); } } Err(e) => log::warn!("watcher error: {}", e), @@ -122,7 +76,7 @@ impl Watcher { pub fn shutdown(mut self) -> thread::Result<()> { self.bomb.defuse(); - drop(self.watcher); + drop(self.watcher.lock().take()); let res = self.thread.join(); match &res { Ok(()) => log::info!("... Watcher terminated with ok"), @@ -131,3 +85,116 @@ impl Watcher { res } } + +struct WatcherWorker { + watcher: Arc>>, + roots: Arc, + sender: Sender, +} + +impl WatcherWorker { + fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + self.handle_change(path, ChangeKind::Create); + } + DebouncedEvent::Write(path) => { + self.handle_change(path, ChangeKind::Write); + } + DebouncedEvent::Remove(path) => { + self.handle_change(path, ChangeKind::Remove); + } + DebouncedEvent::Rename(src, dst) => { + self.handle_change(src, ChangeKind::Remove); + self.handle_change(dst, ChangeKind::Create); + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) + } + + fn handle_change(&self, path: PathBuf, kind: ChangeKind) { + if let Err(e) = self.try_handle_change(path, kind) { + log::warn!("watcher error: {}", e) + } + } + + fn try_handle_change( + &self, + path: PathBuf, + kind: ChangeKind, + ) -> Result<(), Box> { + let (root, rel_path) = match self.roots.find(&path) { + Some(x) => x, + None => return Ok(()), + }; + match kind { + ChangeKind::Create => { + if path.is_dir() { + self.watch_recursive(&path, root); + } else { + let text = fs::read_to_string(&path)?; + self.sender.send(io::TaskResult::AddSingleFile { + root, + path: rel_path, + text, + })? + } + } + ChangeKind::Write => { + let text = fs::read_to_string(&path)?; + self.sender.send(io::TaskResult::ChangeSingleFile { + root, + path: rel_path, + text, + })? + } + ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile { + root, + path: rel_path, + })?, + } + Ok(()) + } + + fn watch_recursive(&self, dir: &Path, root: VfsRoot) { + let filter = &self.roots[root]; + for res in WalkDir::new(dir) + .into_iter() + .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + { + match res { + Ok(entry) => { + if entry.path().is_dir() { + watch_one(self.watcher.as_ref(), entry.path()); + } else { + // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching + // emit as create because we haven't seen it yet + self.handle_change(entry.path().to_path_buf(), ChangeKind::Create); + } + } + Err(e) => log::warn!("watcher error: {}", e), + } + } + } +} + +fn watch_one(watcher: &Mutex>, dir: &Path) { + if let Some(watcher) = watcher.lock().as_mut() { + match watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } + } +} -- cgit v1.2.3 From 410a3ae6e847b59f9930ce4d6bf9f3c5f1d72167 Mon Sep 17 00:00:00 2001 From: Bernardo Date: Fri, 25 Jan 2019 22:13:55 +0100 Subject: use entry file_type, improve test --- crates/ra_vfs/src/io/watcher.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'crates/ra_vfs/src/io') diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs index 1d7ce2136..ff6775f59 100644 --- a/crates/ra_vfs/src/io/watcher.rs +++ b/crates/ra_vfs/src/io/watcher.rs @@ -65,7 +65,7 @@ impl Watcher { { match res { Ok(entry) => { - if entry.path().is_dir() { + if entry.file_type().is_dir() { watch_one(self.watcher.as_ref(), entry.path()); } } @@ -172,11 +172,11 @@ impl WatcherWorker { let filter = &self.roots[root]; for res in WalkDir::new(dir) .into_iter() - .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) + .filter_entry(filter.entry_filter()) { match res { Ok(entry) => { - if entry.path().is_dir() { + if entry.file_type().is_dir() { watch_one(self.watcher.as_ref(), entry.path()); } else { // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching -- cgit v1.2.3