aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs286
1 files changed, 0 insertions, 286 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
deleted file mode 100644
index 5969ee0d0..000000000
--- a/crates/ra_vfs/src/io.rs
+++ /dev/null
@@ -1,286 +0,0 @@
1use std::{
2 fs,
3 path::{Path, PathBuf},
4 sync::{mpsc, Arc},
5 time::Duration,
6 thread,
7};
8use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select};
9use relative_path::RelativePathBuf;
10use walkdir::WalkDir;
11use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
12
13use crate::{Roots, VfsRoot, VfsTask};
14
15pub(crate) enum Task {
16 AddRoot { root: VfsRoot },
17}
18
19/// `TaskResult` transfers files read on the IO thread to the VFS on the main
20/// thread.
21#[derive(Debug)]
22pub(crate) enum TaskResult {
23 /// Emitted when we've recursively scanned a source root during the initial
24 /// load.
25 BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> },
26 /// Emitted when we've noticed that a single file has changed.
27 ///
28 /// Note that this by design does not distinguish between
29 /// create/delete/write events, and instead specifies the *current* state of
30 /// the file. The idea is to guarantee that in the quiescent state the sum
31 /// of all results equals to the current state of the file system, while
32 /// allowing to skip intermediate events in non-quiescent states.
33 SingleFile { root: VfsRoot, path: RelativePathBuf, text: Option<String> },
34}
35
36/// The kind of raw notification we've received from the notify library.
37///
38/// Note that these are not necessary 100% precise (for example we might receive
39/// `Create` instead of `Write`, see #734), but we try do distinguish `Create`s
40/// to implement recursive watching of directories.
41#[derive(Debug)]
42enum ChangeKind {
43 Create,
44 Write,
45 Remove,
46}
47
48const WATCHER_DELAY: Duration = Duration::from_millis(250);
49
50// Like thread::JoinHandle, but joins the thread on drop.
51//
52// This is useful because it guarantees the absence of run-away threads, even if
53// code panics. This is important, because we might see panics in the test and
54// we might be used in an IDE context, where a failed component is just
55// restarted.
56//
57// Because all threads are joined, care must be taken to avoid deadlocks. That
58// typically means ensuring that channels are dropped before the threads.
59struct ScopedThread(Option<thread::JoinHandle<()>>);
60
61impl ScopedThread {
62 fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
63 let handle = thread::Builder::new().name(name).spawn(f).unwrap();
64 ScopedThread(Some(handle))
65 }
66}
67
68impl Drop for ScopedThread {
69 fn drop(&mut self) {
70 let res = self.0.take().unwrap().join();
71 if !thread::panicking() {
72 res.unwrap();
73 }
74 }
75}
76
77pub(crate) struct Worker {
78 // XXX: field order is significant here.
79 //
80 // In Rust, fields are dropped in the declaration order, and we rely on this
81 // here. We must close sender first, so that the `thread` (who holds the
82 // opposite side of the channel) noticed shutdown. Then, we must join the
83 // thread, but we must keep receiver alive so that the thread does not
84 // panic.
85 pub(crate) sender: Sender<Task>,
86 _thread: ScopedThread,
87 pub(crate) receiver: Receiver<VfsTask>,
88}
89
90pub(crate) fn start(roots: Arc<Roots>) -> Worker {
91 // This is a pretty elaborate setup of threads & channels! It is
92 // explained by the following concerns:
93 // * we need to burn a thread translating from notify's mpsc to
94 // crossbeam_channel.
95 // * we want to read all files from a single thread, to guarantee that
96 // we always get fresher versions and never go back in time.
97 // * we want to tear down everything neatly during shutdown.
98 let _thread;
99 // This are the channels we use to communicate with outside world.
100 // If `input_receiver` is closed we need to tear ourselves down.
101 // `output_sender` should not be closed unless the parent died.
102 let (input_sender, input_receiver) = unbounded();
103 let (output_sender, output_receiver) = unbounded();
104
105 _thread = ScopedThread::spawn("vfs".to_string(), move || {
106 // Make sure that the destruction order is
107 //
108 // * notify_sender
109 // * _thread
110 // * watcher_sender
111 //
112 // this is required to avoid deadlocks.
113
114 // These are the corresponding crossbeam channels
115 let (watcher_sender, watcher_receiver) = unbounded();
116 let _notify_thread;
117 {
118 // These are `std` channels notify will send events to
119 let (notify_sender, notify_receiver) = mpsc::channel();
120
121 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
122 .map_err(|e| log::error!("failed to spawn notify {}", e))
123 .ok();
124 // Start a silly thread to transform between two channels
125 _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || {
126 notify_receiver
127 .into_iter()
128 .for_each(|event| convert_notify_event(event, &watcher_sender))
129 });
130
131 // Process requests from the called or notifications from
132 // watcher until the caller says stop.
133 loop {
134 select! {
135 // Received request from the caller. If this channel is
136 // closed, we should shutdown everything.
137 recv(input_receiver) -> t => match t {
138 Err(RecvError) => {
139 drop(input_receiver);
140 break
141 },
142 Ok(Task::AddRoot { root }) => {
143 watch_root(watcher.as_mut(), &output_sender, &*roots, root);
144 }
145 },
146 // Watcher send us changes. If **this** channel is
147 // closed, the watcher has died, which indicates a bug
148 // -- escalate!
149 recv(watcher_receiver) -> event => match event {
150 Err(RecvError) => panic!("watcher is dead"),
151 Ok((path, change)) => {
152 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
153 }
154 },
155 }
156 }
157 }
158 // Drain pending events: we are not interested in them anyways!
159 watcher_receiver.into_iter().for_each(|_| ());
160 });
161 Worker { sender: input_sender, _thread, receiver: output_receiver }
162}
163
164fn watch_root(
165 watcher: Option<&mut RecommendedWatcher>,
166 sender: &Sender<VfsTask>,
167 roots: &Roots,
168 root: VfsRoot,
169) {
170 let root_path = roots.path(root);
171 log::debug!("loading {} ...", root_path.display());
172 let files = watch_recursive(watcher, root_path, roots, root)
173 .into_iter()
174 .filter_map(|path| {
175 let abs_path = path.to_path(&root_path);
176 let text = read_to_string(&abs_path)?;
177 Some((path, text))
178 })
179 .collect();
180 let res = TaskResult::BulkLoadRoot { root, files };
181 sender.send(VfsTask(res)).unwrap();
182 log::debug!("... loaded {}", root_path.display());
183}
184
185fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
186 // forward relevant events only
187 match event {
188 DebouncedEvent::NoticeWrite(_)
189 | DebouncedEvent::NoticeRemove(_)
190 | DebouncedEvent::Chmod(_) => {
191 // ignore
192 }
193 DebouncedEvent::Rescan => {
194 // TODO: rescan all roots
195 }
196 DebouncedEvent::Create(path) => {
197 sender.send((path, ChangeKind::Create)).unwrap();
198 }
199 DebouncedEvent::Write(path) => {
200 sender.send((path, ChangeKind::Write)).unwrap();
201 }
202 DebouncedEvent::Remove(path) => {
203 sender.send((path, ChangeKind::Remove)).unwrap();
204 }
205 DebouncedEvent::Rename(src, dst) => {
206 sender.send((src, ChangeKind::Remove)).unwrap();
207 sender.send((dst, ChangeKind::Create)).unwrap();
208 }
209 DebouncedEvent::Error(err, path) => {
210 // TODO: should we reload the file contents?
211 log::warn!("watcher error \"{}\", {:?}", err, path);
212 }
213 }
214}
215
216fn handle_change(
217 watcher: Option<&mut RecommendedWatcher>,
218 sender: &Sender<VfsTask>,
219 roots: &Roots,
220 path: PathBuf,
221 kind: ChangeKind,
222) {
223 let (root, rel_path) = match roots.find(&path) {
224 None => return,
225 Some(it) => it,
226 };
227 match kind {
228 ChangeKind::Create => {
229 let mut paths = Vec::new();
230 if path.is_dir() {
231 paths.extend(watch_recursive(watcher, &path, roots, root));
232 } else {
233 paths.push(rel_path);
234 }
235 paths
236 .into_iter()
237 .try_for_each(|rel_path| {
238 let abs_path = rel_path.to_path(&roots.path(root));
239 let text = read_to_string(&abs_path);
240 let res = TaskResult::SingleFile { root, path: rel_path, text };
241 sender.send(VfsTask(res))
242 })
243 .unwrap()
244 }
245 ChangeKind::Write | ChangeKind::Remove => {
246 let text = read_to_string(&path);
247 let res = TaskResult::SingleFile { root, path: rel_path, text };
248 sender.send(VfsTask(res)).unwrap();
249 }
250 }
251}
252
253fn watch_recursive(
254 mut watcher: Option<&mut RecommendedWatcher>,
255 dir: &Path,
256 roots: &Roots,
257 root: VfsRoot,
258) -> Vec<RelativePathBuf> {
259 let mut files = Vec::new();
260 for entry in WalkDir::new(dir)
261 .into_iter()
262 .filter_entry(|it| roots.contains(root, it.path()).is_some())
263 .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
264 {
265 if entry.file_type().is_dir() {
266 if let Some(watcher) = &mut watcher {
267 watch_one(watcher, entry.path());
268 }
269 } else {
270 let path = roots.contains(root, entry.path()).unwrap();
271 files.push(path.to_owned());
272 }
273 }
274 files
275}
276
277fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
278 match watcher.watch(dir, RecursiveMode::NonRecursive) {
279 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
280 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
281 }
282}
283
284fn read_to_string(path: &Path) -> Option<String> {
285 fs::read_to_string(&path).map_err(|e| log::warn!("failed to read file {}", e)).ok()
286}