aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io/watcher.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io/watcher.rs')
-rw-r--r--crates/ra_vfs/src/io/watcher.rs215
1 files changed, 141 insertions, 74 deletions
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
index 68bb6b692..1d7ce2136 100644
--- a/crates/ra_vfs/src/io/watcher.rs
+++ b/crates/ra_vfs/src/io/watcher.rs
@@ -1,118 +1,72 @@
1use crate::{io, RootFilter}; 1use crate::{io, RootFilter, Roots, VfsRoot};
2use crossbeam_channel::Sender; 2use crossbeam_channel::Sender;
3use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; 4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
5use parking_lot::Mutex;
5use std::{ 6use std::{
7 fs,
6 path::{Path, PathBuf}, 8 path::{Path, PathBuf},
7 sync::mpsc, 9 sync::{mpsc, Arc},
8 thread, 10 thread,
9 time::Duration, 11 time::Duration,
10}; 12};
11use walkdir::WalkDir; 13use walkdir::WalkDir;
12 14
13#[derive(Debug)] 15#[derive(Debug)]
14pub enum WatcherChange { 16enum ChangeKind {
15 Create(PathBuf), 17 Create,
16 Write(PathBuf), 18 Write,
17 Remove(PathBuf), 19 Remove,
18 Rescan,
19}
20
21fn handle_change_event(
22 ev: DebouncedEvent,
23 sender: &Sender<io::Task>,
24) -> Result<(), Box<std::error::Error>> {
25 match ev {
26 DebouncedEvent::NoticeWrite(_)
27 | DebouncedEvent::NoticeRemove(_)
28 | DebouncedEvent::Chmod(_) => {
29 // ignore
30 }
31 DebouncedEvent::Rescan => {
32 sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
33 }
34 DebouncedEvent::Create(path) => {
35 sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
36 }
37 DebouncedEvent::Write(path) => {
38 sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
39 }
40 DebouncedEvent::Remove(path) => {
41 sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
42 }
43 DebouncedEvent::Rename(src, dst) => {
44 sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
45 sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
46 }
47 DebouncedEvent::Error(err, path) => {
48 // TODO should we reload the file contents?
49 log::warn!("watcher error \"{}\", {:?}", err, path);
50 }
51 }
52 Ok(())
53} 20}
54 21
55const WATCHER_DELAY: Duration = Duration::from_millis(250); 22const WATCHER_DELAY: Duration = Duration::from_millis(250);
56 23
57pub(crate) struct Watcher { 24pub(crate) struct Watcher {
58 watcher: RecommendedWatcher,
59 thread: thread::JoinHandle<()>, 25 thread: thread::JoinHandle<()>,
60 bomb: DropBomb, 26 bomb: DropBomb,
61 sender: Sender<io::Task>, 27 watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
62} 28}
63 29
64impl Watcher { 30impl Watcher {
65 pub(crate) fn start( 31 pub(crate) fn start(
66 output_sender: Sender<io::Task>, 32 roots: Arc<Roots>,
33 output_sender: Sender<io::TaskResult>,
67 ) -> Result<Watcher, Box<std::error::Error>> { 34 ) -> Result<Watcher, Box<std::error::Error>> {
68 let (input_sender, input_receiver) = mpsc::channel(); 35 let (input_sender, input_receiver) = mpsc::channel();
69 let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; 36 let watcher = Arc::new(Mutex::new(Some(notify::watcher(
37 input_sender,
38 WATCHER_DELAY,
39 )?)));
70 let sender = output_sender.clone(); 40 let sender = output_sender.clone();
41 let watcher_clone = watcher.clone();
71 let thread = thread::spawn(move || { 42 let thread = thread::spawn(move || {
43 let worker = WatcherWorker {
44 roots,
45 watcher: watcher_clone,
46 sender,
47 };
72 input_receiver 48 input_receiver
73 .into_iter() 49 .into_iter()
74 // forward relevant events only 50 // forward relevant events only
75 .try_for_each(|change| handle_change_event(change, &output_sender)) 51 .try_for_each(|change| worker.handle_debounced_event(change))
76 .unwrap() 52 .unwrap()
77 }); 53 });
78 Ok(Watcher { 54 Ok(Watcher {
79 watcher,
80 thread, 55 thread,
81 sender, 56 watcher,
82 bomb: DropBomb::new(format!("Watcher was not shutdown")), 57 bomb: DropBomb::new(format!("Watcher was not shutdown")),
83 }) 58 })
84 } 59 }
85 60
86 pub fn watch_recursive(&mut self, dir: &Path, filter: &RootFilter, emit_for_contents: bool) { 61 pub fn watch_root(&mut self, filter: &RootFilter) {
87 for res in WalkDir::new(dir) 62 for res in WalkDir::new(&filter.root)
88 .into_iter() 63 .into_iter()
89 .filter_entry(|entry| filter.can_contain(entry.path()).is_some()) 64 .filter_entry(filter.entry_filter())
90 { 65 {
91 match res { 66 match res {
92 Ok(entry) => { 67 Ok(entry) => {
93 if entry.path().is_dir() { 68 if entry.path().is_dir() {
94 match self 69 watch_one(self.watcher.as_ref(), entry.path());
95 .watcher
96 .watch(entry.path(), RecursiveMode::NonRecursive)
97 {
98 Ok(()) => log::debug!("watching \"{}\"", entry.path().display()),
99 Err(e) => {
100 log::warn!("could not watch \"{}\": {}", entry.path().display(), e)
101 }
102 }
103 } else {
104 if emit_for_contents && entry.depth() > 0 {
105 // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
106 // emit as create because we haven't seen it yet
107 if let Err(e) =
108 self.sender
109 .send(io::Task::HandleChange(WatcherChange::Create(
110 entry.path().to_path_buf(),
111 )))
112 {
113 log::warn!("watcher error: {}", e)
114 }
115 }
116 } 70 }
117 } 71 }
118 Err(e) => log::warn!("watcher error: {}", e), 72 Err(e) => log::warn!("watcher error: {}", e),
@@ -122,7 +76,7 @@ impl Watcher {
122 76
123 pub fn shutdown(mut self) -> thread::Result<()> { 77 pub fn shutdown(mut self) -> thread::Result<()> {
124 self.bomb.defuse(); 78 self.bomb.defuse();
125 drop(self.watcher); 79 drop(self.watcher.lock().take());
126 let res = self.thread.join(); 80 let res = self.thread.join();
127 match &res { 81 match &res {
128 Ok(()) => log::info!("... Watcher terminated with ok"), 82 Ok(()) => log::info!("... Watcher terminated with ok"),
@@ -131,3 +85,116 @@ impl Watcher {
131 res 85 res
132 } 86 }
133} 87}
88
89struct WatcherWorker {
90 watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
91 roots: Arc<Roots>,
92 sender: Sender<io::TaskResult>,
93}
94
95impl WatcherWorker {
96 fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), Box<std::error::Error>> {
97 match ev {
98 DebouncedEvent::NoticeWrite(_)
99 | DebouncedEvent::NoticeRemove(_)
100 | DebouncedEvent::Chmod(_) => {
101 // ignore
102 }
103 DebouncedEvent::Rescan => {
104 // TODO rescan all roots
105 }
106 DebouncedEvent::Create(path) => {
107 self.handle_change(path, ChangeKind::Create);
108 }
109 DebouncedEvent::Write(path) => {
110 self.handle_change(path, ChangeKind::Write);
111 }
112 DebouncedEvent::Remove(path) => {
113 self.handle_change(path, ChangeKind::Remove);
114 }
115 DebouncedEvent::Rename(src, dst) => {
116 self.handle_change(src, ChangeKind::Remove);
117 self.handle_change(dst, ChangeKind::Create);
118 }
119 DebouncedEvent::Error(err, path) => {
120 // TODO should we reload the file contents?
121 log::warn!("watcher error \"{}\", {:?}", err, path);
122 }
123 }
124 Ok(())
125 }
126
127 fn handle_change(&self, path: PathBuf, kind: ChangeKind) {
128 if let Err(e) = self.try_handle_change(path, kind) {
129 log::warn!("watcher error: {}", e)
130 }
131 }
132
133 fn try_handle_change(
134 &self,
135 path: PathBuf,
136 kind: ChangeKind,
137 ) -> Result<(), Box<std::error::Error>> {
138 let (root, rel_path) = match self.roots.find(&path) {
139 Some(x) => x,
140 None => return Ok(()),
141 };
142 match kind {
143 ChangeKind::Create => {
144 if path.is_dir() {
145 self.watch_recursive(&path, root);
146 } else {
147 let text = fs::read_to_string(&path)?;
148 self.sender.send(io::TaskResult::AddSingleFile {
149 root,
150 path: rel_path,
151 text,
152 })?
153 }
154 }
155 ChangeKind::Write => {
156 let text = fs::read_to_string(&path)?;
157 self.sender.send(io::TaskResult::ChangeSingleFile {
158 root,
159 path: rel_path,
160 text,
161 })?
162 }
163 ChangeKind::Remove => self.sender.send(io::TaskResult::RemoveSingleFile {
164 root,
165 path: rel_path,
166 })?,
167 }
168 Ok(())
169 }
170
171 fn watch_recursive(&self, dir: &Path, root: VfsRoot) {
172 let filter = &self.roots[root];
173 for res in WalkDir::new(dir)
174 .into_iter()
175 .filter_entry(|entry| filter.can_contain(entry.path()).is_some())
176 {
177 match res {
178 Ok(entry) => {
179 if entry.path().is_dir() {
180 watch_one(self.watcher.as_ref(), entry.path());
181 } else {
182 // emit only for files otherwise we will cause watch_recursive to be called again with a dir that we are already watching
183 // emit as create because we haven't seen it yet
184 self.handle_change(entry.path().to_path_buf(), ChangeKind::Create);
185 }
186 }
187 Err(e) => log::warn!("watcher error: {}", e),
188 }
189 }
190 }
191}
192
193fn watch_one(watcher: &Mutex<Option<RecommendedWatcher>>, dir: &Path) {
194 if let Some(watcher) = watcher.lock().as_mut() {
195 match watcher.watch(dir, RecursiveMode::NonRecursive) {
196 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
197 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
198 }
199 }
200}