From a6897a837c2f633bdc88b87c614db227aab3b1ad Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Mon, 18 Feb 2019 16:43:30 +0300 Subject: remove local-vfs --- crates/ra_vfs/src/io.rs | 286 ------------------------------------------------ 1 file changed, 286 deletions(-) delete mode 100644 crates/ra_vfs/src/io.rs (limited to 'crates/ra_vfs/src/io.rs') diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs deleted file mode 100644 index 5969ee0d0..000000000 --- a/crates/ra_vfs/src/io.rs +++ /dev/null @@ -1,286 +0,0 @@ -use std::{ - fs, - path::{Path, PathBuf}, - sync::{mpsc, Arc}, - time::Duration, - thread, -}; -use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select}; -use relative_path::RelativePathBuf; -use walkdir::WalkDir; -use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; - -use crate::{Roots, VfsRoot, VfsTask}; - -pub(crate) enum Task { - AddRoot { root: VfsRoot }, -} - -/// `TaskResult` transfers files read on the IO thread to the VFS on the main -/// thread. -#[derive(Debug)] -pub(crate) enum TaskResult { - /// Emitted when we've recursively scanned a source root during the initial - /// load. - BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> }, - /// Emitted when we've noticed that a single file has changed. - /// - /// Note that this by design does not distinguish between - /// create/delete/write events, and instead specifies the *current* state of - /// the file. The idea is to guarantee that in the quiescent state the sum - /// of all results equals to the current state of the file system, while - /// allowing to skip intermediate events in non-quiescent states. - SingleFile { root: VfsRoot, path: RelativePathBuf, text: Option }, -} - -/// The kind of raw notification we've received from the notify library. -/// -/// Note that these are not necessary 100% precise (for example we might receive -/// `Create` instead of `Write`, see #734), but we try do distinguish `Create`s -/// to implement recursive watching of directories. -#[derive(Debug)] -enum ChangeKind { - Create, - Write, - Remove, -} - -const WATCHER_DELAY: Duration = Duration::from_millis(250); - -// Like thread::JoinHandle, but joins the thread on drop. -// -// This is useful because it guarantees the absence of run-away threads, even if -// code panics. This is important, because we might see panics in the test and -// we might be used in an IDE context, where a failed component is just -// restarted. -// -// Because all threads are joined, care must be taken to avoid deadlocks. That -// typically means ensuring that channels are dropped before the threads. -struct ScopedThread(Option>); - -impl ScopedThread { - fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread { - let handle = thread::Builder::new().name(name).spawn(f).unwrap(); - ScopedThread(Some(handle)) - } -} - -impl Drop for ScopedThread { - fn drop(&mut self) { - let res = self.0.take().unwrap().join(); - if !thread::panicking() { - res.unwrap(); - } - } -} - -pub(crate) struct Worker { - // XXX: field order is significant here. - // - // In Rust, fields are dropped in the declaration order, and we rely on this - // here. We must close sender first, so that the `thread` (who holds the - // opposite side of the channel) noticed shutdown. Then, we must join the - // thread, but we must keep receiver alive so that the thread does not - // panic. - pub(crate) sender: Sender, - _thread: ScopedThread, - pub(crate) receiver: Receiver, -} - -pub(crate) fn start(roots: Arc) -> Worker { - // This is a pretty elaborate setup of threads & channels! It is - // explained by the following concerns: - // * we need to burn a thread translating from notify's mpsc to - // crossbeam_channel. - // * we want to read all files from a single thread, to guarantee that - // we always get fresher versions and never go back in time. - // * we want to tear down everything neatly during shutdown. - let _thread; - // This are the channels we use to communicate with outside world. - // If `input_receiver` is closed we need to tear ourselves down. - // `output_sender` should not be closed unless the parent died. - let (input_sender, input_receiver) = unbounded(); - let (output_sender, output_receiver) = unbounded(); - - _thread = ScopedThread::spawn("vfs".to_string(), move || { - // Make sure that the destruction order is - // - // * notify_sender - // * _thread - // * watcher_sender - // - // this is required to avoid deadlocks. - - // These are the corresponding crossbeam channels - let (watcher_sender, watcher_receiver) = unbounded(); - let _notify_thread; - { - // These are `std` channels notify will send events to - let (notify_sender, notify_receiver) = mpsc::channel(); - - let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) - .map_err(|e| log::error!("failed to spawn notify {}", e)) - .ok(); - // Start a silly thread to transform between two channels - _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || { - notify_receiver - .into_iter() - .for_each(|event| convert_notify_event(event, &watcher_sender)) - }); - - // Process requests from the called or notifications from - // watcher until the caller says stop. - loop { - select! { - // Received request from the caller. If this channel is - // closed, we should shutdown everything. - recv(input_receiver) -> t => match t { - Err(RecvError) => { - drop(input_receiver); - break - }, - Ok(Task::AddRoot { root }) => { - watch_root(watcher.as_mut(), &output_sender, &*roots, root); - } - }, - // Watcher send us changes. If **this** channel is - // closed, the watcher has died, which indicates a bug - // -- escalate! - recv(watcher_receiver) -> event => match event { - Err(RecvError) => panic!("watcher is dead"), - Ok((path, change)) => { - handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); - } - }, - } - } - } - // Drain pending events: we are not interested in them anyways! - watcher_receiver.into_iter().for_each(|_| ()); - }); - Worker { sender: input_sender, _thread, receiver: output_receiver } -} - -fn watch_root( - watcher: Option<&mut RecommendedWatcher>, - sender: &Sender, - roots: &Roots, - root: VfsRoot, -) { - let root_path = roots.path(root); - log::debug!("loading {} ...", root_path.display()); - let files = watch_recursive(watcher, root_path, roots, root) - .into_iter() - .filter_map(|path| { - let abs_path = path.to_path(&root_path); - let text = read_to_string(&abs_path)?; - Some((path, text)) - }) - .collect(); - let res = TaskResult::BulkLoadRoot { root, files }; - sender.send(VfsTask(res)).unwrap(); - log::debug!("... loaded {}", root_path.display()); -} - -fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { - // forward relevant events only - match event { - DebouncedEvent::NoticeWrite(_) - | DebouncedEvent::NoticeRemove(_) - | DebouncedEvent::Chmod(_) => { - // ignore - } - DebouncedEvent::Rescan => { - // TODO: rescan all roots - } - DebouncedEvent::Create(path) => { - sender.send((path, ChangeKind::Create)).unwrap(); - } - DebouncedEvent::Write(path) => { - sender.send((path, ChangeKind::Write)).unwrap(); - } - DebouncedEvent::Remove(path) => { - sender.send((path, ChangeKind::Remove)).unwrap(); - } - DebouncedEvent::Rename(src, dst) => { - sender.send((src, ChangeKind::Remove)).unwrap(); - sender.send((dst, ChangeKind::Create)).unwrap(); - } - DebouncedEvent::Error(err, path) => { - // TODO: should we reload the file contents? - log::warn!("watcher error \"{}\", {:?}", err, path); - } - } -} - -fn handle_change( - watcher: Option<&mut RecommendedWatcher>, - sender: &Sender, - roots: &Roots, - path: PathBuf, - kind: ChangeKind, -) { - let (root, rel_path) = match roots.find(&path) { - None => return, - Some(it) => it, - }; - match kind { - ChangeKind::Create => { - let mut paths = Vec::new(); - if path.is_dir() { - paths.extend(watch_recursive(watcher, &path, roots, root)); - } else { - paths.push(rel_path); - } - paths - .into_iter() - .try_for_each(|rel_path| { - let abs_path = rel_path.to_path(&roots.path(root)); - let text = read_to_string(&abs_path); - let res = TaskResult::SingleFile { root, path: rel_path, text }; - sender.send(VfsTask(res)) - }) - .unwrap() - } - ChangeKind::Write | ChangeKind::Remove => { - let text = read_to_string(&path); - let res = TaskResult::SingleFile { root, path: rel_path, text }; - sender.send(VfsTask(res)).unwrap(); - } - } -} - -fn watch_recursive( - mut watcher: Option<&mut RecommendedWatcher>, - dir: &Path, - roots: &Roots, - root: VfsRoot, -) -> Vec { - let mut files = Vec::new(); - for entry in WalkDir::new(dir) - .into_iter() - .filter_entry(|it| roots.contains(root, it.path()).is_some()) - .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) - { - if entry.file_type().is_dir() { - if let Some(watcher) = &mut watcher { - watch_one(watcher, entry.path()); - } - } else { - let path = roots.contains(root, entry.path()).unwrap(); - files.push(path.to_owned()); - } - } - files -} - -fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { - match watcher.watch(dir, RecursiveMode::NonRecursive) { - Ok(()) => log::debug!("watching \"{}\"", dir.display()), - Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), - } -} - -fn read_to_string(path: &Path) -> Option { - fs::read_to_string(&path).map_err(|e| log::warn!("failed to read file {}", e)).ok() -} -- cgit v1.2.3