diff options
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 229 |
1 files changed, 176 insertions, 53 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 7ca1e9835..669240488 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -1,19 +1,23 @@ | |||
1 | use std::{fs, sync::Arc, thread}; | 1 | use std::{ |
2 | 2 | fs, | |
3 | use crossbeam_channel::{Receiver, Sender}; | 3 | path::{Path, PathBuf}, |
4 | sync::{mpsc, Arc}, | ||
5 | thread, | ||
6 | time::Duration, | ||
7 | }; | ||
8 | use crossbeam_channel::{Receiver, Sender, SendError}; | ||
4 | use relative_path::RelativePathBuf; | 9 | use relative_path::RelativePathBuf; |
5 | use thread_worker::WorkerHandle; | 10 | use thread_worker::WorkerHandle; |
6 | use walkdir::WalkDir; | 11 | use walkdir::WalkDir; |
12 | use parking_lot::Mutex; | ||
13 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | ||
7 | 14 | ||
8 | mod watcher; | 15 | use crate::{RootConfig, Roots, VfsRoot}; |
9 | use watcher::Watcher; | ||
10 | |||
11 | use crate::{RootFilter, Roots, VfsRoot}; | ||
12 | 16 | ||
13 | pub(crate) enum Task { | 17 | pub(crate) enum Task { |
14 | AddRoot { | 18 | AddRoot { |
15 | root: VfsRoot, | 19 | root: VfsRoot, |
16 | filter: Arc<RootFilter>, | 20 | config: Arc<RootConfig>, |
17 | }, | 21 | }, |
18 | } | 22 | } |
19 | 23 | ||
@@ -39,6 +43,15 @@ pub enum TaskResult { | |||
39 | }, | 43 | }, |
40 | } | 44 | } |
41 | 45 | ||
46 | #[derive(Debug)] | ||
47 | enum ChangeKind { | ||
48 | Create, | ||
49 | Write, | ||
50 | Remove, | ||
51 | } | ||
52 | |||
53 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
54 | |||
42 | pub(crate) struct Worker { | 55 | pub(crate) struct Worker { |
43 | worker: thread_worker::Worker<Task, TaskResult>, | 56 | worker: thread_worker::Worker<Task, TaskResult>, |
44 | worker_handle: WorkerHandle, | 57 | worker_handle: WorkerHandle, |
@@ -48,21 +61,36 @@ impl Worker { | |||
48 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 61 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
49 | let (worker, worker_handle) = | 62 | let (worker, worker_handle) = |
50 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { | 63 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { |
51 | let mut watcher = match Watcher::start(roots, output_sender.clone()) { | 64 | let (notify_sender, notify_receiver) = mpsc::channel(); |
52 | Ok(w) => Some(w), | 65 | let watcher = notify::watcher(notify_sender, WATCHER_DELAY) |
53 | Err(e) => { | 66 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
54 | log::error!("could not start watcher: {}", e); | 67 | .ok(); |
55 | None | 68 | let ctx = WatcherCtx { |
56 | } | 69 | roots, |
70 | watcher: Arc::new(Mutex::new(watcher)), | ||
71 | sender: output_sender, | ||
57 | }; | 72 | }; |
58 | let res = input_receiver | 73 | let thread = thread::spawn({ |
59 | .into_iter() | 74 | let ctx = ctx.clone(); |
60 | .filter_map(|t| handle_task(t, &mut watcher)) | 75 | move || { |
61 | .try_for_each(|it| output_sender.send(it)); | 76 | let _ = notify_receiver |
62 | if let Some(watcher) = watcher { | 77 | .into_iter() |
63 | let _ = watcher.shutdown(); | 78 | // forward relevant events only |
79 | .try_for_each(|change| ctx.handle_debounced_event(change)); | ||
80 | } | ||
81 | }); | ||
82 | let res1 = input_receiver.into_iter().try_for_each(|t| match t { | ||
83 | Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)), | ||
84 | }); | ||
85 | drop(ctx.watcher.lock().take()); | ||
86 | drop(ctx); | ||
87 | let res2 = thread.join(); | ||
88 | match &res2 { | ||
89 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
90 | Err(_) => log::error!("... Watcher terminated with err"), | ||
64 | } | 91 | } |
65 | res.unwrap() | 92 | res1.unwrap(); |
93 | res2.unwrap(); | ||
66 | }); | 94 | }); |
67 | Worker { | 95 | Worker { |
68 | worker, | 96 | worker, |
@@ -84,46 +112,141 @@ impl Worker { | |||
84 | } | 112 | } |
85 | } | 113 | } |
86 | 114 | ||
87 | fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { | 115 | fn watch_root( |
88 | match task { | 116 | woker: &WatcherCtx, |
89 | Task::AddRoot { root, filter } => { | 117 | root: VfsRoot, |
90 | if let Some(watcher) = watcher { | 118 | config: Arc<RootConfig>, |
91 | watcher.watch_root(&filter) | 119 | ) -> Result<(), SendError<TaskResult>> { |
120 | let mut guard = woker.watcher.lock(); | ||
121 | log::debug!("loading {} ...", config.root.as_path().display()); | ||
122 | let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) | ||
123 | .into_iter() | ||
124 | .filter_map(|path| { | ||
125 | let abs_path = path.to_path(&config.root); | ||
126 | let text = fs::read_to_string(abs_path) | ||
127 | .map_err(|e| log::warn!("watcher error: {}", e)) | ||
128 | .ok()?; | ||
129 | Some((path, text)) | ||
130 | }) | ||
131 | .collect(); | ||
132 | woker | ||
133 | .sender | ||
134 | .send(TaskResult::BulkLoadRoot { root, files })?; | ||
135 | log::debug!("... loaded {}", config.root.as_path().display()); | ||
136 | Ok(()) | ||
137 | } | ||
138 | |||
139 | #[derive(Clone)] | ||
140 | struct WatcherCtx { | ||
141 | roots: Arc<Roots>, | ||
142 | watcher: Arc<Mutex<Option<RecommendedWatcher>>>, | ||
143 | sender: Sender<TaskResult>, | ||
144 | } | ||
145 | |||
146 | impl WatcherCtx { | ||
147 | fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError<TaskResult>> { | ||
148 | match ev { | ||
149 | DebouncedEvent::NoticeWrite(_) | ||
150 | | DebouncedEvent::NoticeRemove(_) | ||
151 | | DebouncedEvent::Chmod(_) => { | ||
152 | // ignore | ||
153 | } | ||
154 | DebouncedEvent::Rescan => { | ||
155 | // TODO rescan all roots | ||
156 | } | ||
157 | DebouncedEvent::Create(path) => { | ||
158 | self.handle_change(path, ChangeKind::Create)?; | ||
159 | } | ||
160 | DebouncedEvent::Write(path) => { | ||
161 | self.handle_change(path, ChangeKind::Write)?; | ||
92 | } | 162 | } |
93 | log::debug!("loading {} ...", filter.root.as_path().display()); | 163 | DebouncedEvent::Remove(path) => { |
94 | let files = load_root(filter.as_ref()); | 164 | self.handle_change(path, ChangeKind::Remove)?; |
95 | log::debug!("... loaded {}", filter.root.as_path().display()); | 165 | } |
96 | Some(TaskResult::BulkLoadRoot { root, files }) | 166 | DebouncedEvent::Rename(src, dst) => { |
167 | self.handle_change(src, ChangeKind::Remove)?; | ||
168 | self.handle_change(dst, ChangeKind::Create)?; | ||
169 | } | ||
170 | DebouncedEvent::Error(err, path) => { | ||
171 | // TODO should we reload the file contents? | ||
172 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
173 | } | ||
174 | } | ||
175 | Ok(()) | ||
176 | } | ||
177 | |||
178 | fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError<TaskResult>> { | ||
179 | let (root, rel_path) = match self.roots.find(&path) { | ||
180 | None => return Ok(()), | ||
181 | Some(it) => it, | ||
182 | }; | ||
183 | let config = &self.roots[root]; | ||
184 | match kind { | ||
185 | ChangeKind::Create => { | ||
186 | let mut paths = Vec::new(); | ||
187 | if path.is_dir() { | ||
188 | let mut guard = self.watcher.lock(); | ||
189 | paths.extend(watch_recursive(guard.as_mut(), &path, &config)); | ||
190 | } else { | ||
191 | paths.push(rel_path); | ||
192 | } | ||
193 | paths | ||
194 | .into_iter() | ||
195 | .filter_map(|rel_path| { | ||
196 | let abs_path = rel_path.to_path(&config.root); | ||
197 | let text = fs::read_to_string(&abs_path) | ||
198 | .map_err(|e| log::warn!("watcher failed {}", e)) | ||
199 | .ok()?; | ||
200 | Some((rel_path, text)) | ||
201 | }) | ||
202 | .try_for_each(|(path, text)| { | ||
203 | self.sender | ||
204 | .send(TaskResult::AddSingleFile { root, path, text }) | ||
205 | })? | ||
206 | } | ||
207 | ChangeKind::Write => match fs::read_to_string(&path) { | ||
208 | Err(e) => log::warn!("watcher failed {}", e), | ||
209 | Ok(text) => self.sender.send(TaskResult::ChangeSingleFile { | ||
210 | root, | ||
211 | path: rel_path, | ||
212 | text, | ||
213 | })?, | ||
214 | }, | ||
215 | ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { | ||
216 | root, | ||
217 | path: rel_path, | ||
218 | })?, | ||
97 | } | 219 | } |
220 | Ok(()) | ||
98 | } | 221 | } |
99 | } | 222 | } |
100 | 223 | ||
101 | fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { | 224 | fn watch_recursive( |
102 | let mut res = Vec::new(); | 225 | mut watcher: Option<&mut RecommendedWatcher>, |
103 | for entry in WalkDir::new(&filter.root) | 226 | dir: &Path, |
227 | config: &RootConfig, | ||
228 | ) -> Vec<RelativePathBuf> { | ||
229 | let mut files = Vec::new(); | ||
230 | for entry in WalkDir::new(dir) | ||
104 | .into_iter() | 231 | .into_iter() |
105 | .filter_entry(filter.entry_filter()) | 232 | .filter_entry(|it| config.contains(it.path()).is_some()) |
233 | .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) | ||
106 | { | 234 | { |
107 | let entry = match entry { | 235 | if entry.file_type().is_dir() { |
108 | Ok(entry) => entry, | 236 | if let Some(watcher) = &mut watcher { |
109 | Err(e) => { | 237 | watch_one(watcher, entry.path()); |
110 | log::warn!("watcher error: {}", e); | ||
111 | continue; | ||
112 | } | 238 | } |
113 | }; | 239 | } else { |
114 | if !entry.file_type().is_file() { | 240 | let path = config.contains(entry.path()).unwrap(); |
115 | continue; | 241 | files.push(path.to_owned()); |
116 | } | 242 | } |
117 | let path = entry.path(); | ||
118 | let text = match fs::read_to_string(path) { | ||
119 | Ok(text) => text, | ||
120 | Err(e) => { | ||
121 | log::warn!("watcher error: {}", e); | ||
122 | continue; | ||
123 | } | ||
124 | }; | ||
125 | let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap(); | ||
126 | res.push((path.to_owned(), text)) | ||
127 | } | 243 | } |
128 | res | 244 | files |
245 | } | ||
246 | |||
247 | fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { | ||
248 | match watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
249 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
250 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
251 | } | ||
129 | } | 252 | } |