diff options
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 286 |
1 files changed, 0 insertions, 286 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs deleted file mode 100644 index 5969ee0d0..000000000 --- a/crates/ra_vfs/src/io.rs +++ /dev/null | |||
@@ -1,286 +0,0 @@ | |||
1 | use std::{ | ||
2 | fs, | ||
3 | path::{Path, PathBuf}, | ||
4 | sync::{mpsc, Arc}, | ||
5 | time::Duration, | ||
6 | thread, | ||
7 | }; | ||
8 | use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select}; | ||
9 | use relative_path::RelativePathBuf; | ||
10 | use walkdir::WalkDir; | ||
11 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | ||
12 | |||
13 | use crate::{Roots, VfsRoot, VfsTask}; | ||
14 | |||
15 | pub(crate) enum Task { | ||
16 | AddRoot { root: VfsRoot }, | ||
17 | } | ||
18 | |||
19 | /// `TaskResult` transfers files read on the IO thread to the VFS on the main | ||
20 | /// thread. | ||
21 | #[derive(Debug)] | ||
22 | pub(crate) enum TaskResult { | ||
23 | /// Emitted when we've recursively scanned a source root during the initial | ||
24 | /// load. | ||
25 | BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> }, | ||
26 | /// Emitted when we've noticed that a single file has changed. | ||
27 | /// | ||
28 | /// Note that this by design does not distinguish between | ||
29 | /// create/delete/write events, and instead specifies the *current* state of | ||
30 | /// the file. The idea is to guarantee that in the quiescent state the sum | ||
31 | /// of all results equals to the current state of the file system, while | ||
32 | /// allowing to skip intermediate events in non-quiescent states. | ||
33 | SingleFile { root: VfsRoot, path: RelativePathBuf, text: Option<String> }, | ||
34 | } | ||
35 | |||
36 | /// The kind of raw notification we've received from the notify library. | ||
37 | /// | ||
38 | /// Note that these are not necessary 100% precise (for example we might receive | ||
39 | /// `Create` instead of `Write`, see #734), but we try do distinguish `Create`s | ||
40 | /// to implement recursive watching of directories. | ||
41 | #[derive(Debug)] | ||
42 | enum ChangeKind { | ||
43 | Create, | ||
44 | Write, | ||
45 | Remove, | ||
46 | } | ||
47 | |||
48 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
49 | |||
50 | // Like thread::JoinHandle, but joins the thread on drop. | ||
51 | // | ||
52 | // This is useful because it guarantees the absence of run-away threads, even if | ||
53 | // code panics. This is important, because we might see panics in the test and | ||
54 | // we might be used in an IDE context, where a failed component is just | ||
55 | // restarted. | ||
56 | // | ||
57 | // Because all threads are joined, care must be taken to avoid deadlocks. That | ||
58 | // typically means ensuring that channels are dropped before the threads. | ||
59 | struct ScopedThread(Option<thread::JoinHandle<()>>); | ||
60 | |||
61 | impl ScopedThread { | ||
62 | fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread { | ||
63 | let handle = thread::Builder::new().name(name).spawn(f).unwrap(); | ||
64 | ScopedThread(Some(handle)) | ||
65 | } | ||
66 | } | ||
67 | |||
68 | impl Drop for ScopedThread { | ||
69 | fn drop(&mut self) { | ||
70 | let res = self.0.take().unwrap().join(); | ||
71 | if !thread::panicking() { | ||
72 | res.unwrap(); | ||
73 | } | ||
74 | } | ||
75 | } | ||
76 | |||
77 | pub(crate) struct Worker { | ||
78 | // XXX: field order is significant here. | ||
79 | // | ||
80 | // In Rust, fields are dropped in the declaration order, and we rely on this | ||
81 | // here. We must close sender first, so that the `thread` (who holds the | ||
82 | // opposite side of the channel) noticed shutdown. Then, we must join the | ||
83 | // thread, but we must keep receiver alive so that the thread does not | ||
84 | // panic. | ||
85 | pub(crate) sender: Sender<Task>, | ||
86 | _thread: ScopedThread, | ||
87 | pub(crate) receiver: Receiver<VfsTask>, | ||
88 | } | ||
89 | |||
90 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | ||
91 | // This is a pretty elaborate setup of threads & channels! It is | ||
92 | // explained by the following concerns: | ||
93 | // * we need to burn a thread translating from notify's mpsc to | ||
94 | // crossbeam_channel. | ||
95 | // * we want to read all files from a single thread, to guarantee that | ||
96 | // we always get fresher versions and never go back in time. | ||
97 | // * we want to tear down everything neatly during shutdown. | ||
98 | let _thread; | ||
99 | // This are the channels we use to communicate with outside world. | ||
100 | // If `input_receiver` is closed we need to tear ourselves down. | ||
101 | // `output_sender` should not be closed unless the parent died. | ||
102 | let (input_sender, input_receiver) = unbounded(); | ||
103 | let (output_sender, output_receiver) = unbounded(); | ||
104 | |||
105 | _thread = ScopedThread::spawn("vfs".to_string(), move || { | ||
106 | // Make sure that the destruction order is | ||
107 | // | ||
108 | // * notify_sender | ||
109 | // * _thread | ||
110 | // * watcher_sender | ||
111 | // | ||
112 | // this is required to avoid deadlocks. | ||
113 | |||
114 | // These are the corresponding crossbeam channels | ||
115 | let (watcher_sender, watcher_receiver) = unbounded(); | ||
116 | let _notify_thread; | ||
117 | { | ||
118 | // These are `std` channels notify will send events to | ||
119 | let (notify_sender, notify_receiver) = mpsc::channel(); | ||
120 | |||
121 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | ||
122 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | ||
123 | .ok(); | ||
124 | // Start a silly thread to transform between two channels | ||
125 | _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || { | ||
126 | notify_receiver | ||
127 | .into_iter() | ||
128 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | ||
129 | }); | ||
130 | |||
131 | // Process requests from the called or notifications from | ||
132 | // watcher until the caller says stop. | ||
133 | loop { | ||
134 | select! { | ||
135 | // Received request from the caller. If this channel is | ||
136 | // closed, we should shutdown everything. | ||
137 | recv(input_receiver) -> t => match t { | ||
138 | Err(RecvError) => { | ||
139 | drop(input_receiver); | ||
140 | break | ||
141 | }, | ||
142 | Ok(Task::AddRoot { root }) => { | ||
143 | watch_root(watcher.as_mut(), &output_sender, &*roots, root); | ||
144 | } | ||
145 | }, | ||
146 | // Watcher send us changes. If **this** channel is | ||
147 | // closed, the watcher has died, which indicates a bug | ||
148 | // -- escalate! | ||
149 | recv(watcher_receiver) -> event => match event { | ||
150 | Err(RecvError) => panic!("watcher is dead"), | ||
151 | Ok((path, change)) => { | ||
152 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
153 | } | ||
154 | }, | ||
155 | } | ||
156 | } | ||
157 | } | ||
158 | // Drain pending events: we are not interested in them anyways! | ||
159 | watcher_receiver.into_iter().for_each(|_| ()); | ||
160 | }); | ||
161 | Worker { sender: input_sender, _thread, receiver: output_receiver } | ||
162 | } | ||
163 | |||
164 | fn watch_root( | ||
165 | watcher: Option<&mut RecommendedWatcher>, | ||
166 | sender: &Sender<VfsTask>, | ||
167 | roots: &Roots, | ||
168 | root: VfsRoot, | ||
169 | ) { | ||
170 | let root_path = roots.path(root); | ||
171 | log::debug!("loading {} ...", root_path.display()); | ||
172 | let files = watch_recursive(watcher, root_path, roots, root) | ||
173 | .into_iter() | ||
174 | .filter_map(|path| { | ||
175 | let abs_path = path.to_path(&root_path); | ||
176 | let text = read_to_string(&abs_path)?; | ||
177 | Some((path, text)) | ||
178 | }) | ||
179 | .collect(); | ||
180 | let res = TaskResult::BulkLoadRoot { root, files }; | ||
181 | sender.send(VfsTask(res)).unwrap(); | ||
182 | log::debug!("... loaded {}", root_path.display()); | ||
183 | } | ||
184 | |||
185 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { | ||
186 | // forward relevant events only | ||
187 | match event { | ||
188 | DebouncedEvent::NoticeWrite(_) | ||
189 | | DebouncedEvent::NoticeRemove(_) | ||
190 | | DebouncedEvent::Chmod(_) => { | ||
191 | // ignore | ||
192 | } | ||
193 | DebouncedEvent::Rescan => { | ||
194 | // TODO: rescan all roots | ||
195 | } | ||
196 | DebouncedEvent::Create(path) => { | ||
197 | sender.send((path, ChangeKind::Create)).unwrap(); | ||
198 | } | ||
199 | DebouncedEvent::Write(path) => { | ||
200 | sender.send((path, ChangeKind::Write)).unwrap(); | ||
201 | } | ||
202 | DebouncedEvent::Remove(path) => { | ||
203 | sender.send((path, ChangeKind::Remove)).unwrap(); | ||
204 | } | ||
205 | DebouncedEvent::Rename(src, dst) => { | ||
206 | sender.send((src, ChangeKind::Remove)).unwrap(); | ||
207 | sender.send((dst, ChangeKind::Create)).unwrap(); | ||
208 | } | ||
209 | DebouncedEvent::Error(err, path) => { | ||
210 | // TODO: should we reload the file contents? | ||
211 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
212 | } | ||
213 | } | ||
214 | } | ||
215 | |||
216 | fn handle_change( | ||
217 | watcher: Option<&mut RecommendedWatcher>, | ||
218 | sender: &Sender<VfsTask>, | ||
219 | roots: &Roots, | ||
220 | path: PathBuf, | ||
221 | kind: ChangeKind, | ||
222 | ) { | ||
223 | let (root, rel_path) = match roots.find(&path) { | ||
224 | None => return, | ||
225 | Some(it) => it, | ||
226 | }; | ||
227 | match kind { | ||
228 | ChangeKind::Create => { | ||
229 | let mut paths = Vec::new(); | ||
230 | if path.is_dir() { | ||
231 | paths.extend(watch_recursive(watcher, &path, roots, root)); | ||
232 | } else { | ||
233 | paths.push(rel_path); | ||
234 | } | ||
235 | paths | ||
236 | .into_iter() | ||
237 | .try_for_each(|rel_path| { | ||
238 | let abs_path = rel_path.to_path(&roots.path(root)); | ||
239 | let text = read_to_string(&abs_path); | ||
240 | let res = TaskResult::SingleFile { root, path: rel_path, text }; | ||
241 | sender.send(VfsTask(res)) | ||
242 | }) | ||
243 | .unwrap() | ||
244 | } | ||
245 | ChangeKind::Write | ChangeKind::Remove => { | ||
246 | let text = read_to_string(&path); | ||
247 | let res = TaskResult::SingleFile { root, path: rel_path, text }; | ||
248 | sender.send(VfsTask(res)).unwrap(); | ||
249 | } | ||
250 | } | ||
251 | } | ||
252 | |||
253 | fn watch_recursive( | ||
254 | mut watcher: Option<&mut RecommendedWatcher>, | ||
255 | dir: &Path, | ||
256 | roots: &Roots, | ||
257 | root: VfsRoot, | ||
258 | ) -> Vec<RelativePathBuf> { | ||
259 | let mut files = Vec::new(); | ||
260 | for entry in WalkDir::new(dir) | ||
261 | .into_iter() | ||
262 | .filter_entry(|it| roots.contains(root, it.path()).is_some()) | ||
263 | .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok()) | ||
264 | { | ||
265 | if entry.file_type().is_dir() { | ||
266 | if let Some(watcher) = &mut watcher { | ||
267 | watch_one(watcher, entry.path()); | ||
268 | } | ||
269 | } else { | ||
270 | let path = roots.contains(root, entry.path()).unwrap(); | ||
271 | files.push(path.to_owned()); | ||
272 | } | ||
273 | } | ||
274 | files | ||
275 | } | ||
276 | |||
277 | fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) { | ||
278 | match watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
279 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
280 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
281 | } | ||
282 | } | ||
283 | |||
284 | fn read_to_string(path: &Path) -> Option<String> { | ||
285 | fs::read_to_string(&path).map_err(|e| log::warn!("failed to read file {}", e)).ok() | ||
286 | } | ||