aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
authorbors[bot] <bors[bot]@users.noreply.github.com>2019-01-26 15:13:44 +0000
committerbors[bot] <bors[bot]@users.noreply.github.com>2019-01-26 15:13:44 +0000
commita8d32c4d1ae4b3e4276f7a97b6c6e5f95f91e67a (patch)
tree9e56b095a073ff3c82322302356f4d096a70afcd /crates/ra_vfs/src/io.rs
parent5af7b2f4af51291fa4a0549c549796ba0520927b (diff)
parent9f16892b94817d144f37dfe0081b39aacec65635 (diff)
Merge #671
671: Makre VFS slightly less super obscure r=vemoo a=matklad I've decided to better understand what we do in VFS, and this turns out to be really hard. Jugling threads and channels is one of the most unfortunately arcane bits of rust... I had some success though by flattenning the structure so that all channel & thread creation routines are on one screen. r? @vemoo Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs271
1 files changed, 215 insertions, 56 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 7ca1e9835..d764c534a 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,19 +1,22 @@
1use std::{fs, sync::Arc, thread}; 1use std::{
2 2 fs,
3use crossbeam_channel::{Receiver, Sender}; 3 thread,
4 path::{Path, PathBuf},
5 sync::{mpsc, Arc},
6 time::Duration,
7};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
4use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
5use thread_worker::WorkerHandle; 10use thread_worker::WorkerHandle;
6use walkdir::WalkDir; 11use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
7 13
8mod watcher; 14use crate::{RootConfig, Roots, VfsRoot};
9use watcher::Watcher;
10
11use crate::{RootFilter, Roots, VfsRoot};
12 15
13pub(crate) enum Task { 16pub(crate) enum Task {
14 AddRoot { 17 AddRoot {
15 root: VfsRoot, 18 root: VfsRoot,
16 filter: Arc<RootFilter>, 19 config: Arc<RootConfig>,
17 }, 20 },
18} 21}
19 22
@@ -39,6 +42,15 @@ pub enum TaskResult {
39 }, 42 },
40} 43}
41 44
45#[derive(Debug)]
46enum ChangeKind {
47 Create,
48 Write,
49 Remove,
50}
51
52const WATCHER_DELAY: Duration = Duration::from_millis(250);
53
42pub(crate) struct Worker { 54pub(crate) struct Worker {
43 worker: thread_worker::Worker<Task, TaskResult>, 55 worker: thread_worker::Worker<Task, TaskResult>,
44 worker_handle: WorkerHandle, 56 worker_handle: WorkerHandle,
@@ -46,24 +58,75 @@ pub(crate) struct Worker {
46 58
47impl Worker { 59impl Worker {
48 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 60 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
49 let (worker, worker_handle) = 61 // This is a pretty elaborate setup of threads & channels! It is
50 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { 62 // explained by the following concerns:
51 let mut watcher = match Watcher::start(roots, output_sender.clone()) { 63 // * we need to burn a thread translating from notify's mpsc to
52 Ok(w) => Some(w), 64 // crossbeam_channel.
53 Err(e) => { 65 // * we want to read all files from a single thread, to gurantee that
54 log::error!("could not start watcher: {}", e); 66 // we always get fresher versions and never go back in time.
55 None 67 // * we want to tear down everything neatly during shutdown.
68 let (worker, worker_handle) = thread_worker::spawn(
69 "vfs",
70 128,
71 // This are the channels we use to communicate with outside world.
72 // If `input_receiver` is closed we need to tear ourselves down.
73 // `output_sender` should not be closed unless the parent died.
74 move |input_receiver, output_sender| {
75 // These are `std` channels notify will send events to
76 let (notify_sender, notify_receiver) = mpsc::channel();
77 // These are the corresponding crossbeam channels
78 let (watcher_sender, watcher_receiver) = unbounded();
79
80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
81 .map_err(|e| log::error!("failed to spawn notify {}", e))
82 .ok();
83 // Start a silly thread to tranform between two channels
84 let thread = thread::spawn(move || {
85 notify_receiver
86 .into_iter()
87 .for_each(|event| convert_notify_event(event, &watcher_sender))
88 });
89
90 // Process requests from the called or notifications from
91 // watcher until the caller says stop.
92 loop {
93 select! {
94 // Received request from the caller. If this channel is
95 // closed, we should shutdown everything.
96 recv(input_receiver) -> t => match t {
97 Err(RecvError) => {
98 drop(input_receiver);
99 break
100 },
101 Ok(Task::AddRoot { root, config }) => {
102 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
103 }
104 },
105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug
107 // -- escalate!
108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
113 },
56 } 114 }
57 };
58 let res = input_receiver
59 .into_iter()
60 .filter_map(|t| handle_task(t, &mut watcher))
61 .try_for_each(|it| output_sender.send(it));
62 if let Some(watcher) = watcher {
63 let _ = watcher.shutdown();
64 } 115 }
65 res.unwrap() 116 // Stopped the watcher
66 }); 117 drop(watcher.take());
118 // Drain pending events: we are not inrerested in them anyways!
119 watcher_receiver.into_iter().for_each(|_| ());
120
121 let res = thread.join();
122 match &res {
123 Ok(()) => log::info!("... Watcher terminated with ok"),
124 Err(_) => log::error!("... Watcher terminated with err"),
125 }
126 res.unwrap();
127 },
128 );
129
67 Worker { 130 Worker {
68 worker, 131 worker,
69 worker_handle, 132 worker_handle,
@@ -84,46 +147,142 @@ impl Worker {
84 } 147 }
85} 148}
86 149
87fn handle_task(task: Task, watcher: &mut Option<Watcher>) -> Option<TaskResult> { 150fn watch_root(
88 match task { 151 watcher: Option<&mut RecommendedWatcher>,
89 Task::AddRoot { root, filter } => { 152 sender: &Sender<TaskResult>,
90 if let Some(watcher) = watcher { 153 root: VfsRoot,
91 watcher.watch_root(&filter) 154 config: Arc<RootConfig>,
155) {
156 log::debug!("loading {} ...", config.root.as_path().display());
157 let files = watch_recursive(watcher, config.root.as_path(), &*config)
158 .into_iter()
159 .filter_map(|path| {
160 let abs_path = path.to_path(&config.root);
161 let text = read_to_string(&abs_path)?;
162 Some((path, text))
163 })
164 .collect();
165 sender
166 .send(TaskResult::BulkLoadRoot { root, files })
167 .unwrap();
168 log::debug!("... loaded {}", config.root.as_path().display());
169}
170
171fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
172 // forward relevant events only
173 match event {
174 DebouncedEvent::NoticeWrite(_)
175 | DebouncedEvent::NoticeRemove(_)
176 | DebouncedEvent::Chmod(_) => {
177 // ignore
178 }
179 DebouncedEvent::Rescan => {
180 // TODO rescan all roots
181 }
182 DebouncedEvent::Create(path) => {
183 sender.send((path, ChangeKind::Create)).unwrap();
184 }
185 DebouncedEvent::Write(path) => {
186 sender.send((path, ChangeKind::Write)).unwrap();
187 }
188 DebouncedEvent::Remove(path) => {
189 sender.send((path, ChangeKind::Remove)).unwrap();
190 }
191 DebouncedEvent::Rename(src, dst) => {
192 sender.send((src, ChangeKind::Remove)).unwrap();
193 sender.send((dst, ChangeKind::Create)).unwrap();
194 }
195 DebouncedEvent::Error(err, path) => {
196 // TODO should we reload the file contents?
197 log::warn!("watcher error \"{}\", {:?}", err, path);
198 }
199 }
200}
201
202fn handle_change(
203 watcher: Option<&mut RecommendedWatcher>,
204 sender: &Sender<TaskResult>,
205 roots: &Roots,
206 path: PathBuf,
207 kind: ChangeKind,
208) {
209 let (root, rel_path) = match roots.find(&path) {
210 None => return,
211 Some(it) => it,
212 };
213 let config = &roots[root];
214 match kind {
215 ChangeKind::Create => {
216 let mut paths = Vec::new();
217 if path.is_dir() {
218 paths.extend(watch_recursive(watcher, &path, &config));
219 } else {
220 paths.push(rel_path);
92 } 221 }
93 log::debug!("loading {} ...", filter.root.as_path().display()); 222 paths
94 let files = load_root(filter.as_ref()); 223 .into_iter()
95 log::debug!("... loaded {}", filter.root.as_path().display()); 224 .filter_map(|rel_path| {
96 Some(TaskResult::BulkLoadRoot { root, files }) 225 let abs_path = rel_path.to_path(&config.root);
226 let text = read_to_string(&abs_path)?;
227 Some((rel_path, text))
228 })
229 .try_for_each(|(path, text)| {
230 sender.send(TaskResult::AddSingleFile { root, path, text })
231 })
232 .unwrap()
97 } 233 }
234 ChangeKind::Write => {
235 if let Some(text) = read_to_string(&path) {
236 sender
237 .send(TaskResult::ChangeSingleFile {
238 root,
239 path: rel_path,
240 text,
241 })
242 .unwrap();
243 }
244 }
245 ChangeKind::Remove => sender
246 .send(TaskResult::RemoveSingleFile {
247 root,
248 path: rel_path,
249 })
250 .unwrap(),
98 } 251 }
99} 252}
100 253
101fn load_root(filter: &RootFilter) -> Vec<(RelativePathBuf, String)> { 254fn watch_recursive(
102 let mut res = Vec::new(); 255 mut watcher: Option<&mut RecommendedWatcher>,
103 for entry in WalkDir::new(&filter.root) 256 dir: &Path,
257 config: &RootConfig,
258) -> Vec<RelativePathBuf> {
259 let mut files = Vec::new();
260 for entry in WalkDir::new(dir)
104 .into_iter() 261 .into_iter()
105 .filter_entry(filter.entry_filter()) 262 .filter_entry(|it| config.contains(it.path()).is_some())
263 .filter_map(|it| it.map_err(|e| log::warn!("watcher error: {}", e)).ok())
106 { 264 {
107 let entry = match entry { 265 if entry.file_type().is_dir() {
108 Ok(entry) => entry, 266 if let Some(watcher) = &mut watcher {
109 Err(e) => { 267 watch_one(watcher, entry.path());
110 log::warn!("watcher error: {}", e);
111 continue;
112 } 268 }
113 }; 269 } else {
114 if !entry.file_type().is_file() { 270 let path = config.contains(entry.path()).unwrap();
115 continue; 271 files.push(path.to_owned());
116 } 272 }
117 let path = entry.path();
118 let text = match fs::read_to_string(path) {
119 Ok(text) => text,
120 Err(e) => {
121 log::warn!("watcher error: {}", e);
122 continue;
123 }
124 };
125 let path = RelativePathBuf::from_path(path.strip_prefix(&filter.root).unwrap()).unwrap();
126 res.push((path.to_owned(), text))
127 } 273 }
128 res 274 files
275}
276
277fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
278 match watcher.watch(dir, RecursiveMode::NonRecursive) {
279 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
280 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
281 }
282}
283
284fn read_to_string(path: &Path) -> Option<String> {
285 fs::read_to_string(&path)
286 .map_err(|e| log::warn!("failed to read file {}", e))
287 .ok()
129} 288}