From 9f16892b94817d144f37dfe0081b39aacec65635 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 26 Jan 2019 17:17:28 +0300 Subject: remove watcher ctx --- crates/ra_vfs/src/io.rs | 142 ++++++++++++++++++++++++++---------------------- 1 file changed, 77 insertions(+), 65 deletions(-) (limited to 'crates') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 8c719dc5d..d764c534a 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -13,8 +13,6 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc use crate::{RootConfig, Roots, VfsRoot}; -type Result = std::result::Result>; - pub(crate) enum Task { AddRoot { root: VfsRoot, @@ -62,7 +60,6 @@ impl Worker { pub(crate) fn start(roots: Arc) -> Worker { // This is a pretty elaborate setup of threads & channels! It is // explained by the following concerns: - // * we need to burn a thread translating from notify's mpsc to // crossbeam_channel. // * we want to read all files from a single thread, to gurantee that @@ -79,48 +76,57 @@ impl Worker { let (notify_sender, notify_receiver) = mpsc::channel(); // These are the corresponding crossbeam channels let (watcher_sender, watcher_receiver) = unbounded(); - let watcher = notify::watcher(notify_sender, WATCHER_DELAY) + + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) .map_err(|e| log::error!("failed to spawn notify {}", e)) .ok(); - let mut ctx = WatcherCtx { - roots, - watcher, - sender: output_sender, - }; + // Start a silly thread to tranform between two channels let thread = thread::spawn(move || { - let _ = notify_receiver + notify_receiver .into_iter() - // forward relevant events only - .for_each(|event| convert_notify_event(event, &watcher_sender)); + .for_each(|event| convert_notify_event(event, &watcher_sender)) }); + // Process requests from the called or notifications from + // watcher until the caller says stop. loop { select! { // Received request from the caller. If this channel is // closed, we should shutdown everything. recv(input_receiver) -> t => match t { - Err(RecvError) => break, - Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), + Err(RecvError) => { + drop(input_receiver); + break + }, + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } }, // Watcher send us changes. If **this** channel is // closed, the watcher has died, which indicates a bug // -- escalate! recv(watcher_receiver) -> event => match event { Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } }, } } - drop(ctx.watcher.take()); - drop(ctx); - let res2 = thread.join(); - match &res2 { + // Stopped the watcher + drop(watcher.take()); + // Drain pending events: we are not inrerested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + + let res = thread.join(); + match &res { Ok(()) => log::info!("... Watcher terminated with ok"), Err(_) => log::error!("... Watcher terminated with err"), } - res2.unwrap(); + res.unwrap(); }, ); + Worker { worker, worker_handle, @@ -141,9 +147,14 @@ impl Worker { } } -fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc) { +fn watch_root( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + root: VfsRoot, + config: Arc, +) { log::debug!("loading {} ...", config.root.as_path().display()); - let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) + let files = watch_recursive(watcher, config.root.as_path(), &*config) .into_iter() .filter_map(|path| { let abs_path = path.to_path(&config.root); @@ -151,20 +162,14 @@ fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc) { Some((path, text)) }) .collect(); - woker - .sender + sender .send(TaskResult::BulkLoadRoot { root, files }) .unwrap(); log::debug!("... loaded {}", config.root.as_path().display()); } -struct WatcherCtx { - roots: Arc, - watcher: Option, - sender: Sender, -} - fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { + // forward relevant events only match event { DebouncedEvent::NoticeWrite(_) | DebouncedEvent::NoticeRemove(_) @@ -194,48 +199,55 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK } } -impl WatcherCtx { - fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { - let (root, rel_path) = match self.roots.find(&path) { - None => return Ok(()), - Some(it) => it, - }; - let config = &self.roots[root]; - match kind { - ChangeKind::Create => { - let mut paths = Vec::new(); - if path.is_dir() { - paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); - } else { - paths.push(rel_path); - } - paths - .into_iter() - .filter_map(|rel_path| { - let abs_path = rel_path.to_path(&config.root); - let text = read_to_string(&abs_path)?; - Some((rel_path, text)) - }) - .try_for_each(|(path, text)| { - self.sender - .send(TaskResult::AddSingleFile { root, path, text }) - })? +fn handle_change( + watcher: Option<&mut RecommendedWatcher>, + sender: &Sender, + roots: &Roots, + path: PathBuf, + kind: ChangeKind, +) { + let (root, rel_path) = match roots.find(&path) { + None => return, + Some(it) => it, + }; + let config = &roots[root]; + match kind { + ChangeKind::Create => { + let mut paths = Vec::new(); + if path.is_dir() { + paths.extend(watch_recursive(watcher, &path, &config)); + } else { + paths.push(rel_path); } - ChangeKind::Write => { - if let Some(text) = read_to_string(&path) { - self.sender.send(TaskResult::ChangeSingleFile { + paths + .into_iter() + .filter_map(|rel_path| { + let abs_path = rel_path.to_path(&config.root); + let text = read_to_string(&abs_path)?; + Some((rel_path, text)) + }) + .try_for_each(|(path, text)| { + sender.send(TaskResult::AddSingleFile { root, path, text }) + }) + .unwrap() + } + ChangeKind::Write => { + if let Some(text) = read_to_string(&path) { + sender + .send(TaskResult::ChangeSingleFile { root, path: rel_path, text, - })?; - } + }) + .unwrap(); } - ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { + } + ChangeKind::Remove => sender + .send(TaskResult::RemoveSingleFile { root, path: rel_path, - })?, - } - Ok(()) + }) + .unwrap(), } } -- cgit v1.2.3