aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs229
1 files changed, 176 insertions, 53 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 7ca1e9835..669240488 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,19 +1,23 @@
1use std::{fs, sync::Arc, thread}; 1use std::{
2 2 fs,
3use crossbeam_channel::{Receiver, Sender}; 3 path::{Path, PathBuf},
4 sync::{mpsc, Arc},
5 thread,
6 time::Duration,
7};
8use crossbeam_channel::{Receiver, Sender, SendError};
4use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
5use thread_worker::WorkerHandle; 10use thread_worker::WorkerHandle;
6use walkdir::WalkDir; 11use walkdir::WalkDir;
12use parking_lot::Mutex;
13use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
7 14
8mod watcher; 15use crate::{RootConfig, Roots, VfsRoot};
9use watcher::Watcher;
10
11use crate::{RootFilter, Roots, VfsRoot};
12 16
13pub(crate) enum Task { 17pub(crate) enum Task {
14 AddRoot { 18 AddRoot {
15 root: VfsRoot, 19 root: VfsRoot,
16 filter: Arc<RootFilter>, 20 config: Arc<RootConfig>,
17 }, 21 },
18} 22}
19 23
@@ -39,6 +43,15 @@ pub enum TaskResult {
39 }, 43 },
40} 44}
41 45
46#[derive(Debug)]
47enum ChangeKind {
48 Create,
49 Write,
50 Remove,
51}
52
53const WATCHER_DELAY: Duration = Duration::from_millis(250);
54
42pub(crate) struct Worker { 55pub(crate) struct Worker {
43 worker: thread_worker::Worker<Task, TaskResult>, 56 worker: thread_worker::Worker<Task, TaskResult>,
44 worker_handle: WorkerHandle, 57 worker_handle: WorkerHandle,
@@ -48,21 +61,36 @@ impl Worker {
48 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 61 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
49 let (worker, worker_handle) = 62 let (worker, worker_handle) =
50 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { 63 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
51 let mut watcher = match Watcher::start(roots, output_sender.clone()) { 64 let (notify_sender, notify_receiver) = mpsc::channel();
52 Ok(w) => Some(w), 65 let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
53 Err(e) => { 66 .map_err(|e| log::error!("failed to spawn notify {}", e))
54 log::error!("could not start watcher: {}", e); 67 .ok();
55 None 68 let ctx = WatcherCtx {
56 } 69 roots,
70 watcher: Arc::new(Mutex::new(watcher)),
71 sender: output_sender,
57 }; 72 };
58 let res = input_receiver 73 let thread = thread::spawn({
59 .into_iter() 74 let ctx = ctx.clone();
60 .filter_map(|t| handle_task(t, &mut watcher)) 75 move || {
61 .try_for_each(|it| output_sender.send(it)); 76 let _ = notify_receiver
62 if let Some(watcher) = watcher { 77 .into_iter()
63 let _ = watcher.shutdown(); 78 // forward relevant events only
79 .try_for_each(|change| ctx.handle_debounced_event(change));
80 }
81 });
82 let res1 = input_receiver.into_iter().try_for_each(|t| match t {
83 Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
84 });
85 drop(ctx.watcher.lock().take());
86 drop(ctx);
87 let res2 = thread.join();
88 match &res2 {
89 Ok(()) => log::info!("... Watcher terminated with ok"),
90 Err(_) => log::error!("... Watcher terminated with err"),
64 } 91 }
65 res.unwrap() 92 res1.unwrap();
93 res2.unwrap();
66 }); 94 });
67 Worker { 95 Worker {
68 worker, 96 worker,
@@ -84,46 +112,141 @@ impl Worker {
84 } 112 }
85} 113}
86 114
87fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { 115fn watch_root(
88 match task { 116 woker: &WatcherCtx,
89 Task::AddRoot { root, filter } => { 117 root: VfsRoot,
90 if let Some(watcher) = watcher { 118 config: Arc<RootConfig>,
91 watcher.watch_root(&filter) 119) -> Result<(), SendError<TaskResult>> {
120 let mut guard = woker.watcher.lock();
121 log::debug!("loading {} ...", config.root.as_path().display());
122 let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
123 .into_iter()
124 .filter_map(|path| {
125 let abs_path = path.to_path(&config.root);
126 let text = fs::read_to_string(abs_path)
127 .map_err(|e| log::warn!("watcher error: {}", e))
128 .ok()?;
129 Some((path, text))
130 })
131 .collect();
132 woker
133 .sender
134 .send(TaskResult::BulkLoadRoot { root, files })?;
135 log::debug!("... loaded {}", config.root.as_path().display());
136 Ok(())
137}
138
139#[derive(Clone)]
140struct WatcherCtx {
141 roots: Arc<Roots>,
142 watcher: Arc<Mutex<Option<RecommendedWatcher>>>,
143 sender: Sender<TaskResult>,
144}
145
146impl WatcherCtx {
147 fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<(), SendError<TaskResult>> {
148 match ev {
149 DebouncedEvent::NoticeWrite(_)
150 | DebouncedEvent::NoticeRemove(_)
151 | DebouncedEvent::Chmod(_) => {
152 // ignore
153 }
154 DebouncedEvent::Rescan => {
155 // TODO rescan all roots
156 }
157 DebouncedEvent::Create(path) => {
158 self.handle_change(path, ChangeKind::Create)?;
159 }
160 DebouncedEvent::Write(path) => {
161 self.handle_change(path, ChangeKind::Write)?;
92 } 162 }
93 log::debug!("loading {} ...", filter.root.as_path().display()); 163 DebouncedEvent::Remove(path) => {
94 let files = load_root(filter.as_ref()); 164 self.handle_change(path, ChangeKind::Remove)?;
95 log::debug!("... loaded {}", filter.root.as_path().display()); 165 }
96 Some(TaskResult::BulkLoadRoot { root, files }) 166 DebouncedEvent::Rename(src, dst) => {
167 self.handle_change(src, ChangeKind::Remove)?;
168 self.handle_change(dst, ChangeKind::Create)?;
169 }
170 DebouncedEvent::Error(err, path) => {
171 // TODO should we reload the file contents?
172 log::warn!("watcher error \"{}\", {:?}", err, path);
173 }
174 }
175 Ok(())
176 }
177
178 fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<(), SendError<TaskResult>> {
179 let (root, rel_path) = match self.roots.find(&path) {
180 None => return Ok(()),
181 Some(it) => it,
182 };
183 let config = &self.roots[root];
184 match kind {
185 ChangeKind::Create => {
186 let mut paths = Vec::new();
187 if path.is_dir() {
188 let mut guard = self.watcher.lock();
189 paths.extend(watch_recursive(guard.as_mut(), &path, &config));
190 } else {
191 paths.push(rel_path);
192 }
193 paths
194 .into_iter()
195 .filter_map(|rel_path| {
196 let abs_path = rel_path.to_path(&config.root);
197 let text = fs::read_to_string(&abs_path)
198 .map_err(|e| log::warn!("watcher failed {}", e))
199 .ok()?;
200 Some((rel_path, text))
201 })
202 .try_for_each(|(path, text)| {
203 self.sender
204 .send(TaskResult::AddSingleFile { root, path, text })
205 })?
206 }
207 ChangeKind::Write => match fs::read_to_string(&path) {
208 Err(e) => log::warn!("watcher failed {}", e),
209 Ok(text) => self.sender.send(TaskResult::ChangeSingleFile {
210 root,
211 path: rel_path,
212 text,
213 })?,
214 },
215 ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile {
216 root,
217 path: rel_path,
218 })?,
97 } 219 }
220 Ok(())
98 } 221 }
99} 222}
100 223
101fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { 224fn watch_recursive(
102 let mut res = Vec::new(); 225 mut watcher: Option<&mut RecommendedWatcher>,
103 for entry in WalkDir::new(&filter.root) 226 dir: &Path,
227 config: &RootConfig,
228) -> Vec<RelativePathBuf> {
229 let mut files = Vec::new();
230 for entry in WalkDir::new(dir)
104 .into_iter() 231 .into_iter()
105 .filter_entry(filter.entry_filter()) 232 .filter_entry(|it| config.contains(it.path()).is_some())
233 .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
106 { 234 {
107 let entry = match entry { 235 if entry.file_type().is_dir() {
108 Ok(entry) => entry, 236 if let Some(watcher) = &mut watcher {
109 Err(e) => { 237 watch_one(watcher, entry.path());
110 log::warn!("watcher error: {}", e);
111 continue;
112 } 238 }
113 }; 239 } else {
114 if !entry.file_type().is_file() { 240 let path = config.contains(entry.path()).unwrap();
115 continue; 241 files.push(path.to_owned());
116 } 242 }
117 let path = entry.path();
118 let text = match fs::read_to_string(path) {
119 Ok(text) => text,
120 Err(e) => {
121 log::warn!("watcher error: {}", e);
122 continue;
123 }
124 };
125 let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
126 res.push((path.to_owned(), text))
127 } 243 }
128 res 244 files
245}
246
247fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
248 match watcher.watch(dir, RecursiveMode::NonRecursive) {
249 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
250 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
251 }
129} 252}