aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs271
1 files changed, 215 insertions, 56 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 7ca1e9835..d764c534a 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,19 +1,22 @@
1use std::{fs, sync::Arc, thread}; 1use std::{
2 2 fs,
3use crossbeam_channel::{Receiver, Sender}; 3 thread,
4 path::{Path, PathBuf},
5 sync::{mpsc, Arc},
6 time::Duration,
7};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
4use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
5use thread_worker::WorkerHandle; 10use thread_worker::WorkerHandle;
6use walkdir::WalkDir; 11use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
7 13
8mod watcher; 14use crate::{RootConfig, Roots, VfsRoot};
9use watcher::Watcher;
10
11use crate::{RootFilter, Roots, VfsRoot};
12 15
13pub(crate) enum Task { 16pub(crate) enum Task {
14 AddRoot { 17 AddRoot {
15 root: VfsRoot, 18 root: VfsRoot,
16 filter: Arc<RootFilter>, 19 config: Arc<RootConfig>,
17 }, 20 },
18} 21}
19 22
@@ -39,6 +42,15 @@ pub enum TaskResult {
39 }, 42 },
40} 43}
41 44
45#[derive(Debug)]
46enum ChangeKind {
47 Create,
48 Write,
49 Remove,
50}
51
52const WATCHER_DELAY: Duration = Duration::from_millis(250);
53
42pub(crate) struct Worker { 54pub(crate) struct Worker {
43 worker: thread_worker::Worker<Task, TaskResult>, 55 worker: thread_worker::Worker<Task, TaskResult>,
44 worker_handle: WorkerHandle, 56 worker_handle: WorkerHandle,
@@ -46,24 +58,75 @@ pub(crate) struct Worker {
46 58
47impl Worker { 59impl Worker {
48 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 60 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
49 let (worker, worker_handle) = 61 // This is a pretty elaborate setup of threads & channels! It is
50 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { 62 // explained by the following concerns:
51 let mut watcher = match Watcher::start(roots, output_sender.clone()) { 63 // * we need to burn a thread translating from notify's mpsc to
52 Ok(w) => Some(w), 64 // crossbeam_channel.
53 Err(e) => { 65 // * we want to read all files from a single thread, to gurantee that
54 log::error!("could not start watcher: {}", e); 66 // we always get fresher versions and never go back in time.
55 None 67 // * we want to tear down everything neatly during shutdown.
68 let (worker, worker_handle) = thread_worker::spawn(
69 "vfs",
70 128,
71 // This are the channels we use to communicate with outside world.
72 // If `input_receiver` is closed we need to tear ourselves down.
73 // `output_sender` should not be closed unless the parent died.
74 move |input_receiver, output_sender| {
75 // These are `std` channels notify will send events to
76 let (notify_sender, notify_receiver) = mpsc::channel();
77 // These are the corresponding crossbeam channels
78 let (watcher_sender, watcher_receiver) = unbounded();
79
80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
81 .map_err(|e| log::error!("failed to spawn notify {}", e))
82 .ok();
83 // Start a silly thread to tranform between two channels
84 let thread = thread::spawn(move || {
85 notify_receiver
86 .into_iter()
87 .for_each(|event| convert_notify_event(event, &watcher_sender))
88 });
89
90 // Process requests from the called or notifications from
91 // watcher until the caller says stop.
92 loop {
93 select! {
94 // Received request from the caller. If this channel is
95 // closed, we should shutdown everything.
96 recv(input_receiver) -> t => match t {
97 Err(RecvError) => {
98 drop(input_receiver);
99 break
100 },
101 Ok(Task::AddRoot { root, config }) => {
102 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
103 }
104 },
105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug
107 // -- escalate!
108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
113 },
56 } 114 }
57 };
58 let res = input_receiver
59 .into_iter()
60 .filter_map(|t| handle_task(t, &mut watcher))
61 .try_for_each(|it| output_sender.send(it));
62 if let Some(watcher) = watcher {
63 let _ = watcher.shutdown();
64 } 115 }
65 res.unwrap() 116 // Stopped the watcher
66 }); 117 drop(watcher.take());
118 // Drain pending events: we are not inrerested in them anyways!
119 watcher_receiver.into_iter().for_each(|_| ());
120
121 let res = thread.join();
122 match &res {
123 Ok(()) => log::info!("... Watcher terminated with ok"),
124 Err(_) => log::error!("... Watcher terminated with err"),
125 }
126 res.unwrap();
127 },
128 );
129
67 Worker { 130 Worker {
68 worker, 131 worker,
69 worker_handle, 132 worker_handle,
@@ -84,46 +147,142 @@ impl Worker {
84 } 147 }
85} 148}
86 149
87fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { 150fn watch_root(
88 match task { 151 watcher: Option<&mut RecommendedWatcher>,
89 Task::AddRoot { root, filter } => { 152 sender: &Sender<TaskResult>,
90 if let Some(watcher) = watcher { 153 root: VfsRoot,
91 watcher.watch_root(&filter) 154 config: Arc<RootConfig>,
155) {
156 log::debug!("loading {} ...", config.root.as_path().display());
157 let files = watch_recursive(watcher, config.root.as_path(), &*config)
158 .into_iter()
159 .filter_map(|path| {
160 let abs_path = path.to_path(&config.root);
161 let text = read_to_string(&abs_path)?;
162 Some((path, text))
163 })
164 .collect();
165 sender
166 .send(TaskResult::BulkLoadRoot { root, files })
167 .unwrap();
168 log::debug!("... loaded {}", config.root.as_path().display());
169}
170
171fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
172 // forward relevant events only
173 match event {
174 DebouncedEvent::NoticeWrite(_)
175 | DebouncedEvent::NoticeRemove(_)
176 | DebouncedEvent::Chmod(_) => {
177 // ignore
178 }
179 DebouncedEvent::Rescan => {
180 // TODO rescan all roots
181 }
182 DebouncedEvent::Create(path) => {
183 sender.send((path, ChangeKind::Create)).unwrap();
184 }
185 DebouncedEvent::Write(path) => {
186 sender.send((path, ChangeKind::Write)).unwrap();
187 }
188 DebouncedEvent::Remove(path) => {
189 sender.send((path, ChangeKind::Remove)).unwrap();
190 }
191 DebouncedEvent::Rename(src, dst) => {
192 sender.send((src, ChangeKind::Remove)).unwrap();
193 sender.send((dst, ChangeKind::Create)).unwrap();
194 }
195 DebouncedEvent::Error(err, path) => {
196 // TODO should we reload the file contents?
197 log::warn!("watcher error \"{}\", {:?}", err, path);
198 }
199 }
200}
201
202fn handle_change(
203 watcher: Option<&mut RecommendedWatcher>,
204 sender: &Sender<TaskResult>,
205 roots: &Roots,
206 path: PathBuf,
207 kind: ChangeKind,
208) {
209 let (root, rel_path) = match roots.find(&path) {
210 None => return,
211 Some(it) => it,
212 };
213 let config = &roots[root];
214 match kind {
215 ChangeKind::Create => {
216 let mut paths = Vec::new();
217 if path.is_dir() {
218 paths.extend(watch_recursive(watcher, &path, &config));
219 } else {
220 paths.push(rel_path);
92 } 221 }
93 log::debug!("loading {} ...", filter.root.as_path().display()); 222 paths
94 let files = load_root(filter.as_ref()); 223 .into_iter()
95 log::debug!("... loaded {}", filter.root.as_path().display()); 224 .filter_map(|rel_path| {
96 Some(TaskResult::BulkLoadRoot { root, files }) 225 let abs_path = rel_path.to_path(&config.root);
226 let text = read_to_string(&abs_path)?;
227 Some((rel_path, text))
228 })
229 .try_for_each(|(path, text)| {
230 sender.send(TaskResult::AddSingleFile { root, path, text })
231 })
232 .unwrap()
97 } 233 }
234 ChangeKind::Write => {
235 if let Some(text) = read_to_string(&path) {
236 sender
237 .send(TaskResult::ChangeSingleFile {
238 root,
239 path: rel_path,
240 text,
241 })
242 .unwrap();
243 }
244 }
245 ChangeKind::Remove => sender
246 .send(TaskResult::RemoveSingleFile {
247 root,
248 path: rel_path,
249 })
250 .unwrap(),
98 } 251 }
99} 252}
100 253
101fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { 254fn watch_recursive(
102 let mut res = Vec::new(); 255 mut watcher: Option<&mut RecommendedWatcher>,
103 for entry in WalkDir::new(&filter.root) 256 dir: &Path,
257 config: &RootConfig,
258) -> Vec<RelativePathBuf> {
259 let mut files = Vec::new();
260 for entry in WalkDir::new(dir)
104 .into_iter() 261 .into_iter()
105 .filter_entry(filter.entry_filter()) 262 .filter_entry(|it| config.contains(it.path()).is_some())
263 .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
106 { 264 {
107 let entry = match entry { 265 if entry.file_type().is_dir() {
108 Ok(entry) => entry, 266 if let Some(watcher) = &mut watcher {
109 Err(e) => { 267 watch_one(watcher, entry.path());
110 log::warn!("watcher error: {}", e);
111 continue;
112 } 268 }
113 }; 269 } else {
114 if !entry.file_type().is_file() { 270 let path = config.contains(entry.path()).unwrap();
115 continue; 271 files.push(path.to_owned());
116 } 272 }
117 let path = entry.path();
118 let text = match fs::read_to_string(path) {
119 Ok(text) => text,
120 Err(e) => {
121 log::warn!("watcher error: {}", e);
122 continue;
123 }
124 };
125 let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
126 res.push((path.to_owned(), text))
127 } 273 }
128 res 274 files
275}
276
277fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
278 match watcher.watch(dir, RecursiveMode::NonRecursive) {
279 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
280 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
281 }
282}
283
284fn read_to_string(path: &Path) -> Option<String> {
285 fs::read_to_string(&path)
286 .map_err(|e| log::warn!("failed to read file {}", e))
287 .ok()
129} 288}