From 20d7a431fd6e3e363e698a2e464160640868597b Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 15:11:47 +0300 Subject: refactor-fvs --- crates/ra_vfs/src/io.rs | 229 ++++++++++++++++++++++++++++++---------- crates/ra_vfs/src/io/watcher.rs | 200 ----------------------------------- crates/ra_vfs/src/lib.rs | 146 ++++++++++++------------- 3 files changed, 242 insertions(+), 333 deletions(-) delete mode 100644 crates/ra_vfs/src/io/watcher.rs diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 7ca1e9835..669240488 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -1,19 +1,23 @@ -use std::{fs, sync::Arc, thread}; - -use crossbeam_channel::{Receiver, Sender}; +use std::{ + fs, + path::{Path, PathBuf}, + sync::{mpsc, Arc}, + thread, + time::Duration, +}; +use crossbeam_channel::{Receiver, Sender, SendError}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; +use parking_lot::Mutex; +use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; -mod watcher; -use watcher::Watcher; - -use crate::{RootFilter, Roots, VfsRoot}; +use crate::{RootConfig, Roots, VfsRoot}; pub(crate) enum Task { AddRoot { root: VfsRoot, - filter: Arc, + config: Arc, }, } @@ -39,6 +43,15 @@ pub enum TaskResult { }, } +#[derive(Debug)] +enum ChangeKind { + Create, + Write, + Remove, +} + +const WATCHER_DELAY: Duration = Duration::from_millis(250); + pub(crate) struct Worker { worker: thread_worker::Worker, worker_handle: WorkerHandle, @@ -48,21 +61,36 @@ impl Worker { pub(crate) fn start(roots: Arc) -> Worker { let (worker, worker_handle) = thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { - let mut watcher = match Watcher::start(roots, output_sender.clone()) { - Ok(w) => Some(w), - Err(e) => { - log::error!("could not start watcher: {}", e); - None - } + let (notify_sender, notify_receiver) = mpsc::channel(); + let watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + let ctx = WatcherCtx { + roots, + watcher: Arc::new(Mutex::new(watcher)), + sender: output_sender, }; - let res = input_receiver - .into_iter() - .filter_map(|t| handle_task(t, &mut watcher)) - .try_for_each(|it| output_sender.send(it)); - if let Some(watcher) = watcher { - let _ = watcher.shutdown(); + let thread = thread::spawn({ + let ctx = ctx.clone(); + move || { + let _ = notify_receiver + .into_iter() + // forward relevant events only + .try_for_each(|change| ctx.handle_debounced_event(change)); + } + }); + let res1 = input_receiver.into_iter().try_for_each(|t| match t { + Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)), + }); + drop(ctx.watcher.lock().take()); + drop(ctx); + let res2 = thread.join(); + match &res2 { + Ok(()) => log::info!("... Watcher terminated with ok"), + Err(_) => log::error!("... Watcher terminated with err"), } - res.unwrap() + res1.unwrap(); + res2.unwrap(); }); Worker { worker, @@ -84,46 +112,141 @@ impl Worker { } } -fn handle_task(task: Task, watcher: &mut Option) -> Option { - match task { - Task::AddRoot { root, filter } => { - if let Some(watcher) = watcher { - watcher.watch_root(&filter) +fn watch_root( + woker: &WatcherCtx, + root: VfsRoot, + config: Arc, +) -> Result<(), SendError> { + let mut guard = woker.watcher.lock(); + log::debug!("loading {} ...", config.root.as_path().display()); + let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) + .into_iter() + .filter_map(|path| { + let abs_path = path.to_path(&config.root); + let text = fs::read_to_string(abs_path) + .map_err(|e| log::warn!("watcher error: {}", e)) + .ok()?; + Some((path, text)) + }) + .collect(); + woker + .sender + .send(TaskResult::BulkLoadRoot { root, files })?; + log::debug!("... loaded {}", config.root.as_path().display()); + Ok(()) +} + +#[derive(Clone)] +struct WatcherCtx { + roots: Arc, + watcher: Arc>>, + sender: Sender, +} + +impl WatcherCtx { + fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError> { + match ev { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + self.handle_change(path, ChangeKind::Create)?; + } + DebouncedEvent::Write(path) => { + self.handle_change(path, ChangeKind::Write)?; } - log::debug!("loading {} ...", filter.root.as_path().display()); - let files = load_root(filter.as_ref()); - log::debug!("... loaded {}", filter.root.as_path().display()); - Some(TaskResult::BulkLoadRoot { root, files }) + DebouncedEvent::Remove(path) => { + self.handle_change(path, ChangeKind::Remove)?; + } + DebouncedEvent::Rename(src, dst) => { + self.handle_change(src, ChangeKind::Remove)?; + self.handle_change(dst, ChangeKind::Create)?; + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); + } + } + Ok(()) + } + + fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError> { + let (root, rel_path) = match self.roots.find(&path) { + None => return Ok(()), + Some(it) => it, + }; + let config = &self.roots[root]; + match kind { + ChangeKind::Create => { + let mut paths = Vec::new(); + if path.is_dir() { + let mut guard = self.watcher.lock(); + paths.extend(watch_recursive(guard.as_mut(), &path, &config)); + } else { + paths.push(rel_path); + } + paths + .into_iter() + .filter_map(|rel_path| { + let abs_path = rel_path.to_path(&config.root); + let text = fs::read_to_string(&abs_path) + .map_err(|e| log::warn!("watcher failed {}", e)) + .ok()?; + Some((rel_path, text)) + }) + .try_for_each(|(path, text)| { + self.sender + .send(TaskResult::AddSingleFile { root, path, text }) + })? + } + ChangeKind::Write => match fs::read_to_string(&path) { + Err(e) => log::warn!("watcher failed {}", e), + Ok(text) => self.sender.send(TaskResult::ChangeSingleFile { + root, + path: rel_path, + text, + })?, + }, + ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { + root, + path: rel_path, + })?, } + Ok(()) } } -fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { - let mut res = Vec::new(); - for entry in WalkDir::new(&filter.root) +fn watch_recursive( + mut watcher: Option<&mut RecommendedWatcher>, + dir: &Path, + config: &RootConfig, +) -> Vec { + let mut files = Vec::new(); + for entry in WalkDir::new(dir) .into_iter() - .filter_entry(filter.entry_filter()) + .filter_entry(|it| config.contains(it.path()).is_some()) + .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) { - let entry = match entry { - Ok(entry) => entry, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; + if entry.file_type().is_dir() { + if let Some(watcher) = &mut watcher { + watch_one(watcher, entry.path()); } - }; - if !entry.file_type().is_file() { - continue; + } else { + let path = config.contains(entry.path()).unwrap(); + files.push(path.to_owned()); } - let path = entry.path(); - let text = match fs::read_to_string(path) { - Ok(text) => text, - Err(e) => { - log::warn!("watcher error: {}", e); - continue; - } - }; - let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap(); - res.push((path.to_owned(), text)) } - res + files +} + +fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { + match watcher.watch(dir, RecursiveMode::NonRecursive) { + Ok(()) => log::debug!("watching \"{}\"", dir.display()), + Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), + } } diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs deleted file mode 100644 index ff6775f59..000000000 --- a/crates/ra_vfs/src/io/watcher.rs +++ /dev/null @@ -1,200 +0,0 @@ -use crate::{io, RootFilter, Roots, VfsRoot}; -use crossbeam_channel::Sender; -use drop_bomb::DropBomb; -use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; -use parking_lot::Mutex; -use std::{ - fs, - path::{Path, PathBuf}, - sync::{mpsc, Arc}, - thread, - time::Duration, -}; -use walkdir::WalkDir; - -#[derive(Debug)] -enum ChangeKind { - Create, - Write, - Remove, -} - -const WATCHER_DELAY: Duration = Duration::from_millis(250); - -pub(crate) struct Watcher { - thread: thread::JoinHandle<()>, - bomb: DropBomb, - watcher: Arc>>, -} - -impl Watcher { - pub(crate) fn start( - roots: Arc, - output_sender: Sender, - ) -> Result> { - let (input_sender, input_receiver) = mpsc::channel(); - let watcher = Arc::new(Mutex::new(Some(notify::watcher( - input_sender, - WATCHER_DELAY, - )?))); - let sender = output_sender.clone(); - let watcher_clone = watcher.clone(); - let thread = thread::spawn(move || { - let worker = WatcherWorker { - roots, - watcher: watcher_clone, - sender, - }; - input_receiver - .into_iter() - // forward relevant events only - .try_for_each(|change| worker.handle_debounced_event(change)) - .unwrap() - }); - Ok(Watcher { - thread, - watcher, - bomb: DropBomb::new(format!("Watcher was not shutdown")), - }) - } - - pub fn watch_root(&mut self, filter: &RootFilter) { - for res in WalkDir::new(&filter.root) - .into_iter() - .filter_entry(filter.entry_filter()) - { - match res { - Ok(entry) => { - if entry.file_type().is_dir() { - watch_one(self.watcher.as_ref(), entry.path()); - } - } - Err(e) => log::warn!("watcher error: {}", e), - } - } - } - - pub fn shutdown(mut self) -> thread::Result<()> { - self.bomb.defuse(); - drop(self.watcher.lock().take()); - let res = self.thread.join(); - match &res { - Ok(()) => log::info!("... Watcher terminated with ok"), - Err(_) => log::error!("... Watcher terminated with err"), - } - res - } -} - -struct WatcherWorker { - watcher: Arc>>, - roots: Arc, - sender: Sender, -} - -impl WatcherWorker { - fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - // TODO rescan all roots - } - DebouncedEvent::Create(path) => { - self.handle_change(path, ChangeKind::Create); - } - DebouncedEvent::Write(path) => { - self.handle_change(path, ChangeKind::Write); - } - DebouncedEvent::Remove(path) => { - self.handle_change(path, ChangeKind::Remove); - } - DebouncedEvent::Rename(src, dst) => { - self.handle_change(src, ChangeKind::Remove); - self.handle_change(dst, ChangeKind::Create); - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } - Ok(()) - } - - fn handle_change(&self, path: PathBuf, kind: ChangeKind) { - if let Err(e) = self.try_handle_change(path, kind) { - log::warn!("watcher error: {}", e) - } - } - - fn try_handle_change( - &self, - path: PathBuf, - kind: ChangeKind, - ) -> Result<(), Box> { - let (root, rel_path) = match self.roots.find(&path) { - Some(x) => x, - None => return Ok(()), - }; - match kind { - ChangeKind::Create => { - if path.is_dir() { - self.watch_recursive(&path, root); - } else { - let text = fs::read_to_string(&path)?; - self.sender.send(io::TaskResult::AddSingleFile { - root, - path: rel_path, - text, - })? - } - } - ChangeKind::Write => { - let text = fs::read_to_string(&path)?; - self.sender.send(io::TaskResult::ChangeSingleFile { - root, - path: rel_path, - text, - })? - } - ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile { - root, - path: rel_path, - })?, - } - Ok(()) - } - - fn watch_recursive(&self, dir: &Path, root: VfsRoot) { - let filter = &self.roots[root]; - for res in WalkDir::new(dir) - .into_iter() - .filter_entry(filter.entry_filter()) - { - match res { - Ok(entry) => { - if entry.file_type().is_dir() { - watch_one(self.watcher.as_ref(), entry.path()); - } else { - // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching - // emit as create because we haven't seen it yet - self.handle_change(entry.path().to_path_buf(), ChangeKind::Create); - } - } - Err(e) => log::warn!("watcher error: {}", e), - } - } - } -} - -fn watch_one(watcher: &Mutex>, dir: &Path) { - if let Some(watcher) = watcher.lock().as_mut() { - match watcher.watch(dir, RecursiveMode::NonRecursive) { - Ok(()) => log::debug!("watching \"{}\"", dir.display()), - Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), - } - } -} diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 70a13f765..71a3f807d 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -18,94 +18,78 @@ mod io; use std::{ cmp::Reverse, fmt, fs, mem, - ops::{Deref, DerefMut}, path::{Path, PathBuf}, sync::Arc, thread, }; use crossbeam_channel::Receiver; -use ra_arena::{impl_arena_id, Arena, RawId}; +use ra_arena::{impl_arena_id, Arena, RawId, map::ArenaMap}; use relative_path::{Component, RelativePath, RelativePathBuf}; use rustc_hash::{FxHashMap, FxHashSet}; -use walkdir::DirEntry; pub use crate::io::TaskResult as VfsTask; use io::{TaskResult, Worker}; -/// `RootFilter` is a predicate that checks if a file can belong to a root. If -/// several filters match a file (nested dirs), the most nested one wins. -pub(crate) struct RootFilter { +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct VfsRoot(pub RawId); +impl_arena_id!(VfsRoot); + +/// Describes the contents of a single source root. +/// +/// `RootConfig` can be thought of as a glob pattern like `src/**.rs` whihc +/// specifes the source root or as a function whihc takes a `PathBuf` and +/// returns `true` iff path belongs to the source root +pub(crate) struct RootConfig { root: PathBuf, - filter: fn(&Path, &RelativePath) -> bool, excluded_dirs: Vec, } -impl RootFilter { - fn new(root: PathBuf, excluded_dirs: Vec) -> RootFilter { - RootFilter { +pub(crate) struct Roots { + roots: Arena>, +} + +impl std::ops::Deref for Roots { + type Target = Arena>; + fn deref(&self) -> &Self::Target { + &self.roots + } +} + +impl RootConfig { + fn new(root: PathBuf, excluded_dirs: Vec) -> RootConfig { + RootConfig { root, - filter: default_filter, excluded_dirs, } } - /// Check if this root can contain `path`. NB: even if this returns - /// true, the `path` might actually be conained in some nested root. - pub(crate) fn can_contain(&self, path: &Path) -> Option { - let rel_path = path.strip_prefix(&self.root).ok()?; - let rel_path = RelativePathBuf::from_path(rel_path).ok()?; - if !(self.filter)(path, rel_path.as_relative_path()) { + /// Cheks if root contains a path and returns a root-relative path. + pub(crate) fn contains(&self, path: &Path) -> Option { + // First, check excluded dirs + if self.excluded_dirs.iter().any(|it| path.starts_with(it)) { return None; } - Some(rel_path) - } - - pub(crate) fn entry_filter<'a>(&'a self) -> impl FnMut(&DirEntry) -> bool + 'a { - move |entry: &DirEntry| { - if entry.file_type().is_dir() && self.excluded_dirs.iter().any(|it| it == entry.path()) - { - // do not walk nested roots - false - } else { - self.can_contain(entry.path()).is_some() - } - } - } -} + let rel_path = path.strip_prefix(&self.root).ok()?; + let rel_path = RelativePathBuf::from_path(rel_path).ok()?; -pub(crate) fn default_filter(path: &Path, rel_path: &RelativePath) -> bool { - if path.is_dir() { + // Ignore some common directories. + // + // FIXME: don't hard-code, specify at source-root creation time using + // gitignore for (i, c) in rel_path.components().enumerate() { if let Component::Normal(c) = c { - // TODO hardcoded for now if (i == 0 && c == "target") || c == ".git" || c == "node_modules" { - return false; + return None; } } } - true - } else { - rel_path.extension() == Some("rs") - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct VfsRoot(pub RawId); -impl_arena_id!(VfsRoot); - -#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct VfsFile(pub RawId); -impl_arena_id!(VfsFile); -struct VfsFileData { - root: VfsRoot, - path: RelativePathBuf, - is_overlayed: bool, - text: Arc, -} + if path.is_file() && rel_path.extension() != Some("rs") { + return None; + } -pub(crate) struct Roots { - roots: Arena>, + Some(rel_path) + } } impl Roots { @@ -120,59 +104,61 @@ impl Roots { .map(|it| it.clone()) .collect::>(); - let root_filter = Arc::new(RootFilter::new(path.clone(), nested_roots)); + let config = Arc::new(RootConfig::new(path.clone(), nested_roots)); - roots.alloc(root_filter.clone()); + roots.alloc(config.clone()); } Roots { roots } } pub(crate) fn find(&self, path: &Path) -> Option<(VfsRoot, RelativePathBuf)> { self.roots .iter() - .find_map(|(root, data)| data.can_contain(path).map(|it| (root, it))) + .find_map(|(root, data)| data.contains(path).map(|it| (root, it))) } } -impl Deref for Roots { - type Target = Arena>; - fn deref(&self) -> &Self::Target { - &self.roots - } -} +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct VfsFile(pub RawId); +impl_arena_id!(VfsFile); -impl DerefMut for Roots { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.roots - } +struct VfsFileData { + root: VfsRoot, + path: RelativePathBuf, + is_overlayed: bool, + text: Arc, } pub struct Vfs { roots: Arc, files: Arena, - root2files: FxHashMap>, + root2files: ArenaMap>, pending_changes: Vec, worker: Worker, } impl fmt::Debug for Vfs { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - f.write_str("Vfs { ... }") + f.debug_struct("Vfs") + .field("n_roots", &self.roots.len()) + .field("n_files", &self.files.len()) + .field("n_pending_changes", &self.pending_changes.len()) + .finish() } } impl Vfs { pub fn new(roots: Vec) -> (Vfs, Vec) { let roots = Arc::new(Roots::new(roots)); - let worker = io::Worker::start(roots.clone()); - let mut root2files = FxHashMap::default(); + let worker = io::Worker::start(Arc::clone(&roots)); + let mut root2files = ArenaMap::default(); - for (root, filter) in roots.iter() { + for (root, config) in roots.iter() { root2files.insert(root, Default::default()); worker .sender() .send(io::Task::AddRoot { root, - filter: filter.clone(), + config: Arc::clone(config), }) .unwrap(); } @@ -242,7 +228,7 @@ impl Vfs { let mut cur_files = Vec::new(); // While we were scanning the root in the backgound, a file might have // been open in the editor, so we need to account for that. - let exising = self.root2files[&root] + let exising = self.root2files[root] .iter() .map(|&file| (self.files[file].path.clone(), file)) .collect::>(); @@ -384,7 +370,7 @@ impl Vfs { is_overlayed, }; let file = self.files.alloc(data); - self.root2files.get_mut(&root).unwrap().insert(file); + self.root2files.get_mut(root).unwrap().insert(file); file } @@ -399,7 +385,7 @@ impl Vfs { self.files[file].text = Default::default(); self.files[file].path = Default::default(); let root = self.files[file].root; - let removed = self.root2files.get_mut(&root).unwrap().remove(&file); + let removed = self.root2files.get_mut(root).unwrap().remove(&file); assert!(removed); } @@ -410,7 +396,7 @@ impl Vfs { } fn find_file(&self, root: VfsRoot, path: &RelativePath) -> Option { - self.root2files[&root] + self.root2files[root] .iter() .map(|&it| it) .find(|&file| self.files[file].path == path) -- cgit v1.2.3 From 390a20787e0605e979b5eb8829e2ffddc3e6b1f9 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 15:19:24 +0300 Subject: consolidate error handling --- crates/ra_vfs/src/io.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 669240488..84ccdb394 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -123,9 +123,7 @@ fn watch_root( .into_iter() .filter_map(|path| { let abs_path = path.to_path(&config.root); - let text = fs::read_to_string(abs_path) - .map_err(|e| log::warn!("watcher error: {}", e)) - .ok()?; + let text = read_to_string(&abs_path)?; Some((path, text)) }) .collect(); @@ -194,9 +192,7 @@ impl WatcherCtx { .into_iter() .filter_map(|rel_path| { let abs_path = rel_path.to_path(&config.root); - let text = fs::read_to_string(&abs_path) - .map_err(|e| log::warn!("watcher failed {}", e)) - .ok()?; + let text = read_to_string(&abs_path)?; Some((rel_path, text)) }) .try_for_each(|(path, text)| { @@ -204,14 +200,15 @@ impl WatcherCtx { .send(TaskResult::AddSingleFile { root, path, text }) })? } - ChangeKind::Write => match fs::read_to_string(&path) { - Err(e) => log::warn!("watcher failed {}", e), - Ok(text) => self.sender.send(TaskResult::ChangeSingleFile { - root, - path: rel_path, - text, - })?, - }, + ChangeKind::Write => { + if let Some(text) = read_to_string(&path) { + self.sender.send(TaskResult::ChangeSingleFile { + root, + path: rel_path, + text, + })?; + } + } ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { root, path: rel_path, @@ -250,3 +247,9 @@ fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), } } + +fn read_to_string(path: &Path) -> Option { + fs::read_to_string(&path) + .map_err(|e| log::warn!("failed to read file {}", e)) + .ok() +} -- cgit v1.2.3 From 3ce531f95dec87a1f59e9347fdd6c250e36b489d Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 16:40:24 +0300 Subject: cleanup: add result alias --- crates/ra_vfs/src/io.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 84ccdb394..279fa5da8 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -1,11 +1,11 @@ use std::{ fs, + thread, path::{Path, PathBuf}, sync::{mpsc, Arc}, - thread, time::Duration, }; -use crossbeam_channel::{Receiver, Sender, SendError}; +use crossbeam_channel::{Receiver, Sender}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; @@ -14,6 +14,8 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc use crate::{RootConfig, Roots, VfsRoot}; +type Result = std::result::Result>; + pub(crate) enum Task { AddRoot { root: VfsRoot, @@ -112,11 +114,7 @@ impl Worker { } } -fn watch_root( - woker: &WatcherCtx, - root: VfsRoot, - config: Arc, -) -> Result<(), SendError> { +fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) -> Result<()> { let mut guard = woker.watcher.lock(); log::debug!("loading {} ...", config.root.as_path().display()); let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) @@ -142,7 +140,7 @@ struct WatcherCtx { } impl WatcherCtx { - fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError> { + fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> { match ev { DebouncedEvent::NoticeWrite(_) | DebouncedEvent::NoticeRemove(_) @@ -173,7 +171,7 @@ impl WatcherCtx { Ok(()) } - fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError> { + fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { let (root, rel_path) = match self.roots.find(&path) { None => return Ok(()), Some(it) => it, -- cgit v1.2.3 From 012ea3fac62df26abefa6d64b81570ed58118dea Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 17:01:58 +0300 Subject: handle all the reads on the "main" watcher thread --- crates/ra_vfs/src/io.rs | 123 +++++++++++++++++++++++++++++------------------- 1 file changed, 75 insertions(+), 48 deletions(-) diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 279fa5da8..98b107b35 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -5,7 +5,7 @@ use std::{ sync::{mpsc, Arc}, time::Duration, }; -use crossbeam_channel::{Receiver, Sender}; +use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; @@ -61,9 +61,25 @@ pub(crate) struct Worker { impl Worker { pub(crate) fn start(roots: Arc) -> Worker { - let (worker, worker_handle) = - thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { + // This is a pretty elaborate setup of threads & channels! It is + // explained by the following concerns: + + // * we need to burn a thread translating from notify's mpsc to + // crossbeam_channel. + // * we want to read all files from a single thread, to gurantee that + // we always get fresher versions and never go back in time. + // * we want to tear down everything neatly during shutdown. + let (worker, worker_handle) = thread_worker::spawn( + "vfs", + 128, + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + move |input_receiver, output_sender| { + // These are `std` channels notify will send events to let (notify_sender, notify_receiver) = mpsc::channel(); + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); let watcher = notify::watcher(notify_sender, WATCHER_DELAY) .map_err(|e| log::error!("failed to spawn notify {}", e)) .ok(); @@ -72,18 +88,30 @@ impl Worker { watcher: Arc::new(Mutex::new(watcher)), sender: output_sender, }; - let thread = thread::spawn({ - let ctx = ctx.clone(); - move || { - let _ = notify_receiver - .into_iter() - // forward relevant events only - .try_for_each(|change| ctx.handle_debounced_event(change)); - } - }); - let res1 = input_receiver.into_iter().try_for_each(|t| match t { - Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)), + let thread = thread::spawn(move || { + let _ = notify_receiver + .into_iter() + // forward relevant events only + .for_each(|event| convert_notify_event(event, &watcher_sender)); }); + + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => break, + Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)), + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(), + }, + } + } drop(ctx.watcher.lock().take()); drop(ctx); let res2 = thread.join(); @@ -91,9 +119,9 @@ impl Worker { Ok(()) => log::info!("... Watcher terminated with ok"), Err(_) => log::error!("... Watcher terminated with err"), } - res1.unwrap(); res2.unwrap(); - }); + }, + ); Worker { worker, worker_handle, @@ -114,7 +142,7 @@ impl Worker { } } -fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) -> Result<()> { +fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) { let mut guard = woker.watcher.lock(); log::debug!("loading {} ...", config.root.as_path().display()); let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) @@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) -> Res .collect(); woker .sender - .send(TaskResult::BulkLoadRoot { root, files })?; + .send(TaskResult::BulkLoadRoot { root, files }) + .unwrap(); log::debug!("... loaded {}", config.root.as_path().display()); - Ok(()) } #[derive(Clone)] @@ -139,38 +167,37 @@ struct WatcherCtx { sender: Sender, } -impl WatcherCtx { - fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> { - match ev { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - // TODO rescan all roots - } - DebouncedEvent::Create(path) => { - self.handle_change(path, ChangeKind::Create)?; - } - DebouncedEvent::Write(path) => { - self.handle_change(path, ChangeKind::Write)?; - } - DebouncedEvent::Remove(path) => { - self.handle_change(path, ChangeKind::Remove)?; - } - DebouncedEvent::Rename(src, dst) => { - self.handle_change(src, ChangeKind::Remove)?; - self.handle_change(dst, ChangeKind::Create)?; - } - DebouncedEvent::Error(err, path) => { - // TODO should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } +fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { + match event { + DebouncedEvent::NoticeWrite(_) + | DebouncedEvent::NoticeRemove(_) + | DebouncedEvent::Chmod(_) => { + // ignore + } + DebouncedEvent::Rescan => { + // TODO rescan all roots + } + DebouncedEvent::Create(path) => { + sender.send((path, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Write(path) => { + sender.send((path, ChangeKind::Write)).unwrap(); + } + DebouncedEvent::Remove(path) => { + sender.send((path, ChangeKind::Remove)).unwrap(); + } + DebouncedEvent::Rename(src, dst) => { + sender.send((src, ChangeKind::Remove)).unwrap(); + sender.send((dst, ChangeKind::Create)).unwrap(); + } + DebouncedEvent::Error(err, path) => { + // TODO should we reload the file contents? + log::warn!("watcher error \"{}\", {:?}", err, path); } - Ok(()) } +} +impl WatcherCtx { fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { let (root, rel_path) = match self.roots.find(&path) { None => return Ok(()), -- cgit v1.2.3 From bf98fc609e2b587d8455bf2bec3ca35f85cf0700 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 17:04:00 +0300 Subject: remove mutexes --- crates/ra_vfs/src/io.rs | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 98b107b35..8c719dc5d 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -9,7 +9,6 @@ use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use thread_worker::WorkerHandle; use walkdir::WalkDir; -use parking_lot::Mutex; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; use crate::{RootConfig, Roots, VfsRoot}; @@ -83,9 +82,9 @@ impl Worker { let watcher = notify::watcher(notify_sender, WATCHER_DELAY) .map_err(|e| log::error!("failed to spawn notify {}", e)) .ok(); - let ctx = WatcherCtx { + let mut ctx = WatcherCtx { roots, - watcher: Arc::new(Mutex::new(watcher)), + watcher, sender: output_sender, }; let thread = thread::spawn(move || { @@ -101,18 +100,18 @@ impl Worker { // closed, we should shutdown everything. recv(input_receiver) -> t => match t { Err(RecvError) => break, - Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)), + Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), }, // Watcher send us changes. If **this** channel is // closed, the watcher has died, which indicates a bug // -- escalate! recv(watcher_receiver) -> event => match event { Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(), + Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), }, } } - drop(ctx.watcher.lock().take()); + drop(ctx.watcher.take()); drop(ctx); let res2 = thread.join(); match &res2 { @@ -142,10 +141,9 @@ impl Worker { } } -fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) { - let mut guard = woker.watcher.lock(); +fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc) { log::debug!("loading {} ...", config.root.as_path().display()); - let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) + let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) .into_iter() .filter_map(|path| { let abs_path = path.to_path(&config.root); @@ -160,10 +158,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc) { log::debug!("... loaded {}", config.root.as_path().display()); } -#[derive(Clone)] struct WatcherCtx { roots: Arc, - watcher: Arc>>, + watcher: Option, sender: Sender, } @@ -198,7 +195,7 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK } impl WatcherCtx { - fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { + fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { let (root, rel_path) = match self.roots.find(&path) { None => return Ok(()), Some(it) => it, @@ -208,8 +205,7 @@ impl WatcherCtx { ChangeKind::Create => { let mut paths = Vec::new(); if path.is_dir() { - let mut guard = self.watcher.lock(); - paths.extend(watch_recursive(guard.as_mut(), &path, &config)); + paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); } else { paths.push(rel_path); } -- cgit v1.2.3 From 9f16892b94817d144f37dfe0081b39aacec65635 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 17:17:28 +0300 Subject: remove watcher ctx --- crates/ra_vfs/src/io.rs | 142 ++++++++++++++++++++++++++---------------------- 1 file changed, 77 insertions(+), 65 deletions(-) diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 8c719dc5d..d764c534a 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -13,8 +13,6 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc use crate::{RootConfig, Roots, VfsRoot}; -type Result = std::result::Result>; - pub(crate) enum Task { AddRoot { root: VfsRoot, @@ -62,7 +60,6 @@ impl Worker { pub(crate) fn start(roots: Arc) -> Worker { // This is a pretty elaborate setup of threads & channels! It is // explained by the following concerns: - // * we need to burn a thread translating from notify's mpsc to // crossbeam_channel. // * we want to read all files from a single thread, to gurantee that @@ -79,48 +76,57 @@ impl Worker { let (notify_sender, notify_receiver) = mpsc::channel(); // These are the corresponding crossbeam channels let (watcher_sender, watcher_receiver) = unbounded(); - let watcher = notify::watcher(notify_sender, WATCHER_DELAY) + + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) .map_err(|e| log::error!("failed to spawn notify {}", e)) .ok(); - let mut ctx = WatcherCtx { - roots, - watcher, - sender: output_sender, - }; + // Start a silly thread to tranform between two channels let thread = thread::spawn(move || { - let _ = notify_receiver + notify_receiver .into_iter() - // forward relevant events only - .for_each(|event| convert_notify_event(event, &watcher_sender)); + .for_each(|event| convert_notify_event(event, &watcher_sender)) }); + // Process requests from the called or notifications from + // watcher until the caller says stop. loop { select! { // Received request from the caller. If this channel is // closed, we should shutdown everything. recv(input_receiver) -> t => match t { - Err(RecvError) => break, - Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), + Err(RecvError) => { + drop(input_receiver); + break + }, + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } }, // Watcher send us changes. If **this** channel is // closed, the watcher has died, which indicates a bug // -- escalate! recv(watcher_receiver) -> event => match event { Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } }, } } - drop(ctx.watcher.take()); - drop(ctx); - let res2 = thread.join(); - match &res2 { + // Stopped the watcher + drop(watcher.take()); + // Drain pending events: we are not inrerested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + + let res = thread.join(); + match &res { Ok(()) => log::info!("... Watcher terminated with ok"), Err(_) => log::error!("... Watcher terminated with err"), } - res2.unwrap(); + res.unwrap(); }, ); + Worker { worker, worker_handle, @@ -141,9 +147,14 @@ impl Worker { } } -fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc) { +fn watch_root( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + root: VfsRoot, + config: Arc, +) { log::debug!("loading {} ...", config.root.as_path().display()); - let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) + let files = watch_recursive(watcher, config.root.as_path(), &*config) .into_iter() .filter_map(|path| { let abs_path = path.to_path(&config.root); @@ -151,20 +162,14 @@ fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc) { Some((path, text)) }) .collect(); - woker - .sender + sender .send(TaskResult::BulkLoadRoot { root, files }) .unwrap(); log::debug!("... loaded {}", config.root.as_path().display()); } -struct WatcherCtx { - roots: Arc, - watcher: Option, - sender: Sender, -} - fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { + // forward relevant events only match event { DebouncedEvent::NoticeWrite(_) | DebouncedEvent::NoticeRemove(_) @@ -194,48 +199,55 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK } } -impl WatcherCtx { - fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { - let (root, rel_path) = match self.roots.find(&path) { - None => return Ok(()), - Some(it) => it, - }; - let config = &self.roots[root]; - match kind { - ChangeKind::Create => { - let mut paths = Vec::new(); - if path.is_dir() { - paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); - } else { - paths.push(rel_path); - } - paths - .into_iter() - .filter_map(|rel_path| { - let abs_path = rel_path.to_path(&config.root); - let text = read_to_string(&abs_path)?; - Some((rel_path, text)) - }) - .try_for_each(|(path, text)| { - self.sender - .send(TaskResult::AddSingleFile { root, path, text }) - })? +fn handle_change( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + roots: &Roots, + path: PathBuf, + kind: ChangeKind, +) { + let (root, rel_path) = match roots.find(&path) { + None => return, + Some(it) => it, + }; + let config = &roots[root]; + match kind { + ChangeKind::Create => { + let mut paths = Vec::new(); + if path.is_dir() { + paths.extend(watch_recursive(watcher, &path, &config)); + } else { + paths.push(rel_path); } - ChangeKind::Write => { - if let Some(text) = read_to_string(&path) { - self.sender.send(TaskResult::ChangeSingleFile { + paths + .into_iter() + .filter_map(|rel_path| { + let abs_path = rel_path.to_path(&config.root); + let text = read_to_string(&abs_path)?; + Some((rel_path, text)) + }) + .try_for_each(|(path, text)| { + sender.send(TaskResult::AddSingleFile { root, path, text }) + }) + .unwrap() + } + ChangeKind::Write => { + if let Some(text) = read_to_string(&path) { + sender + .send(TaskResult::ChangeSingleFile { root, path: rel_path, text, - })?; - } + }) + .unwrap(); } - ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { + } + ChangeKind::Remove => sender + .send(TaskResult::RemoveSingleFile { root, path: rel_path, - })?, - } - Ok(()) + }) + .unwrap(), } } -- cgit v1.2.3