From e0b8942c56378b7966af39058f27b11a0d02890f Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 14 Feb 2019 21:14:47 +0300 Subject: simplify --- crates/ra_vfs/src/io.rs | 155 +++++++++++++++++++++-------------------------- crates/ra_vfs/src/lib.rs | 2 +- 2 files changed, 71 insertions(+), 86 deletions(-) (limited to 'crates') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 1b70cd8df..f64b4c532 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -4,7 +4,7 @@ use std::{ sync::{mpsc, Arc}, time::Duration, }; -use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; +use crossbeam_channel::{Sender, unbounded, RecvError, select}; use relative_path::RelativePathBuf; use walkdir::WalkDir; use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; @@ -46,93 +46,78 @@ enum ChangeKind { const WATCHER_DELAY: Duration = Duration::from_millis(250); -pub(crate) struct Worker { - thread_worker: thread_worker::Worker, -} - -impl Worker { - pub(crate) fn start(roots: Arc) -> Worker { - // This is a pretty elaborate setup of threads & channels! It is - // explained by the following concerns: - // * we need to burn a thread translating from notify's mpsc to - // crossbeam_channel. - // * we want to read all files from a single thread, to guarantee that - // we always get fresher versions and never go back in time. - // * we want to tear down everything neatly during shutdown. - let thread_worker = thread_worker::Worker::spawn( - "vfs", - 128, - // This are the channels we use to communicate with outside world. - // If `input_receiver` is closed we need to tear ourselves down. - // `output_sender` should not be closed unless the parent died. - move |input_receiver, output_sender| { - // Make sure that the destruction order is - // - // * notify_sender - // * _thread - // * watcher_sender - // - // this is required to avoid deadlocks. - - // These are the corresponding crossbeam channels - let (watcher_sender, watcher_receiver) = unbounded(); - let _thread; - { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); - - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); - - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break - }, - Ok(Task::AddRoot { root, config }) => { - watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); - } - }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } +pub(crate) type Worker = thread_worker::Worker; +pub(crate) fn start(roots: Arc) -> Worker { + // This is a pretty elaborate setup of threads & channels! It is + // explained by the following concerns: + // * we need to burn a thread translating from notify's mpsc to + // crossbeam_channel. + // * we want to read all files from a single thread, to guarantee that + // we always get fresher versions and never go back in time. + // * we want to tear down everything neatly during shutdown. + Worker::spawn( + "vfs", + 128, + // This are the channels we use to communicate with outside world. + // If `input_receiver` is closed we need to tear ourselves down. + // `output_sender` should not be closed unless the parent died. + move |input_receiver, output_sender| { + // Make sure that the destruction order is + // + // * notify_sender + // * _thread + // * watcher_sender + // + // this is required to avoid deadlocks. + + // These are the corresponding crossbeam channels + let (watcher_sender, watcher_receiver) = unbounded(); + let _thread; + { + // These are `std` channels notify will send events to + let (notify_sender, notify_receiver) = mpsc::channel(); + + let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) + .map_err(|e| log::error!("failed to spawn notify {}", e)) + .ok(); + // Start a silly thread to transform between two channels + _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { + notify_receiver + .into_iter() + .for_each(|event| convert_notify_event(event, &watcher_sender)) + }); + + // Process requests from the called or notifications from + // watcher until the caller says stop. + loop { + select! { + // Received request from the caller. If this channel is + // closed, we should shutdown everything. + recv(input_receiver) -> t => match t { + Err(RecvError) => { + drop(input_receiver); + break }, - } + Ok(Task::AddRoot { root, config }) => { + watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); + } + }, + // Watcher send us changes. If **this** channel is + // closed, the watcher has died, which indicates a bug + // -- escalate! + recv(watcher_receiver) -> event => match event { + Err(RecvError) => panic!("watcher is dead"), + Ok((path, change)) => { + handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); + } + }, } } - // Drain pending events: we are not interested in them anyways! - watcher_receiver.into_iter().for_each(|_| ()); - }, - ); - - Worker { thread_worker } - } - - pub(crate) fn sender(&self) -> &Sender { - &self.thread_worker.sender() - } - - pub(crate) fn receiver(&self) -> &Receiver { - &self.thread_worker.receiver() - } + } + // Drain pending events: we are not interested in them anyways! + watcher_receiver.into_iter().for_each(|_| ()); + }, + ) } fn watch_root( diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 1fb255365..cfdc1275f 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -159,7 +159,7 @@ impl fmt::Debug for Vfs { impl Vfs { pub fn new(roots: Vec) -> (Vfs, Vec) { let roots = Arc::new(Roots::new(roots)); - let worker = io::Worker::start(Arc::clone(&roots)); + let worker = io::start(Arc::clone(&roots)); let mut root2files = ArenaMap::default(); for (root, config) in roots.iter() { -- cgit v1.2.3