From 20d7a431fd6e3e363e698a2e464160640868597b Mon Sep 17 00:00:00 2001
From: Aleksey Kladov
Date: Sat, 26 Jan 2019 15:11:47 +0300
Subject: refactor-fvs

---
 crates/ra_vfs/src/io.rs         | 229 ++++++++++++++++++++++++++++++----
 crates/ra_vfs/src/io/watcher.rs | 200 -----------------------------------
 crates/ra_vfs/src/lib.rs        | 146 ++++++++++++-------------
 3 files changed, 242 insertions(+), 333 deletions(-)
 delete mode 100644 crates/ra_vfs/src/io/watcher.rs

diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 7ca1e9835..669240488 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,19 +1,23 @@
-use std::{fs, sync::Arc, thread};
-
-use crossbeam_channel::{Receiver, Sender};
+use std::{
+    fs,
+    path::{Path, PathBuf},
+    sync::{mpsc, Arc},
+    thread,
+    time::Duration,
+};
+use crossbeam_channel::{Receiver, Sender, SendError};
 use relative_path::RelativePathBuf;
 use thread_worker::WorkerHandle;
 use walkdir::WalkDir;
+use parking_lot::Mutex;
+use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
 
-mod watcher;
-use watcher::Watcher;
-
-use crate::{RootFilter, Roots, VfsRoot};
+use crate::{RootConfig, Roots, VfsRoot};
 
 pub(crate) enum Task {
     AddRoot {
         root: VfsRoot,
-        filter: Arc<RootFilter>,
+        config: Arc<RootConfig>,
     },
 }
 
@@ -39,6 +43,15 @@ pub enum TaskResult {
     },
 }
 
+#[derive(Debug)]
+enum ChangeKind {
+    Create,
+    Write,
+    Remove,
+}
+
+const WATCHER_DELAY: Duration = Duration::from_millis(250);
+
 pub(crate) struct Worker {
     worker: thread_worker::Worker<Task, TaskResult>,
     worker_handle: WorkerHandle,
@@ -48,21 +61,36 @@ impl Worker {
     pub(crate) fn start(roots: Arc<Roots>) -> Worker {
         let (worker, worker_handle) =
             thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
-                let mut watcher = match Watcher::start(roots, output_sender.clone()) {
-                    Ok(w) => Some(w),
-                    Err(e) => {
-                        log::error!("could not start watcher: {}", e);
-                        None
-                    }
+                let (notify_sender, notify_receiver) = mpsc::channel();
+                let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
+                    .map_err(|e| log::error!("failed to spawn notify {}", e))
+                    .ok();
+                let ctx = WatcherCtx {
+                    roots,
+                    watcher: Arc::new(Mutex::new(watcher)),
+                    sender: output_sender,
                 };
-                let res = input_receiver
-                    .into_iter()
-                    .filter_map(|t| handle_task(t, &mut watcher))
-                    .try_for_each(|it| output_sender.send(it));
-                if let Some(watcher) = watcher {
-                    let _ = watcher.shutdown();
+                let thread = thread::spawn({
+                    let ctx = ctx.clone();
+                    move || {
+                        let _ = notify_receiver
+                            .into_iter()
+                            // forward relevant events only
+                            .try_for_each(|change| ctx.handle_debounced_event(change));
+                    }
+                });
+                let res1 = input_receiver.into_iter().try_for_each(|t| match t {
+                    Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
+                });
+                drop(ctx.watcher.lock().take());
+                drop(ctx);
+                let res2 = thread.join();
+                match &res2 {
+                    Ok(()) => log::info!("... Watcher terminated with ok"),
+                    Err(_) => log::error!("... Watcher terminated with err"),
                 }
-                res.unwrap()
+                res1.unwrap();
+                res2.unwrap();
             });
         Worker {
             worker,
             worker_handle,
@@ -84,46 +112,141 @@ impl Worker {
     }
 }
 
-fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> {
-    match task {
-        Task::AddRoot { root, filter } => {
-            if let Some(watcher) = watcher {
-                watcher.watch_root(&filter)
+fn watch_root(
+    worker: &WatcherCtx,
+    root: VfsRoot,
+    config: Arc<RootConfig>,
+) -> Result<(), SendError<TaskResult>> {
+    let mut guard = worker.watcher.lock();
+    log::debug!("loading {} ...", config.root.as_path().display());
+    let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
+        .into_iter()
+        .filter_map(|path| {
+            let abs_path = path.to_path(&config.root);
+            let text = fs::read_to_string(abs_path)
+                .map_err(|e| log::warn!("watcher error: {}", e))
+                .ok()?;
+            Some((path, text))
+        })
+        .collect();
+    worker
+        .sender
+        .send(TaskResult::BulkLoadRoot { root, files })?;
+    log::debug!("... loaded {}", config.root.as_path().display());
+    Ok(())
+}
+
+#[derive(Clone)]
+struct WatcherCtx {
+    roots: Arc<Roots>,
+    watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
+    sender: Sender<TaskResult>,
+}
+
+impl WatcherCtx {
+    fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError<TaskResult>> {
+        match ev {
+            DebouncedEvent::NoticeWrite(_)
+            | DebouncedEvent::NoticeRemove(_)
+            | DebouncedEvent::Chmod(_) => {
+                // ignore
+            }
+            DebouncedEvent::Rescan => {
+                // TODO rescan all roots
+            }
+            DebouncedEvent::Create(path) => {
+                self.handle_change(path, ChangeKind::Create)?;
+            }
+            DebouncedEvent::Write(path) => {
+                self.handle_change(path, ChangeKind::Write)?;
             }
-            log::debug!("loading {} ...", filter.root.as_path().display());
-            let files = load_root(filter.as_ref());
-            log::debug!("... loaded {}", filter.root.as_path().display());
-            Some(TaskResult::BulkLoadRoot { root, files })
+            DebouncedEvent::Remove(path) => {
+                self.handle_change(path, ChangeKind::Remove)?;
+            }
+            DebouncedEvent::Rename(src, dst) => {
+                self.handle_change(src, ChangeKind::Remove)?;
+                self.handle_change(dst, ChangeKind::Create)?;
+            }
+            DebouncedEvent::Error(err, path) => {
+                // TODO should we reload the file contents?
+                log::warn!("watcher error \"{}\", {:?}", err, path);
+            }
+        }
+        Ok(())
+    }
+
+    fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError<TaskResult>> {
+        let (root, rel_path) = match self.roots.find(&path) {
+            None => return Ok(()),
+            Some(it) => it,
+        };
+        let config = &self.roots[root];
+        match kind {
+            ChangeKind::Create => {
+                let mut paths = Vec::new();
+                if path.is_dir() {
+                    let mut guard = self.watcher.lock();
+                    paths.extend(watch_recursive(guard.as_mut(), &path, &config));
+                } else {
+                    paths.push(rel_path);
+                }
+                paths
+                    .into_iter()
+                    .filter_map(|rel_path| {
+                        let abs_path = rel_path.to_path(&config.root);
+                        let text = fs::read_to_string(&abs_path)
+                            .map_err(|e| log::warn!("watcher failed {}", e))
+                            .ok()?;
+                        Some((rel_path, text))
+                    })
+                    .try_for_each(|(path, text)| {
+                        self.sender
+                            .send(TaskResult::AddSingleFile { root, path, text })
+                    })?
+            }
+            ChangeKind::Write => match fs::read_to_string(&path) {
+                Err(e) => log::warn!("watcher failed {}", e),
+                Ok(text) => self.sender.send(TaskResult::ChangeSingleFile {
+                    root,
+                    path: rel_path,
+                    text,
+                })?,
+            },
+            ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile {
+                root,
+                path: rel_path,
+            })?,
         }
+        Ok(())
     }
 }
 
-fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> {
-    let mut res = Vec::new();
-    for entry in WalkDir::new(&filter.root)
+fn watch_recursive(
+    mut watcher: Option<&mut RecommendedWatcher>,
+    dir: &Path,
+    config: &RootConfig,
+) -> Vec<RelativePathBuf> {
+    let mut files = Vec::new();
+    for entry in WalkDir::new(dir)
         .into_iter()
-        .filter_entry(filter.entry_filter())
+        .filter_entry(|it| config.contains(it.path()).is_some())
+        .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
     {
-        let entry = match entry {
-            Ok(entry) => entry,
-            Err(e) => {
-                log::warn!("watcher error: {}", e);
-                continue;
+        if entry.file_type().is_dir() {
+            if let Some(watcher) = &mut watcher {
+                watch_one(watcher, entry.path());
             }
-        };
-        if !entry.file_type().is_file() {
-            continue;
+        } else {
+            let path = config.contains(entry.path()).unwrap();
+            files.push(path.to_owned());
         }
-        let path = entry.path();
-        let text = match fs::read_to_string(path) {
-            Ok(text) => text,
-            Err(e) => {
-                log::warn!("watcher error: {}", e);
-                continue;
-            }
-        };
-        let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
-        res.push((path.to_owned(), text))
     }
-    res
+    files
+}
+
+fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
+    match watcher.watch(dir, RecursiveMode::NonRecursive) {
+        Ok(()) => log::debug!("watching \"{}\"", dir.display()),
+        Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
+    }
 }
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
deleted file mode 100644
index ff6775f59..000000000
--- a/crates/ra_vfs/src/io/watcher.rs
+++ /dev/null
@@ -1,200 +0,0 @@
-use crate::{io, RootFilter, Roots, VfsRoot};
-use crossbeam_channel::Sender;
-use drop_bomb::DropBomb;
-use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
-use parking_lot::Mutex;
-use std::{
-    fs,
-    path::{Path, PathBuf},
-    sync::{mpsc, Arc},
-    thread,
-    time::Duration,
-};
-use walkdir::WalkDir;
-
-#[derive(Debug)]
-enum ChangeKind {
-    Create,
-    Write,
-    Remove,
-}
-
-const WATCHER_DELAY: Duration = Duration::from_millis(250);
-
-pub(crate) struct Watcher {
-    thread: thread::JoinHandle<()>,
-    bomb: DropBomb,
-    watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
-}
-
-impl Watcher {
-    pub(crate) fn start(
-        roots: Arc<Roots>,
-        output_sender: Sender<io::TaskResult>,
-    ) -> Result<Watcher, Box<std::error::Error>> {
-        let (input_sender, input_receiver) = mpsc::channel();
-        let watcher = Arc::new(Mutex::new(Some(notify::watcher(
-            input_sender,
-            WATCHER_DELAY,
-        )?)));
-        let sender = output_sender.clone();
-        let watcher_clone = watcher.clone();
-        let thread = thread::spawn(move || {
-            let worker = WatcherWorker {
-                roots,
-                watcher: watcher_clone,
-                sender,
-            };
-            input_receiver
-                .into_iter()
-                // forward relevant events only
-                .try_for_each(|change| worker.handle_debounced_event(change))
-                .unwrap()
-        });
-        Ok(Watcher {
-            thread,
-            watcher,
-            bomb: DropBomb::new(format!("Watcher was not shutdown")),
-        })
-    }
-
-    pub fn watch_root(&mut self, filter: &RootFilter) {
-        for res in WalkDir::new(&filter.root)
-            .into_iter()
-            .filter_entry(filter.entry_filter())
-        {
-            match res {
-                Ok(entry) => {
-                    if entry.file_type().is_dir() {
-                        watch_one(self.watcher.as_ref(), entry.path());
-                    }
-                }
-                Err(e) => log::warn!("watcher error: {}", e),
-            }
-        }
-    }
-
-    pub fn shutdown(mut self) -> thread::Result<()> {
-        self.bomb.defuse();
-        drop(self.watcher.lock().take());
-        let res = self.thread.join();
-        match &res {
-            Ok(()) => log::info!("... Watcher terminated with ok"),
-            Err(_) => log::error!("... Watcher terminated with err"),
-        }
-        res
-    }
-}
-
-struct WatcherWorker {
-    watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
-    roots: Arc<Roots>,
-    sender: Sender<io::TaskResult>,
-}
-
-impl WatcherWorker {
-    fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box<std::error::Error>> {
-        match ev {
-            DebouncedEvent::NoticeWrite(_)
-            | DebouncedEvent::NoticeRemove(_)
-            | DebouncedEvent::Chmod(_) => {
-                // ignore
-            }
-            DebouncedEvent::Rescan => {
-                // TODO rescan all roots
-            }
-            DebouncedEvent::Create(path) => {
-                self.handle_change(path, ChangeKind::Create);
-            }
-            DebouncedEvent::Write(path) => {
-                self.handle_change(path, ChangeKind::Write);
-            }
-            DebouncedEvent::Remove(path) => {
-                self.handle_change(path, ChangeKind::Remove);
-            }
-            DebouncedEvent::Rename(src, dst) => {
-                self.handle_change(src, ChangeKind::Remove);
-                self.handle_change(dst, ChangeKind::Create);
-            }
-            DebouncedEvent::Error(err, path) => {
-                // TODO should we reload the file contents?
-                log::warn!("watcher error \"{}\", {:?}", err, path);
-            }
-        }
-        Ok(())
-    }
-
-    fn handle_change(&self, path: PathBuf, kind: ChangeKind) {
-        if let Err(e) = self.try_handle_change(path, kind) {
-            log::warn!("watcher error: {}", e)
-        }
-    }
-
-    fn try_handle_change(
-        &self,
-        path: PathBuf,
-        kind: ChangeKind,
-    ) -> Result<(), Box<std::error::Error>> {
-        let (root, rel_path) = match self.roots.find(&path) {
-            Some(x) => x,
-            None => return Ok(()),
-        };
-        match kind {
-            ChangeKind::Create => {
-                if path.is_dir() {
-                    self.watch_recursive(&path, root);
-                } else {
-                    let text = fs::read_to_string(&path)?;
-                    self.sender.send(io::TaskResult::AddSingleFile {
-                        root,
-                        path: rel_path,
-                        text,
-                    })?
-                }
-            }
-            ChangeKind::Write => {
-                let text = fs::read_to_string(&path)?;
-                self.sender.send(io::TaskResult::ChangeSingleFile {
-                    root,
-                    path: rel_path,
-                    text,
-                })?
-            }
-            ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile {
-                root,
-                path: rel_path,
-            })?,
-        }
-        Ok(())
-    }
-
-    fn watch_recursive(&self, dir: &Path, root: VfsRoot) {
-        let filter = &self.roots[root];
-        for res in WalkDir::new(dir)
-            .into_iter()
-            .filter_entry(filter.entry_filter())
-        {
-            match res {
-                Ok(entry) => {
-                    if entry.file_type().is_dir() {
-                        watch_one(self.watcher.as_ref(), entry.path());
-                    } else {
-                        // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
-                        // emit as create because we haven't seen it yet
-                        self.handle_change(entry.path().to_path_buf(), ChangeKind::Create);
-                    }
-                }
-                Err(e) => log::warn!("watcher error: {}", e),
-            }
-        }
-    }
-}
-
-fn watch_one(watcher: &Mutex<Option<RecommendedWatcher>>, dir: &Path) {
-    if let Some(watcher) = watcher.lock().as_mut() {
-        match watcher.watch(dir, RecursiveMode::NonRecursive) {
-            Ok(()) => log::debug!("watching \"{}\"", dir.display()),
-            Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
-        }
-    }
-}
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index 70a13f765..71a3f807d 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -18,94 +18,78 @@ mod io;
 use std::{
     cmp::Reverse,
     fmt, fs, mem,
-    ops::{Deref, DerefMut},
     path::{Path, PathBuf},
     sync::Arc,
     thread,
 };
 
 use crossbeam_channel::Receiver;
-use ra_arena::{impl_arena_id, Arena, RawId};
+use ra_arena::{impl_arena_id, Arena, RawId, map::ArenaMap};
 use relative_path::{Component, RelativePath, RelativePathBuf};
 use rustc_hash::{FxHashMap, FxHashSet};
-use walkdir::DirEntry;
 
 pub use crate::io::TaskResult as VfsTask;
 use io::{TaskResult, Worker};
 
-/// `RootFilter` is a predicate that checks if a file can belong to a root. If
-/// several filters match a file (nested dirs), the most nested one wins.
-pub(crate) struct RootFilter {
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct VfsRoot(pub RawId);
+impl_arena_id!(VfsRoot);
+
+/// Describes the contents of a single source root.
+///
+/// `RootConfig` can be thought of as a glob pattern like `src/**.rs` which
+/// specifies the source root or as a function which takes a `PathBuf` and
+/// returns `true` iff path belongs to the source root
+pub(crate) struct RootConfig {
     root: PathBuf,
-    filter: fn(&Path, &RelativePath) -> bool,
     excluded_dirs: Vec<PathBuf>,
 }
 
-impl RootFilter {
-    fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootFilter {
-        RootFilter {
+pub(crate) struct Roots {
+    roots: Arena<VfsRoot, Arc<RootConfig>>,
+}
+
+impl std::ops::Deref for Roots {
+    type Target = Arena<VfsRoot, Arc<RootConfig>>;
+    fn deref(&self) -> &Self::Target {
+        &self.roots
+    }
+}
+
+impl RootConfig {
+    fn new(root: PathBuf, excluded_dirs: Vec<PathBuf>) -> RootConfig {
+        RootConfig {
             root,
-            filter: default_filter,
             excluded_dirs,
         }
     }
 
-    /// Check if this root can contain `path`. NB: even if this returns
-    /// true, the `path` might actually be conained in some nested root.
-    pub(crate) fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
-        let rel_path = path.strip_prefix(&self.root).ok()?;
-        let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
-        if !(self.filter)(path, rel_path.as_relative_path()) {
+    /// Checks if root contains a path and returns a root-relative path.
+    pub(crate) fn contains(&self, path: &Path) -> Option<RelativePathBuf> {
+        // First, check excluded dirs
+        if self.excluded_dirs.iter().any(|it| path.starts_with(it)) {
             return None;
         }
-        Some(rel_path)
-    }
-
-    pub(crate) fn entry_filter<'a>(&'a self) -> impl FnMut(&DirEntry) -> bool + 'a {
-        move |entry: &DirEntry| {
-            if entry.file_type().is_dir() && self.excluded_dirs.iter().any(|it| it == entry.path())
-            {
-                // do not walk nested roots
-                false
-            } else {
-                self.can_contain(entry.path()).is_some()
-            }
-        }
-    }
-}
+        let rel_path = path.strip_prefix(&self.root).ok()?;
+        let rel_path = RelativePathBuf::from_path(rel_path).ok()?;
 
-pub(crate) fn default_filter(path: &Path, rel_path: &RelativePath) -> bool {
-    if path.is_dir() {
+        // Ignore some common directories.
+        //
+        // FIXME: don't hard-code, specify at source-root creation time using
+        // gitignore
         for (i, c) in rel_path.components().enumerate() {
             if let Component::Normal(c) = c {
-                // TODO hardcoded for now
                 if (i == 0 && c == "target") || c == ".git" || c == "node_modules" {
-                    return false;
+                    return None;
                 }
             }
         }
-        true
-    } else {
-        rel_path.extension() == Some("rs")
-    }
-}
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct VfsRoot(pub RawId);
-impl_arena_id!(VfsRoot);
-
-#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
-pub struct VfsFile(pub RawId);
-impl_arena_id!(VfsFile);
 
-struct VfsFileData {
-    root: VfsRoot,
-    path: RelativePathBuf,
-    is_overlayed: bool,
-    text: Arc<String>,
-}
+        if path.is_file() && rel_path.extension() != Some("rs") {
+            return None;
+        }
 
-pub(crate) struct Roots {
-    roots: Arena<VfsRoot, Arc<RootFilter>>,
+        Some(rel_path)
+    }
 }
 
 impl Roots {
@@ -120,59 +104,61 @@ impl Roots {
             .map(|it| it.clone())
             .collect::<Vec<_>>();
 
-            let root_filter = Arc::new(RootFilter::new(path.clone(), nested_roots));
+            let config = Arc::new(RootConfig::new(path.clone(), nested_roots));
 
-            roots.alloc(root_filter.clone());
+            roots.alloc(config.clone());
         }
         Roots { roots }
     }
 
     pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> {
         self.roots
             .iter()
-            .find_map(|(root, data)| data.can_contain(path).map(|it| (root, it)))
+            .find_map(|(root, data)| data.contains(path).map(|it| (root, it)))
     }
 }
 
-impl Deref for Roots {
-    type Target = Arena<VfsRoot, Arc<RootFilter>>;
-    fn deref(&self) -> &Self::Target {
-        &self.roots
-    }
-}
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
+pub struct VfsFile(pub RawId);
+impl_arena_id!(VfsFile);
 
-impl DerefMut for Roots {
-    fn deref_mut(&mut self) -> &mut Self::Target {
-        &mut self.roots
-    }
+struct VfsFileData {
+    root: VfsRoot,
+    path: RelativePathBuf,
+    is_overlayed: bool,
+    text: Arc<String>,
 }
 
 pub struct Vfs {
     roots: Arc<Roots>,
     files: Arena<VfsFile, VfsFileData>,
-    root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
+    root2files: ArenaMap<VfsRoot, FxHashSet<VfsFile>>,
     pending_changes: Vec<VfsChange>,
     worker: Worker,
 }
 
 impl fmt::Debug for Vfs {
     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
-        f.write_str("Vfs { ... }")
+        f.debug_struct("Vfs")
+            .field("n_roots", &self.roots.len())
+            .field("n_files", &self.files.len())
+            .field("n_pending_changes", &self.pending_changes.len())
+            .finish()
     }
 }
 
 impl Vfs {
     pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
         let roots = Arc::new(Roots::new(roots));
-        let worker = io::Worker::start(roots.clone());
-        let mut root2files = FxHashMap::default();
+        let worker = io::Worker::start(Arc::clone(&roots));
+        let mut root2files = ArenaMap::default();
 
-        for (root, filter) in roots.iter() {
+        for (root, config) in roots.iter() {
             root2files.insert(root, Default::default());
             worker
                 .sender()
                 .send(io::Task::AddRoot {
                     root,
-                    filter: filter.clone(),
+                    config: Arc::clone(config),
                 })
                 .unwrap();
         }
@@ -242,7 +228,7 @@ impl Vfs {
         let mut cur_files = Vec::new();
         // While we were scanning the root in the backgound, a file might have
        // been open in the editor, so we need to account for that.
-        let exising = self.root2files[&root]
+        let exising = self.root2files[root]
            .iter()
            .map(|&file| (self.files[file].path.clone(), file))
            .collect::<FxHashMap<_, _>>();
@@ -384,7 +370,7 @@ impl Vfs {
             is_overlayed,
         };
         let file = self.files.alloc(data);
-        self.root2files.get_mut(&root).unwrap().insert(file);
+        self.root2files.get_mut(root).unwrap().insert(file);
         file
     }
 
@@ -399,7 +385,7 @@ impl Vfs {
         self.files[file].text = Default::default();
         self.files[file].path = Default::default();
         let root = self.files[file].root;
-        let removed = self.root2files.get_mut(&root).unwrap().remove(&file);
+        let removed = self.root2files.get_mut(root).unwrap().remove(&file);
         assert!(removed);
     }
 
@@ -410,7 +396,7 @@ impl Vfs {
     }
 
     fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option<VfsFile> {
-        self.root2files[&root]
+        self.root2files[root]
            .iter()
            .map(|&it| it)
            .find(|&file| self.files[file].path == path)
--
cgit v1.2.3
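
Note (not part of the patch): the sketch below is a minimal, standalone illustration of the notify 4.x debounced-watcher pattern the new io.rs is built on — notify::watcher with a debounce delay, per-directory NonRecursive watches, and an mpsc receiver of DebouncedEvents. The watched path "." and the printing are illustrative assumptions; the patch itself uses a 250 ms delay and forwards events into ra_vfs TaskResults instead of printing them.

// Standalone sketch; assumes watching "." and merely printing events.
use std::{path::Path, sync::mpsc, time::Duration};

use notify::{DebouncedEvent, RecursiveMode, Watcher as _Watcher};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (tx, rx) = mpsc::channel();
    // Debounced watcher: bursts of writes collapse into single events after the delay.
    let mut watcher = notify::watcher(tx, Duration::from_millis(250))?;
    // Watch one directory non-recursively; ra_vfs walks subdirectories itself
    // and registers each of them, so excluded dirs are never watched.
    watcher.watch(Path::new("."), RecursiveMode::NonRecursive)?;
    for event in rx {
        match event {
            DebouncedEvent::Create(path) | DebouncedEvent::Write(path) => {
                println!("created/changed: {}", path.display())
            }
            DebouncedEvent::Remove(path) => println!("removed: {}", path.display()),
            DebouncedEvent::Rename(src, dst) => {
                println!("renamed: {} -> {}", src.display(), dst.display())
            }
            _ => {}
        }
    }
    Ok(())
}

Keeping the real watcher behind Arc<Mutex<Option<RecommendedWatcher>>> is what lets the patch shut down cleanly: drop(ctx.watcher.lock().take()) drops the watcher, the notify sender side closes, and the event-forwarding thread's receiver loop terminates before join().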