diff options
author | bors[bot] <bors[bot]@users.noreply.github.com> | 2019-01-26 15:13:44 +0000 |
---|---|---|
committer | bors[bot] <bors[bot]@users.noreply.github.com> | 2019-01-26 15:13:44 +0000 |
commit | a8d32c4d1ae4b3e4276f7a97b6c6e5f95f91e67a (patch) | |
tree | 9e56b095a073ff3c82322302356f4d096a70afcd /crates/ra_vfs/src/io.rs | |
parent | 5af7b2f4af51291fa4a0549c549796ba0520927b (diff) | |
parent | 9f16892b94817d144f37dfe0081b39aacec65635 (diff) |
Merge #671
671: Makre VFS slightly less super obscure r=vemoo a=matklad
I've decided to better understand what we do in VFS, and this turns out to be really hard. Jugling threads and channels is one of the most unfortunately arcane bits of rust...
I had some success though by flattenning the structure so that all channel & thread creation routines are on one screen.
r? @vemoo
Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 271 |
1 files changed, 215 insertions, 56 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 7ca1e9835..d764c534a 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -1,19 +1,22 @@ | |||
1 | use std::{fs, sync::Arc, thread}; | 1 | use std::{ |
2 | 2 | fs, | |
3 | use crossbeam_channel::{Receiver, Sender}; | 3 | thread, |
4 | path::{Path, PathBuf}, | ||
5 | sync::{mpsc, Arc}, | ||
6 | time::Duration, | ||
7 | }; | ||
8 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; | ||
4 | use relative_path::RelativePathBuf; | 9 | use relative_path::RelativePathBuf; |
5 | use thread_worker::WorkerHandle; | 10 | use thread_worker::WorkerHandle; |
6 | use walkdir::WalkDir; | 11 | use walkdir::WalkDir; |
12 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | ||
7 | 13 | ||
8 | mod watcher; | 14 | use crate::{RootConfig, Roots, VfsRoot}; |
9 | use watcher::Watcher; | ||
10 | |||
11 | use crate::{RootFilter, Roots, VfsRoot}; | ||
12 | 15 | ||
13 | pub(crate) enum Task { | 16 | pub(crate) enum Task { |
14 | AddRoot { | 17 | AddRoot { |
15 | root: VfsRoot, | 18 | root: VfsRoot, |
16 | filter: Arc<RootFilter>, | 19 | config: Arc<RootConfig>, |
17 | }, | 20 | }, |
18 | } | 21 | } |
19 | 22 | ||
@@ -39,6 +42,15 @@ pub enum TaskResult { | |||
39 | }, | 42 | }, |
40 | } | 43 | } |
41 | 44 | ||
45 | #[derive(Debug)] | ||
46 | enum ChangeKind { | ||
47 | Create, | ||
48 | Write, | ||
49 | Remove, | ||
50 | } | ||
51 | |||
52 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
53 | |||
42 | pub(crate) struct Worker { | 54 | pub(crate) struct Worker { |
43 | worker: thread_worker::Worker<Task, TaskResult>, | 55 | worker: thread_worker::Worker<Task, TaskResult>, |
44 | worker_handle: WorkerHandle, | 56 | worker_handle: WorkerHandle, |
@@ -46,24 +58,75 @@ pub(crate) struct Worker { | |||
46 | 58 | ||
47 | impl Worker { | 59 | impl Worker { |
48 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 60 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
49 | let (worker, worker_handle) = | 61 | // This is a pretty elaborate setup of threads & channels! It is |
50 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { | 62 | // explained by the following concerns: |
51 | let mut watcher = match Watcher::start(roots, output_sender.clone()) { | 63 | // * we need to burn a thread translating from notify's mpsc to |
52 | Ok(w) => Some(w), | 64 | // crossbeam_channel. |
53 | Err(e) => { | 65 | // * we want to read all files from a single thread, to gurantee that |
54 | log::error!("could not start watcher: {}", e); | 66 | // we always get fresher versions and never go back in time. |
55 | None | 67 | // * we want to tear down everything neatly during shutdown. |
68 | let (worker, worker_handle) = thread_worker::spawn( | ||
69 | "vfs", | ||
70 | 128, | ||
71 | // This are the channels we use to communicate with outside world. | ||
72 | // If `input_receiver` is closed we need to tear ourselves down. | ||
73 | // `output_sender` should not be closed unless the parent died. | ||
74 | move |input_receiver, output_sender| { | ||
75 | // These are `std` channels notify will send events to | ||
76 | let (notify_sender, notify_receiver) = mpsc::channel(); | ||
77 | // These are the corresponding crossbeam channels | ||
78 | let (watcher_sender, watcher_receiver) = unbounded(); | ||
79 | |||
80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | ||
81 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | ||
82 | .ok(); | ||
83 | // Start a silly thread to tranform between two channels | ||
84 | let thread = thread::spawn(move || { | ||
85 | notify_receiver | ||
86 | .into_iter() | ||
87 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | ||
88 | }); | ||
89 | |||
90 | // Process requests from the called or notifications from | ||
91 | // watcher until the caller says stop. | ||
92 | loop { | ||
93 | select! { | ||
94 | // Received request from the caller. If this channel is | ||
95 | // closed, we should shutdown everything. | ||
96 | recv(input_receiver) -> t => match t { | ||
97 | Err(RecvError) => { | ||
98 | drop(input_receiver); | ||
99 | break | ||
100 | }, | ||
101 | Ok(Task::AddRoot { root, config }) => { | ||
102 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
103 | } | ||
104 | }, | ||
105 | // Watcher send us changes. If **this** channel is | ||
106 | // closed, the watcher has died, which indicates a bug | ||
107 | // -- escalate! | ||
108 | recv(watcher_receiver) -> event => match event { | ||
109 | Err(RecvError) => panic!("watcher is dead"), | ||
110 | Ok((path, change)) => { | ||
111 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
112 | } | ||
113 | }, | ||
56 | } | 114 | } |
57 | }; | ||
58 | let res = input_receiver | ||
59 | .into_iter() | ||
60 | .filter_map(|t| handle_task(t, &mut watcher)) | ||
61 | .try_for_each(|it| output_sender.send(it)); | ||
62 | if let Some(watcher) = watcher { | ||
63 | let _ = watcher.shutdown(); | ||
64 | } | 115 | } |
65 | res.unwrap() | 116 | // Stopped the watcher |
66 | }); | 117 | drop(watcher.take()); |
118 | // Drain pending events: we are not inrerested in them anyways! | ||
119 | watcher_receiver.into_iter().for_each(|_| ()); | ||
120 | |||
121 | let res = thread.join(); | ||
122 | match &res { | ||
123 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
124 | Err(_) => log::error!("... Watcher terminated with err"), | ||
125 | } | ||
126 | res.unwrap(); | ||
127 | }, | ||
128 | ); | ||
129 | |||
67 | Worker { | 130 | Worker { |
68 | worker, | 131 | worker, |
69 | worker_handle, | 132 | worker_handle, |
@@ -84,46 +147,142 @@ impl Worker { | |||
84 | } | 147 | } |
85 | } | 148 | } |
86 | 149 | ||
87 | fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { | 150 | fn watch_root( |
88 | match task { | 151 | watcher: Option<&mut RecommendedWatcher>, |
89 | Task::AddRoot { root, filter } => { | 152 | sender: &Sender<TaskResult>, |
90 | if let Some(watcher) = watcher { | 153 | root: VfsRoot, |
91 | watcher.watch_root(&filter) | 154 | config: Arc<RootConfig>, |
155 | ) { | ||
156 | log::debug!("loading {} ...", config.root.as_path().display()); | ||
157 | let files = watch_recursive(watcher, config.root.as_path(), &*config) | ||
158 | .into_iter() | ||
159 | .filter_map(|path| { | ||
160 | let abs_path = path.to_path(&config.root); | ||
161 | let text = read_to_string(&abs_path)?; | ||
162 | Some((path, text)) | ||
163 | }) | ||
164 | .collect(); | ||
165 | sender | ||
166 | .send(TaskResult::BulkLoadRoot { root, files }) | ||
167 | .unwrap(); | ||
168 | log::debug!("... loaded {}", config.root.as_path().display()); | ||
169 | } | ||
170 | |||
171 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { | ||
172 | // forward relevant events only | ||
173 | match event { | ||
174 | DebouncedEvent::NoticeWrite(_) | ||
175 | | DebouncedEvent::NoticeRemove(_) | ||
176 | | DebouncedEvent::Chmod(_) => { | ||
177 | // ignore | ||
178 | } | ||
179 | DebouncedEvent::Rescan => { | ||
180 | // TODO rescan all roots | ||
181 | } | ||
182 | DebouncedEvent::Create(path) => { | ||
183 | sender.send((path, ChangeKind::Create)).unwrap(); | ||
184 | } | ||
185 | DebouncedEvent::Write(path) => { | ||
186 | sender.send((path, ChangeKind::Write)).unwrap(); | ||
187 | } | ||
188 | DebouncedEvent::Remove(path) => { | ||
189 | sender.send((path, ChangeKind::Remove)).unwrap(); | ||
190 | } | ||
191 | DebouncedEvent::Rename(src, dst) => { | ||
192 | sender.send((src, ChangeKind::Remove)).unwrap(); | ||
193 | sender.send((dst, ChangeKind::Create)).unwrap(); | ||
194 | } | ||
195 | DebouncedEvent::Error(err, path) => { | ||
196 | // TODO should we reload the file contents? | ||
197 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
198 | } | ||
199 | } | ||
200 | } | ||
201 | |||
202 | fn handle_change( | ||
203 | watcher: Option<&mut RecommendedWatcher>, | ||
204 | sender: &Sender<TaskResult>, | ||
205 | roots: &Roots, | ||
206 | path: PathBuf, | ||
207 | kind: ChangeKind, | ||
208 | ) { | ||
209 | let (root, rel_path) = match roots.find(&path) { | ||
210 | None => return, | ||
211 | Some(it) => it, | ||
212 | }; | ||
213 | let config = &roots[root]; | ||
214 | match kind { | ||
215 | ChangeKind::Create => { | ||
216 | let mut paths = Vec::new(); | ||
217 | if path.is_dir() { | ||
218 | paths.extend(watch_recursive(watcher, &path, &config)); | ||
219 | } else { | ||
220 | paths.push(rel_path); | ||
92 | } | 221 | } |
93 | log::debug!("loading {} ...", filter.root.as_path().display()); | 222 | paths |
94 | let files = load_root(filter.as_ref()); | 223 | .into_iter() |
95 | log::debug!("... loaded {}", filter.root.as_path().display()); | 224 | .filter_map(|rel_path| { |
96 | Some(TaskResult::BulkLoadRoot { root, files }) | 225 | let abs_path = rel_path.to_path(&config.root); |
226 | let text = read_to_string(&abs_path)?; | ||
227 | Some((rel_path, text)) | ||
228 | }) | ||
229 | .try_for_each(|(path, text)| { | ||
230 | sender.send(TaskResult::AddSingleFile { root, path, text }) | ||
231 | }) | ||
232 | .unwrap() | ||
97 | } | 233 | } |
234 | ChangeKind::Write => { | ||
235 | if let Some(text) = read_to_string(&path) { | ||
236 | sender | ||
237 | .send(TaskResult::ChangeSingleFile { | ||
238 | root, | ||
239 | path: rel_path, | ||
240 | text, | ||
241 | }) | ||
242 | .unwrap(); | ||
243 | } | ||
244 | } | ||
245 | ChangeKind::Remove => sender | ||
246 | .send(TaskResult::RemoveSingleFile { | ||
247 | root, | ||
248 | path: rel_path, | ||
249 | }) | ||
250 | .unwrap(), | ||
98 | } | 251 | } |
99 | } | 252 | } |
100 | 253 | ||
101 | fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { | 254 | fn watch_recursive( |
102 | let mut res = Vec::new(); | 255 | mut watcher: Option<&mut RecommendedWatcher>, |
103 | for entry in WalkDir::new(&filter.root) | 256 | dir: &Path, |
257 | config: &RootConfig, | ||
258 | ) -> Vec<RelativePathBuf> { | ||
259 | let mut files = Vec::new(); | ||
260 | for entry in WalkDir::new(dir) | ||
104 | .into_iter() | 261 | .into_iter() |
105 | .filter_entry(filter.entry_filter()) | 262 | .filter_entry(|it| config.contains(it.path()).is_some()) |
263 | .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) | ||
106 | { | 264 | { |
107 | let entry = match entry { | 265 | if entry.file_type().is_dir() { |
108 | Ok(entry) => entry, | 266 | if let Some(watcher) = &mut watcher { |
109 | Err(e) => { | 267 | watch_one(watcher, entry.path()); |
110 | log::warn!("watcher error: {}", e); | ||
111 | continue; | ||
112 | } | 268 | } |
113 | }; | 269 | } else { |
114 | if !entry.file_type().is_file() { | 270 | let path = config.contains(entry.path()).unwrap(); |
115 | continue; | 271 | files.push(path.to_owned()); |
116 | } | 272 | } |
117 | let path = entry.path(); | ||
118 | let text = match fs::read_to_string(path) { | ||
119 | Ok(text) => text, | ||
120 | Err(e) => { | ||
121 | log::warn!("watcher error: {}", e); | ||
122 | continue; | ||
123 | } | ||
124 | }; | ||
125 | let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap(); | ||
126 | res.push((path.to_owned(), text)) | ||
127 | } | 273 | } |
128 | res | 274 | files |
275 | } | ||
276 | |||
277 | fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { | ||
278 | match watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
279 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
280 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
281 | } | ||
282 | } | ||
283 | |||
284 | fn read_to_string(path: &Path) -> Option<String> { | ||
285 | fs::read_to_string(&path) | ||
286 | .map_err(|e| log::warn!("failed to read file {}", e)) | ||
287 | .ok() | ||
129 | } | 288 | } |