aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io/mod.rs
diff options
context:
space:
mode:
authorBernardo <[email protected]>2019-01-21 17:59:54 +0000
committerAleksey Kladov <[email protected]>2019-01-26 08:46:27 +0000
commit277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c (patch)
tree2445703704ed75987a157302401c5d144788c8a2 /crates/ra_vfs/src/io/mod.rs
parent7f7c4e7465f58cdbfdaaf232d571960f1b754b7c (diff)
move watcher to io module
Diffstat (limited to 'crates/ra_vfs/src/io/mod.rs')
-rw-r--r--crates/ra_vfs/src/io/mod.rs212
1 files changed, 212 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs
new file mode 100644
index 000000000..6d5af7690
--- /dev/null
+++ b/crates/ra_vfs/src/io/mod.rs
@@ -0,0 +1,212 @@
1use std::{
2 fmt, fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 thread,
6};
7
8use crossbeam_channel::{Receiver, Sender};
9use parking_lot::Mutex;
10use relative_path::RelativePathBuf;
11use thread_worker::WorkerHandle;
12use walkdir::{DirEntry, WalkDir};
13
14mod watcher;
15use watcher::Watcher;
16pub use watcher::WatcherChange;
17
18use crate::VfsRoot;
19
20pub(crate) enum Task {
21 AddRoot {
22 root: VfsRoot,
23 path: PathBuf,
24 filter: Box<Fn(&DirEntry) -> bool + Send>,
25 },
26 /// this variant should only be created by the watcher
27 HandleChange(WatcherChange),
28 LoadChange(WatcherChange),
29 Watch {
30 dir: PathBuf,
31 filter: Box<Fn(&DirEntry) -> bool + Send>,
32 },
33}
34
35#[derive(Debug)]
36pub struct AddRootResult {
37 pub(crate) root: VfsRoot,
38 pub(crate) files: Vec<(RelativePathBuf, String)>,
39}
40
41#[derive(Debug)]
42pub enum WatcherChangeData {
43 Create { path: PathBuf, text: String },
44 Write { path: PathBuf, text: String },
45 Remove { path: PathBuf },
46}
47
48pub enum TaskResult {
49 AddRoot(AddRootResult),
50 HandleChange(WatcherChange),
51 LoadChange(WatcherChangeData),
52 NoOp,
53}
54
55impl fmt::Debug for TaskResult {
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 f.write_str("TaskResult { ... }")
58 }
59}
60
61pub(crate) struct Worker {
62 worker: thread_worker::Worker<Task, TaskResult>,
63 worker_handle: WorkerHandle,
64 watcher: Arc<Mutex<Option<Watcher>>>,
65}
66
67impl Worker {
68 pub(crate) fn start() -> Worker {
69 let watcher = Arc::new(Mutex::new(None));
70 let watcher_clone = watcher.clone();
71 let (worker, worker_handle) =
72 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
73 input_receiver
74 .into_iter()
75 .map(|t| handle_task(t, &watcher_clone))
76 .try_for_each(|it| output_sender.send(it))
77 .unwrap()
78 });
79 match Watcher::start(worker.inp.clone()) {
80 Ok(w) => {
81 watcher.lock().replace(w);
82 }
83 Err(e) => log::error!("could not start watcher: {}", e),
84 };
85 Worker {
86 worker,
87 worker_handle,
88 watcher,
89 }
90 }
91
92 pub(crate) fn sender(&self) -> &Sender<Task> {
93 &self.worker.inp
94 }
95
96 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
97 &self.worker.out
98 }
99
100 pub(crate) fn shutdown(self) -> thread::Result<()> {
101 if let Some(watcher) = self.watcher.lock().take() {
102 let _ = watcher.shutdown();
103 }
104 let _ = self.worker.shutdown();
105 self.worker_handle.shutdown()
106 }
107}
108
109fn watch(
110 watcher: &Arc<Mutex<Option<Watcher>>>,
111 dir: &Path,
112 filter_entry: impl Fn(&DirEntry) -> bool,
113 emit_for_existing: bool,
114) {
115 let mut watcher = watcher.lock();
116 let watcher = match *watcher {
117 Some(ref mut w) => w,
118 None => {
119 // watcher dropped or couldn't start
120 return;
121 }
122 };
123 watcher.watch_recursive(dir, filter_entry, emit_for_existing)
124}
125
126fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
127 match task {
128 Task::AddRoot { root, path, filter } => {
129 watch(watcher, &path, &*filter, false);
130 log::debug!("loading {} ...", path.as_path().display());
131 let files = load_root(path.as_path(), &*filter);
132 log::debug!("... loaded {}", path.as_path().display());
133 TaskResult::AddRoot(AddRootResult { root, files })
134 }
135 Task::HandleChange(change) => {
136 // forward as is because Vfs has to decide if we should load it
137 TaskResult::HandleChange(change)
138 }
139 Task::LoadChange(change) => {
140 log::debug!("loading {:?} ...", change);
141 match load_change(change) {
142 Some(data) => TaskResult::LoadChange(data),
143 None => TaskResult::NoOp,
144 }
145 }
146 Task::Watch { dir, filter } => {
147 watch(watcher, &dir, &*filter, true);
148 TaskResult::NoOp
149 }
150 }
151}
152
153fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
154 let mut res = Vec::new();
155 for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
156 let entry = match entry {
157 Ok(entry) => entry,
158 Err(e) => {
159 log::warn!("watcher error: {}", e);
160 continue;
161 }
162 };
163 if !entry.file_type().is_file() {
164 continue;
165 }
166 let path = entry.path();
167 let text = match fs::read_to_string(path) {
168 Ok(text) => text,
169 Err(e) => {
170 log::warn!("watcher error: {}", e);
171 continue;
172 }
173 };
174 let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
175 res.push((path.to_owned(), text))
176 }
177 res
178}
179
180fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
181 let data = match change {
182 WatcherChange::Create(path) => {
183 if path.is_dir() {
184 return None;
185 }
186 let text = match fs::read_to_string(&path) {
187 Ok(text) => text,
188 Err(e) => {
189 log::warn!("watcher error \"{}\": {}", path.display(), e);
190 return None;
191 }
192 };
193 WatcherChangeData::Create { path, text }
194 }
195 WatcherChange::Write(path) => {
196 let text = match fs::read_to_string(&path) {
197 Ok(text) => text,
198 Err(e) => {
199 log::warn!("watcher error \"{}\": {}", path.display(), e);
200 return None;
201 }
202 };
203 WatcherChangeData::Write { path, text }
204 }
205 WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
206 WatcherChange::Rescan => {
207 // this should be handled by Vfs::handle_task
208 return None;
209 }
210 };
211 Some(data)
212}