aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io/watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io/watcher.rs')
-rw-r--r--crates/ra_vfs/src/io/watcher.rs128
1 files changed, 128 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
new file mode 100644
index 000000000..e33298477
--- /dev/null
+++ b/crates/ra_vfs/src/io/watcher.rs
@@ -0,0 +1,128 @@
1use crate::io;
2use crossbeam_channel::Sender;
3use drop_bomb::DropBomb;
4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
5use std::{
6 path::{Path, PathBuf},
7 sync::mpsc,
8 thread,
9 time::Duration,
10};
11use walkdir::{DirEntry, WalkDir};
12
13#[derive(Debug)]
14pub enum WatcherChange {
15 Create(PathBuf),
16 Write(PathBuf),
17 Remove(PathBuf),
18 Rescan,
19}
20
21fn handle_change_event(
22 ev: DebouncedEvent,
23 sender: &Sender<io::Task>,
24) -> Result<(), Box<std::error::Error>> {
25 match ev {
26 DebouncedEvent::NoticeWrite(_)
27 | DebouncedEvent::NoticeRemove(_)
28 | DebouncedEvent::Chmod(_) => {
29 // ignore
30 }
31 DebouncedEvent::Rescan => {
32 sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
33 }
34 DebouncedEvent::Create(path) => {
35 sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
36 }
37 DebouncedEvent::Write(path) => {
38 sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
39 }
40 DebouncedEvent::Remove(path) => {
41 sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
42 }
43 DebouncedEvent::Rename(src, dst) => {
44 sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
45 sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
46 }
47 DebouncedEvent::Error(err, path) => {
48 // TODO should we reload the file contents?
49 log::warn!("watcher error \"{}\", {:?}", err, path);
50 }
51 }
52 Ok(())
53}
54
55const WATCHER_DELAY: Duration = Duration::from_millis(250);
56
57pub(crate) struct Watcher {
58 watcher: RecommendedWatcher,
59 thread: thread::JoinHandle<()>,
60 bomb: DropBomb,
61 sender: Sender<io::Task>,
62}
63
64impl Watcher {
65 pub(crate) fn start(
66 output_sender: Sender<io::Task>,
67 ) -> Result<Watcher, Box<std::error::Error>> {
68 let (input_sender, input_receiver) = mpsc::channel();
69 let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
70 let sender = output_sender.clone();
71 let thread = thread::spawn(move || {
72 input_receiver
73 .into_iter()
74 // forward relevant events only
75 .try_for_each(|change| handle_change_event(change, &output_sender))
76 .unwrap()
77 });
78 Ok(Watcher {
79 watcher,
80 thread,
81 sender,
82 bomb: DropBomb::new(format!("Watcher was not shutdown")),
83 })
84 }
85
86 pub fn watch_recursive(
87 &mut self,
88 dir: &Path,
89 filter_entry: impl Fn(&DirEntry) -> bool,
90 emit_for_contents: bool,
91 ) {
92 for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
93 match res {
94 Ok(entry) => {
95 if entry.path().is_dir() {
96 match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
97 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
98 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
99 }
100 }
101 if emit_for_contents && entry.depth() > 0 {
102 // emit as create because we haven't seen it yet
103 if let Err(e) =
104 self.sender
105 .send(io::Task::HandleChange(WatcherChange::Create(
106 entry.path().to_path_buf(),
107 )))
108 {
109 log::warn!("watcher error: {}", e)
110 }
111 }
112 }
113 Err(e) => log::warn!("watcher error: {}", e),
114 }
115 }
116 }
117
118 pub fn shutdown(mut self) -> thread::Result<()> {
119 self.bomb.defuse();
120 drop(self.watcher);
121 let res = self.thread.join();
122 match &res {
123 Ok(()) => log::info!("... Watcher terminated with ok"),
124 Err(_) => log::error!("... Watcher terminated with err"),
125 }
126 res
127 }
128}