diff options
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 271 |
1 files changed, 215 insertions, 56 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 7ca1e9835..d764c534a 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -1,19 +1,22 @@ | |||
1 | use std::{fs, sync::Arc, thread}; | 1 | use std::{ |
2 | 2 | fs, | |
3 | use crossbeam_channel::{Receiver, Sender}; | 3 | thread, |
4 | path::{Path, PathBuf}, | ||
5 | sync::{mpsc, Arc}, | ||
6 | time::Duration, | ||
7 | }; | ||
8 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; | ||
4 | use relative_path::RelativePathBuf; | 9 | use relative_path::RelativePathBuf; |
5 | use thread_worker::WorkerHandle; | 10 | use thread_worker::WorkerHandle; |
6 | use walkdir::WalkDir; | 11 | use walkdir::WalkDir; |
12 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | ||
7 | 13 | ||
8 | mod watcher; | 14 | use crate::{RootConfig, Roots, VfsRoot}; |
9 | use watcher::Watcher; | ||
10 | |||
11 | use crate::{RootFilter, Roots, VfsRoot}; | ||
12 | 15 | ||
13 | pub(crate) enum Task { | 16 | pub(crate) enum Task { |
14 | AddRoot { | 17 | AddRoot { |
15 | root: VfsRoot, | 18 | root: VfsRoot, |
16 | filter: Arc<RootFilter>, | 19 | config: Arc<RootConfig>, |
17 | }, | 20 | }, |
18 | } | 21 | } |
19 | 22 | ||
@@ -39,6 +42,15 @@ pub enum TaskResult { | |||
39 | }, | 42 | }, |
40 | } | 43 | } |
41 | 44 | ||
45 | #[derive(Debug)] | ||
46 | enum ChangeKind { | ||
47 | Create, | ||
48 | Write, | ||
49 | Remove, | ||
50 | } | ||
51 | |||
52 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
53 | |||
42 | pub(crate) struct Worker { | 54 | pub(crate) struct Worker { |
43 | worker: thread_worker::Worker<Task, TaskResult>, | 55 | worker: thread_worker::Worker<Task, TaskResult>, |
44 | worker_handle: WorkerHandle, | 56 | worker_handle: WorkerHandle, |
@@ -46,24 +58,75 @@ pub(crate) struct Worker { | |||
46 | 58 | ||
47 | impl Worker { | 59 | impl Worker { |
48 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 60 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
49 | let (worker, worker_handle) = | 61 | // This is a pretty elaborate setup of threads & channels! It is |
50 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { | 62 | // explained by the following concerns: |
51 | let mut watcher = match Watcher::start(roots, output_sender.clone()) { | 63 | // * we need to burn a thread translating from notify's mpsc to |
52 | Ok(w) => Some(w), | 64 | // crossbeam_channel. |
53 | Err(e) => { | 65 | // * we want to read all files from a single thread, to gurantee that |
54 | log::error!("could not start watcher: {}", e); | 66 | // we always get fresher versions and never go back in time. |
55 | None | 67 | // * we want to tear down everything neatly during shutdown. |
68 | let (worker, worker_handle) = thread_worker::spawn( | ||
69 | "vfs", | ||
70 | 128, | ||
71 | // This are the channels we use to communicate with outside world. | ||
72 | // If `input_receiver` is closed we need to tear ourselves down. | ||
73 | // `output_sender` should not be closed unless the parent died. | ||
74 | move |input_receiver, output_sender| { | ||
75 | // These are `std` channels notify will send events to | ||
76 | let (notify_sender, notify_receiver) = mpsc::channel(); | ||
77 | // These are the corresponding crossbeam channels | ||
78 | let (watcher_sender, watcher_receiver) = unbounded(); | ||
79 | |||
80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | ||
81 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | ||
82 | .ok(); | ||
83 | // Start a silly thread to tranform between two channels | ||
84 | let thread = thread::spawn(move || { | ||
85 | notify_receiver | ||
86 | .into_iter() | ||
87 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | ||
88 | }); | ||
89 | |||
90 | // Process requests from the called or notifications from | ||
91 | // watcher until the caller says stop. | ||
92 | loop { | ||
93 | select! { | ||
94 | // Received request from the caller. If this channel is | ||
95 | // closed, we should shutdown everything. | ||
96 | recv(input_receiver) -> t => match t { | ||
97 | Err(RecvError) => { | ||
98 | drop(input_receiver); | ||
99 | break | ||
100 | }, | ||
101 | Ok(Task::AddRoot { root, config }) => { | ||
102 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
103 | } | ||
104 | }, | ||
105 | // Watcher send us changes. If **this** channel is | ||
106 | // closed, the watcher has died, which indicates a bug | ||
107 | // -- escalate! | ||
108 | recv(watcher_receiver) -> event => match event { | ||
109 | Err(RecvError) => panic!("watcher is dead"), | ||
110 | Ok((path, change)) => { | ||
111 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
112 | } | ||
113 | }, | ||
56 | } | 114 | } |
57 | }; | ||
58 | let res = input_receiver | ||
59 | .into_iter() | ||
60 | .filter_map(|t| handle_task(t, &mut watcher)) | ||
61 | .try_for_each(|it| output_sender.send(it)); | ||
62 | if let Some(watcher) = watcher { | ||
63 | let _ = watcher.shutdown(); | ||
64 | } | 115 | } |
65 | res.unwrap() | 116 | // Stopped the watcher |
66 | }); | 117 | drop(watcher.take()); |
118 | // Drain pending events: we are not inrerested in them anyways! | ||
119 | watcher_receiver.into_iter().for_each(|_| ()); | ||
120 | |||
121 | let res = thread.join(); | ||
122 | match &res { | ||
123 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
124 | Err(_) => log::error!("... Watcher terminated with err"), | ||
125 | } | ||
126 | res.unwrap(); | ||
127 | }, | ||
128 | ); | ||
129 | |||
67 | Worker { | 130 | Worker { |
68 | worker, | 131 | worker, |
69 | worker_handle, | 132 | worker_handle, |
@@ -84,46 +147,142 @@ impl Worker { | |||
84 | } | 147 | } |
85 | } | 148 | } |
86 | 149 | ||
87 | fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { | 150 | fn watch_root( |
88 | match task { | 151 | watcher: Option<&mut RecommendedWatcher>, |
89 | Task::AddRoot { root, filter } => { | 152 | sender: &Sender<TaskResult>, |
90 | if let Some(watcher) = watcher { | 153 | root: VfsRoot, |
91 | watcher.watch_root(&filter) | 154 | config: Arc<RootConfig>, |
155 | ) { | ||
156 | log::debug!("loading {} ...", config.root.as_path().display()); | ||
157 | let files = watch_recursive(watcher, config.root.as_path(), &*config) | ||
158 | .into_iter() | ||
159 | .filter_map(|path| { | ||
160 | let abs_path = path.to_path(&config.root); | ||
161 | let text = read_to_string(&abs_path)?; | ||
162 | Some((path, text)) | ||
163 | }) | ||
164 | .collect(); | ||
165 | sender | ||
166 | .send(TaskResult::BulkLoadRoot { root, files }) | ||
167 | .unwrap(); | ||
168 | log::debug!("... loaded {}", config.root.as_path().display()); | ||
169 | } | ||
170 | |||
171 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { | ||
172 | // forward relevant events only | ||
173 | match event { | ||
174 | DebouncedEvent::NoticeWrite(_) | ||
175 | | DebouncedEvent::NoticeRemove(_) | ||
176 | | DebouncedEvent::Chmod(_) => { | ||
177 | // ignore | ||
178 | } | ||
179 | DebouncedEvent::Rescan => { | ||
180 | // TODO rescan all roots | ||
181 | } | ||
182 | DebouncedEvent::Create(path) => { | ||
183 | sender.send((path, ChangeKind::Create)).unwrap(); | ||
184 | } | ||
185 | DebouncedEvent::Write(path) => { | ||
186 | sender.send((path, ChangeKind::Write)).unwrap(); | ||
187 | } | ||
188 | DebouncedEvent::Remove(path) => { | ||
189 | sender.send((path, ChangeKind::Remove)).unwrap(); | ||
190 | } | ||
191 | DebouncedEvent::Rename(src, dst) => { | ||
192 | sender.send((src, ChangeKind::Remove)).unwrap(); | ||
193 | sender.send((dst, ChangeKind::Create)).unwrap(); | ||
194 | } | ||
195 | DebouncedEvent::Error(err, path) => { | ||
196 | // TODO should we reload the file contents? | ||
197 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
198 | } | ||
199 | } | ||
200 | } | ||
201 | |||
202 | fn handle_change( | ||
203 | watcher: Option<&mut RecommendedWatcher>, | ||
204 | sender: &Sender<TaskResult>, | ||
205 | roots: &Roots, | ||
206 | path: PathBuf, | ||
207 | kind: ChangeKind, | ||
208 | ) { | ||
209 | let (root, rel_path) = match roots.find(&path) { | ||
210 | None => return, | ||
211 | Some(it) => it, | ||
212 | }; | ||
213 | let config = &roots[root]; | ||
214 | match kind { | ||
215 | ChangeKind::Create => { | ||
216 | let mut paths = Vec::new(); | ||
217 | if path.is_dir() { | ||
218 | paths.extend(watch_recursive(watcher, &path, &config)); | ||
219 | } else { | ||
220 | paths.push(rel_path); | ||
92 | } | 221 | } |
93 | log::debug!("loading {} ...", filter.root.as_path().display()); | 222 | paths |
94 | let files = load_root(filter.as_ref()); | 223 | .into_iter() |
95 | log::debug!("... loaded {}", filter.root.as_path().display()); | 224 | .filter_map(|rel_path| { |
96 | Some(TaskResult::BulkLoadRoot { root, files }) | 225 | let abs_path = rel_path.to_path(&config.root); |
226 | let text = read_to_string(&abs_path)?; | ||
227 | Some((rel_path, text)) | ||
228 | }) | ||
229 | .try_for_each(|(path, text)| { | ||
230 | sender.send(TaskResult::AddSingleFile { root, path, text }) | ||
231 | }) | ||
232 | .unwrap() | ||
97 | } | 233 | } |
234 | ChangeKind::Write => { | ||
235 | if let Some(text) = read_to_string(&path) { | ||
236 | sender | ||
237 | .send(TaskResult::ChangeSingleFile { | ||
238 | root, | ||
239 | path: rel_path, | ||
240 | text, | ||
241 | }) | ||
242 | .unwrap(); | ||
243 | } | ||
244 | } | ||
245 | ChangeKind::Remove => sender | ||
246 | .send(TaskResult::RemoveSingleFile { | ||
247 | root, | ||
248 | path: rel_path, | ||
249 | }) | ||
250 | .unwrap(), | ||
98 | } | 251 | } |
99 | } | 252 | } |
100 | 253 | ||
101 | fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { | 254 | fn watch_recursive( |
102 | let mut res = Vec::new(); | 255 | mut watcher: Option<&mut RecommendedWatcher>, |
103 | for entry in WalkDir::new(&filter.root) | 256 | dir: &Path, |
257 | config: &RootConfig, | ||
258 | ) -> Vec<RelativePathBuf> { | ||
259 | let mut files = Vec::new(); | ||
260 | for entry in WalkDir::new(dir) | ||
104 | .into_iter() | 261 | .into_iter() |
105 | .filter_entry(filter.entry_filter()) | 262 | .filter_entry(|it| config.contains(it.path()).is_some()) |
263 | .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) | ||
106 | { | 264 | { |
107 | let entry = match entry { | 265 | if entry.file_type().is_dir() { |
108 | Ok(entry) => entry, | 266 | if let Some(watcher) = &mut watcher { |
109 | Err(e) => { | 267 | watch_one(watcher, entry.path()); |
110 | log::warn!("watcher error: {}", e); | ||
111 | continue; | ||
112 | } | 268 | } |
113 | }; | 269 | } else { |
114 | if !entry.file_type().is_file() { | 270 | let path = config.contains(entry.path()).unwrap(); |
115 | continue; | 271 | files.push(path.to_owned()); |
116 | } | 272 | } |
117 | let path = entry.path(); | ||
118 | let text = match fs::read_to_string(path) { | ||
119 | Ok(text) => text, | ||
120 | Err(e) => { | ||
121 | log::warn!("watcher error: {}", e); | ||
122 | continue; | ||
123 | } | ||
124 | }; | ||
125 | let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap(); | ||
126 | res.push((path.to_owned(), text)) | ||
127 | } | 273 | } |
128 | res | 274 | files |
275 | } | ||
276 | |||
277 | fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { | ||
278 | match watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
279 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
280 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
281 | } | ||
282 | } | ||
283 | |||
284 | fn read_to_string(path: &Path) -> Option<String> { | ||
285 | fs::read_to_string(&path) | ||
286 | .map_err(|e| log::warn!("failed to read file {}", e)) | ||
287 | .ok() | ||
129 | } | 288 | } |