aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io/watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io/watcher.rs')
-rw-r--r--crates/ra_vfs/src/io/watcher.rs200
1 files changed, 200 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
new file mode 100644
index 000000000..ff6775f59
--- /dev/null
+++ b/crates/ra_vfs/src/io/watcher.rs
@@ -0,0 +1,200 @@
1use crate::{io, RootFilter, Roots, VfsRoot};
2use crossbeam_channel::Sender;
3use drop_bomb::DropBomb;
4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
5use parking_lot::Mutex;
6use std::{
7 fs,
8 path::{Path, PathBuf},
9 sync::{mpsc, Arc},
10 thread,
11 time::Duration,
12};
13use walkdir::WalkDir;
14
15#[derive(Debug)]
16enum ChangeKind {
17 Create,
18 Write,
19 Remove,
20}
21
22const WATCHER_DELAY: Duration = Duration::from_millis(250);
23
24pub(crate) struct Watcher {
25 thread: thread::JoinHandle<()>,
26 bomb: DropBomb,
27 watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
28}
29
30impl Watcher {
31 pub(crate) fn start(
32 roots: Arc<Roots>,
33 output_sender: Sender<io::TaskResult>,
34 ) -> Result<Watcher, Box<std::error::Error>> {
35 let (input_sender, input_receiver) = mpsc::channel();
36 let watcher = Arc::new(Mutex::new(Some(notify::watcher(
37 input_sender,
38 WATCHER_DELAY,
39 )?)));
40 let sender = output_sender.clone();
41 let watcher_clone = watcher.clone();
42 let thread = thread::spawn(move || {
43 let worker = WatcherWorker {
44 roots,
45 watcher: watcher_clone,
46 sender,
47 };
48 input_receiver
49 .into_iter()
50 // forward relevant events only
51 .try_for_each(|change| worker.handle_debounced_event(change))
52 .unwrap()
53 });
54 Ok(Watcher {
55 thread,
56 watcher,
57 bomb: DropBomb::new(format!("Watcher was not shutdown")),
58 })
59 }
60
61 pub fn watch_root(&mut self, filter: &RootFilter) {
62 for res in WalkDir::new(&filter.root)
63 .into_iter()
64 .filter_entry(filter.entry_filter())
65 {
66 match res {
67 Ok(entry) => {
68 if entry.file_type().is_dir() {
69 watch_one(self.watcher.as_ref(), entry.path());
70 }
71 }
72 Err(e) => log::warn!("watcher error: {}", e),
73 }
74 }
75 }
76
77 pub fn shutdown(mut self) -> thread::Result<()> {
78 self.bomb.defuse();
79 drop(self.watcher.lock().take());
80 let res = self.thread.join();
81 match &res {
82 Ok(()) => log::info!("... Watcher terminated with ok"),
83 Err(_) => log::error!("... Watcher terminated with err"),
84 }
85 res
86 }
87}
88
89struct WatcherWorker {
90 watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
91 roots: Arc<Roots>,
92 sender: Sender<io::TaskResult>,
93}
94
95impl WatcherWorker {
96 fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box<std::error::Error>> {
97 match ev {
98 DebouncedEvent::NoticeWrite(_)
99 | DebouncedEvent::NoticeRemove(_)
100 | DebouncedEvent::Chmod(_) => {
101 // ignore
102 }
103 DebouncedEvent::Rescan => {
104 // TODO rescan all roots
105 }
106 DebouncedEvent::Create(path) => {
107 self.handle_change(path, ChangeKind::Create);
108 }
109 DebouncedEvent::Write(path) => {
110 self.handle_change(path, ChangeKind::Write);
111 }
112 DebouncedEvent::Remove(path) => {
113 self.handle_change(path, ChangeKind::Remove);
114 }
115 DebouncedEvent::Rename(src, dst) => {
116 self.handle_change(src, ChangeKind::Remove);
117 self.handle_change(dst, ChangeKind::Create);
118 }
119 DebouncedEvent::Error(err, path) => {
120 // TODO should we reload the file contents?
121 log::warn!("watcher error \"{}\", {:?}", err, path);
122 }
123 }
124 Ok(())
125 }
126
127 fn handle_change(&self, path: PathBuf, kind: ChangeKind) {
128 if let Err(e) = self.try_handle_change(path, kind) {
129 log::warn!("watcher error: {}", e)
130 }
131 }
132
133 fn try_handle_change(
134 &self,
135 path: PathBuf,
136 kind: ChangeKind,
137 ) -> Result<(), Box<std::error::Error>> {
138 let (root, rel_path) = match self.roots.find(&path) {
139 Some(x) => x,
140 None => return Ok(()),
141 };
142 match kind {
143 ChangeKind::Create => {
144 if path.is_dir() {
145 self.watch_recursive(&path, root);
146 } else {
147 let text = fs::read_to_string(&path)?;
148 self.sender.send(io::TaskResult::AddSingleFile {
149 root,
150 path: rel_path,
151 text,
152 })?
153 }
154 }
155 ChangeKind::Write => {
156 let text = fs::read_to_string(&path)?;
157 self.sender.send(io::TaskResult::ChangeSingleFile {
158 root,
159 path: rel_path,
160 text,
161 })?
162 }
163 ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile {
164 root,
165 path: rel_path,
166 })?,
167 }
168 Ok(())
169 }
170
171 fn watch_recursive(&self, dir: &Path, root: VfsRoot) {
172 let filter = &self.roots[root];
173 for res in WalkDir::new(dir)
174 .into_iter()
175 .filter_entry(filter.entry_filter())
176 {
177 match res {
178 Ok(entry) => {
179 if entry.file_type().is_dir() {
180 watch_one(self.watcher.as_ref(), entry.path());
181 } else {
182 // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
183 // emit as create because we haven't seen it yet
184 self.handle_change(entry.path().to_path_buf(), ChangeKind::Create);
185 }
186 }
187 Err(e) => log::warn!("watcher error: {}", e),
188 }
189 }
190 }
191}
192
193fn watch_one(watcher: &Mutex<Option<RecommendedWatcher>>, dir: &Path) {
194 if let Some(watcher) = watcher.lock().as_mut() {
195 match watcher.watch(dir, RecursiveMode::NonRecursive) {
196 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
197 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
198 }
199 }
200}