diff options
Diffstat (limited to 'crates/ra_vfs/src/io')
-rw-r--r-- | crates/ra_vfs/src/io/mod.rs | 212 | ||||
-rw-r--r-- | crates/ra_vfs/src/io/watcher.rs | 128 |
2 files changed, 340 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs new file mode 100644 index 000000000..6d5af7690 --- /dev/null +++ b/crates/ra_vfs/src/io/mod.rs | |||
@@ -0,0 +1,212 @@ | |||
1 | use std::{ | ||
2 | fmt, fs, | ||
3 | path::{Path, PathBuf}, | ||
4 | sync::Arc, | ||
5 | thread, | ||
6 | }; | ||
7 | |||
8 | use crossbeam_channel::{Receiver, Sender}; | ||
9 | use parking_lot::Mutex; | ||
10 | use relative_path::RelativePathBuf; | ||
11 | use thread_worker::WorkerHandle; | ||
12 | use walkdir::{DirEntry, WalkDir}; | ||
13 | |||
14 | mod watcher; | ||
15 | use watcher::Watcher; | ||
16 | pub use watcher::WatcherChange; | ||
17 | |||
18 | use crate::VfsRoot; | ||
19 | |||
20 | pub(crate) enum Task { | ||
21 | AddRoot { | ||
22 | root: VfsRoot, | ||
23 | path: PathBuf, | ||
24 | filter: Box<Fn(&DirEntry) -> bool + Send>, | ||
25 | }, | ||
26 | /// this variant should only be created by the watcher | ||
27 | HandleChange(WatcherChange), | ||
28 | LoadChange(WatcherChange), | ||
29 | Watch { | ||
30 | dir: PathBuf, | ||
31 | filter: Box<Fn(&DirEntry) -> bool + Send>, | ||
32 | }, | ||
33 | } | ||
34 | |||
35 | #[derive(Debug)] | ||
36 | pub struct AddRootResult { | ||
37 | pub(crate) root: VfsRoot, | ||
38 | pub(crate) files: Vec<(RelativePathBuf, String)>, | ||
39 | } | ||
40 | |||
41 | #[derive(Debug)] | ||
42 | pub enum WatcherChangeData { | ||
43 | Create { path: PathBuf, text: String }, | ||
44 | Write { path: PathBuf, text: String }, | ||
45 | Remove { path: PathBuf }, | ||
46 | } | ||
47 | |||
48 | pub enum TaskResult { | ||
49 | AddRoot(AddRootResult), | ||
50 | HandleChange(WatcherChange), | ||
51 | LoadChange(WatcherChangeData), | ||
52 | NoOp, | ||
53 | } | ||
54 | |||
55 | impl fmt::Debug for TaskResult { | ||
56 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { | ||
57 | f.write_str("TaskResult { ... }") | ||
58 | } | ||
59 | } | ||
60 | |||
61 | pub(crate) struct Worker { | ||
62 | worker: thread_worker::Worker<Task, TaskResult>, | ||
63 | worker_handle: WorkerHandle, | ||
64 | watcher: Arc<Mutex<Option<Watcher>>>, | ||
65 | } | ||
66 | |||
67 | impl Worker { | ||
68 | pub(crate) fn start() -> Worker { | ||
69 | let watcher = Arc::new(Mutex::new(None)); | ||
70 | let watcher_clone = watcher.clone(); | ||
71 | let (worker, worker_handle) = | ||
72 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { | ||
73 | input_receiver | ||
74 | .into_iter() | ||
75 | .map(|t| handle_task(t, &watcher_clone)) | ||
76 | .try_for_each(|it| output_sender.send(it)) | ||
77 | .unwrap() | ||
78 | }); | ||
79 | match Watcher::start(worker.inp.clone()) { | ||
80 | Ok(w) => { | ||
81 | watcher.lock().replace(w); | ||
82 | } | ||
83 | Err(e) => log::error!("could not start watcher: {}", e), | ||
84 | }; | ||
85 | Worker { | ||
86 | worker, | ||
87 | worker_handle, | ||
88 | watcher, | ||
89 | } | ||
90 | } | ||
91 | |||
92 | pub(crate) fn sender(&self) -> &Sender<Task> { | ||
93 | &self.worker.inp | ||
94 | } | ||
95 | |||
96 | pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { | ||
97 | &self.worker.out | ||
98 | } | ||
99 | |||
100 | pub(crate) fn shutdown(self) -> thread::Result<()> { | ||
101 | if let Some(watcher) = self.watcher.lock().take() { | ||
102 | let _ = watcher.shutdown(); | ||
103 | } | ||
104 | let _ = self.worker.shutdown(); | ||
105 | self.worker_handle.shutdown() | ||
106 | } | ||
107 | } | ||
108 | |||
109 | fn watch( | ||
110 | watcher: &Arc<Mutex<Option<Watcher>>>, | ||
111 | dir: &Path, | ||
112 | filter_entry: impl Fn(&DirEntry) -> bool, | ||
113 | emit_for_existing: bool, | ||
114 | ) { | ||
115 | let mut watcher = watcher.lock(); | ||
116 | let watcher = match *watcher { | ||
117 | Some(ref mut w) => w, | ||
118 | None => { | ||
119 | // watcher dropped or couldn't start | ||
120 | return; | ||
121 | } | ||
122 | }; | ||
123 | watcher.watch_recursive(dir, filter_entry, emit_for_existing) | ||
124 | } | ||
125 | |||
126 | fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult { | ||
127 | match task { | ||
128 | Task::AddRoot { root, path, filter } => { | ||
129 | watch(watcher, &path, &*filter, false); | ||
130 | log::debug!("loading {} ...", path.as_path().display()); | ||
131 | let files = load_root(path.as_path(), &*filter); | ||
132 | log::debug!("... loaded {}", path.as_path().display()); | ||
133 | TaskResult::AddRoot(AddRootResult { root, files }) | ||
134 | } | ||
135 | Task::HandleChange(change) => { | ||
136 | // forward as is because Vfs has to decide if we should load it | ||
137 | TaskResult::HandleChange(change) | ||
138 | } | ||
139 | Task::LoadChange(change) => { | ||
140 | log::debug!("loading {:?} ...", change); | ||
141 | match load_change(change) { | ||
142 | Some(data) => TaskResult::LoadChange(data), | ||
143 | None => TaskResult::NoOp, | ||
144 | } | ||
145 | } | ||
146 | Task::Watch { dir, filter } => { | ||
147 | watch(watcher, &dir, &*filter, true); | ||
148 | TaskResult::NoOp | ||
149 | } | ||
150 | } | ||
151 | } | ||
152 | |||
153 | fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> { | ||
154 | let mut res = Vec::new(); | ||
155 | for entry in WalkDir::new(root).into_iter().filter_entry(filter) { | ||
156 | let entry = match entry { | ||
157 | Ok(entry) => entry, | ||
158 | Err(e) => { | ||
159 | log::warn!("watcher error: {}", e); | ||
160 | continue; | ||
161 | } | ||
162 | }; | ||
163 | if !entry.file_type().is_file() { | ||
164 | continue; | ||
165 | } | ||
166 | let path = entry.path(); | ||
167 | let text = match fs::read_to_string(path) { | ||
168 | Ok(text) => text, | ||
169 | Err(e) => { | ||
170 | log::warn!("watcher error: {}", e); | ||
171 | continue; | ||
172 | } | ||
173 | }; | ||
174 | let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap(); | ||
175 | res.push((path.to_owned(), text)) | ||
176 | } | ||
177 | res | ||
178 | } | ||
179 | |||
180 | fn load_change(change: WatcherChange) -> Option<WatcherChangeData> { | ||
181 | let data = match change { | ||
182 | WatcherChange::Create(path) => { | ||
183 | if path.is_dir() { | ||
184 | return None; | ||
185 | } | ||
186 | let text = match fs::read_to_string(&path) { | ||
187 | Ok(text) => text, | ||
188 | Err(e) => { | ||
189 | log::warn!("watcher error \"{}\": {}", path.display(), e); | ||
190 | return None; | ||
191 | } | ||
192 | }; | ||
193 | WatcherChangeData::Create { path, text } | ||
194 | } | ||
195 | WatcherChange::Write(path) => { | ||
196 | let text = match fs::read_to_string(&path) { | ||
197 | Ok(text) => text, | ||
198 | Err(e) => { | ||
199 | log::warn!("watcher error \"{}\": {}", path.display(), e); | ||
200 | return None; | ||
201 | } | ||
202 | }; | ||
203 | WatcherChangeData::Write { path, text } | ||
204 | } | ||
205 | WatcherChange::Remove(path) => WatcherChangeData::Remove { path }, | ||
206 | WatcherChange::Rescan => { | ||
207 | // this should be handled by Vfs::handle_task | ||
208 | return None; | ||
209 | } | ||
210 | }; | ||
211 | Some(data) | ||
212 | } | ||
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs new file mode 100644 index 000000000..e33298477 --- /dev/null +++ b/crates/ra_vfs/src/io/watcher.rs | |||
@@ -0,0 +1,128 @@ | |||
1 | use crate::io; | ||
2 | use crossbeam_channel::Sender; | ||
3 | use drop_bomb::DropBomb; | ||
4 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; | ||
5 | use std::{ | ||
6 | path::{Path, PathBuf}, | ||
7 | sync::mpsc, | ||
8 | thread, | ||
9 | time::Duration, | ||
10 | }; | ||
11 | use walkdir::{DirEntry, WalkDir}; | ||
12 | |||
13 | #[derive(Debug)] | ||
14 | pub enum WatcherChange { | ||
15 | Create(PathBuf), | ||
16 | Write(PathBuf), | ||
17 | Remove(PathBuf), | ||
18 | Rescan, | ||
19 | } | ||
20 | |||
21 | fn handle_change_event( | ||
22 | ev: DebouncedEvent, | ||
23 | sender: &Sender<io::Task>, | ||
24 | ) -> Result<(), Box<std::error::Error>> { | ||
25 | match ev { | ||
26 | DebouncedEvent::NoticeWrite(_) | ||
27 | | DebouncedEvent::NoticeRemove(_) | ||
28 | | DebouncedEvent::Chmod(_) => { | ||
29 | // ignore | ||
30 | } | ||
31 | DebouncedEvent::Rescan => { | ||
32 | sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; | ||
33 | } | ||
34 | DebouncedEvent::Create(path) => { | ||
35 | sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; | ||
36 | } | ||
37 | DebouncedEvent::Write(path) => { | ||
38 | sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; | ||
39 | } | ||
40 | DebouncedEvent::Remove(path) => { | ||
41 | sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; | ||
42 | } | ||
43 | DebouncedEvent::Rename(src, dst) => { | ||
44 | sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; | ||
45 | sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; | ||
46 | } | ||
47 | DebouncedEvent::Error(err, path) => { | ||
48 | // TODO should we reload the file contents? | ||
49 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
50 | } | ||
51 | } | ||
52 | Ok(()) | ||
53 | } | ||
54 | |||
55 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
56 | |||
57 | pub(crate) struct Watcher { | ||
58 | watcher: RecommendedWatcher, | ||
59 | thread: thread::JoinHandle<()>, | ||
60 | bomb: DropBomb, | ||
61 | sender: Sender<io::Task>, | ||
62 | } | ||
63 | |||
64 | impl Watcher { | ||
65 | pub(crate) fn start( | ||
66 | output_sender: Sender<io::Task>, | ||
67 | ) -> Result<Watcher, Box<std::error::Error>> { | ||
68 | let (input_sender, input_receiver) = mpsc::channel(); | ||
69 | let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; | ||
70 | let sender = output_sender.clone(); | ||
71 | let thread = thread::spawn(move || { | ||
72 | input_receiver | ||
73 | .into_iter() | ||
74 | // forward relevant events only | ||
75 | .try_for_each(|change| handle_change_event(change, &output_sender)) | ||
76 | .unwrap() | ||
77 | }); | ||
78 | Ok(Watcher { | ||
79 | watcher, | ||
80 | thread, | ||
81 | sender, | ||
82 | bomb: DropBomb::new(format!("Watcher was not shutdown")), | ||
83 | }) | ||
84 | } | ||
85 | |||
86 | pub fn watch_recursive( | ||
87 | &mut self, | ||
88 | dir: &Path, | ||
89 | filter_entry: impl Fn(&DirEntry) -> bool, | ||
90 | emit_for_contents: bool, | ||
91 | ) { | ||
92 | for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { | ||
93 | match res { | ||
94 | Ok(entry) => { | ||
95 | if entry.path().is_dir() { | ||
96 | match self.watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
97 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
98 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
99 | } | ||
100 | } | ||
101 | if emit_for_contents && entry.depth() > 0 { | ||
102 | // emit as create because we haven't seen it yet | ||
103 | if let Err(e) = | ||
104 | self.sender | ||
105 | .send(io::Task::HandleChange(WatcherChange::Create( | ||
106 | entry.path().to_path_buf(), | ||
107 | ))) | ||
108 | { | ||
109 | log::warn!("watcher error: {}", e) | ||
110 | } | ||
111 | } | ||
112 | } | ||
113 | Err(e) => log::warn!("watcher error: {}", e), | ||
114 | } | ||
115 | } | ||
116 | } | ||
117 | |||
118 | pub fn shutdown(mut self) -> thread::Result<()> { | ||
119 | self.bomb.defuse(); | ||
120 | drop(self.watcher); | ||
121 | let res = self.thread.join(); | ||
122 | match &res { | ||
123 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
124 | Err(_) => log::error!("... Watcher terminated with err"), | ||
125 | } | ||
126 | res | ||
127 | } | ||
128 | } | ||