aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io
diff options
context:
space:
mode:
authorBernardo <[email protected]>2019-01-21 17:59:54 +0000
committerAleksey Kladov <[email protected]>2019-01-26 08:46:27 +0000
commit277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c (patch)
tree2445703704ed75987a157302401c5d144788c8a2 /crates/ra_vfs/src/io
parent7f7c4e7465f58cdbfdaaf232d571960f1b754b7c (diff)
move watcher to io module
Diffstat (limited to 'crates/ra_vfs/src/io')
-rw-r--r--crates/ra_vfs/src/io/mod.rs212
-rw-r--r--crates/ra_vfs/src/io/watcher.rs128
2 files changed, 340 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/mod.rs b/crates/ra_vfs/src/io/mod.rs
new file mode 100644
index 000000000..6d5af7690
--- /dev/null
+++ b/crates/ra_vfs/src/io/mod.rs
@@ -0,0 +1,212 @@
1use std::{
2 fmt, fs,
3 path::{Path, PathBuf},
4 sync::Arc,
5 thread,
6};
7
8use crossbeam_channel::{Receiver, Sender};
9use parking_lot::Mutex;
10use relative_path::RelativePathBuf;
11use thread_worker::WorkerHandle;
12use walkdir::{DirEntry, WalkDir};
13
14mod watcher;
15use watcher::Watcher;
16pub use watcher::WatcherChange;
17
18use crate::VfsRoot;
19
20pub(crate) enum Task {
21 AddRoot {
22 root: VfsRoot,
23 path: PathBuf,
24 filter: Box<Fn(&DirEntry) -> bool + Send>,
25 },
26 /// this variant should only be created by the watcher
27 HandleChange(WatcherChange),
28 LoadChange(WatcherChange),
29 Watch {
30 dir: PathBuf,
31 filter: Box<Fn(&DirEntry) -> bool + Send>,
32 },
33}
34
35#[derive(Debug)]
36pub struct AddRootResult {
37 pub(crate) root: VfsRoot,
38 pub(crate) files: Vec<(RelativePathBuf, String)>,
39}
40
41#[derive(Debug)]
42pub enum WatcherChangeData {
43 Create { path: PathBuf, text: String },
44 Write { path: PathBuf, text: String },
45 Remove { path: PathBuf },
46}
47
48pub enum TaskResult {
49 AddRoot(AddRootResult),
50 HandleChange(WatcherChange),
51 LoadChange(WatcherChangeData),
52 NoOp,
53}
54
55impl fmt::Debug for TaskResult {
56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 f.write_str("TaskResult { ... }")
58 }
59}
60
61pub(crate) struct Worker {
62 worker: thread_worker::Worker<Task, TaskResult>,
63 worker_handle: WorkerHandle,
64 watcher: Arc<Mutex<Option<Watcher>>>,
65}
66
67impl Worker {
68 pub(crate) fn start() -> Worker {
69 let watcher = Arc::new(Mutex::new(None));
70 let watcher_clone = watcher.clone();
71 let (worker, worker_handle) =
72 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
73 input_receiver
74 .into_iter()
75 .map(|t| handle_task(t, &watcher_clone))
76 .try_for_each(|it| output_sender.send(it))
77 .unwrap()
78 });
79 match Watcher::start(worker.inp.clone()) {
80 Ok(w) => {
81 watcher.lock().replace(w);
82 }
83 Err(e) => log::error!("could not start watcher: {}", e),
84 };
85 Worker {
86 worker,
87 worker_handle,
88 watcher,
89 }
90 }
91
92 pub(crate) fn sender(&self) -> &Sender<Task> {
93 &self.worker.inp
94 }
95
96 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
97 &self.worker.out
98 }
99
100 pub(crate) fn shutdown(self) -> thread::Result<()> {
101 if let Some(watcher) = self.watcher.lock().take() {
102 let _ = watcher.shutdown();
103 }
104 let _ = self.worker.shutdown();
105 self.worker_handle.shutdown()
106 }
107}
108
109fn watch(
110 watcher: &Arc<Mutex<Option<Watcher>>>,
111 dir: &Path,
112 filter_entry: impl Fn(&DirEntry) -> bool,
113 emit_for_existing: bool,
114) {
115 let mut watcher = watcher.lock();
116 let watcher = match *watcher {
117 Some(ref mut w) => w,
118 None => {
119 // watcher dropped or couldn't start
120 return;
121 }
122 };
123 watcher.watch_recursive(dir, filter_entry, emit_for_existing)
124}
125
126fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
127 match task {
128 Task::AddRoot { root, path, filter } => {
129 watch(watcher, &path, &*filter, false);
130 log::debug!("loading {} ...", path.as_path().display());
131 let files = load_root(path.as_path(), &*filter);
132 log::debug!("... loaded {}", path.as_path().display());
133 TaskResult::AddRoot(AddRootResult { root, files })
134 }
135 Task::HandleChange(change) => {
136 // forward as is because Vfs has to decide if we should load it
137 TaskResult::HandleChange(change)
138 }
139 Task::LoadChange(change) => {
140 log::debug!("loading {:?} ...", change);
141 match load_change(change) {
142 Some(data) => TaskResult::LoadChange(data),
143 None => TaskResult::NoOp,
144 }
145 }
146 Task::Watch { dir, filter } => {
147 watch(watcher, &dir, &*filter, true);
148 TaskResult::NoOp
149 }
150 }
151}
152
153fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePathBuf, String)> {
154 let mut res = Vec::new();
155 for entry in WalkDir::new(root).into_iter().filter_entry(filter) {
156 let entry = match entry {
157 Ok(entry) => entry,
158 Err(e) => {
159 log::warn!("watcher error: {}", e);
160 continue;
161 }
162 };
163 if !entry.file_type().is_file() {
164 continue;
165 }
166 let path = entry.path();
167 let text = match fs::read_to_string(path) {
168 Ok(text) => text,
169 Err(e) => {
170 log::warn!("watcher error: {}", e);
171 continue;
172 }
173 };
174 let path = RelativePathBuf::from_path(path.strip_prefix(root).unwrap()).unwrap();
175 res.push((path.to_owned(), text))
176 }
177 res
178}
179
180fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
181 let data = match change {
182 WatcherChange::Create(path) => {
183 if path.is_dir() {
184 return None;
185 }
186 let text = match fs::read_to_string(&path) {
187 Ok(text) => text,
188 Err(e) => {
189 log::warn!("watcher error \"{}\": {}", path.display(), e);
190 return None;
191 }
192 };
193 WatcherChangeData::Create { path, text }
194 }
195 WatcherChange::Write(path) => {
196 let text = match fs::read_to_string(&path) {
197 Ok(text) => text,
198 Err(e) => {
199 log::warn!("watcher error \"{}\": {}", path.display(), e);
200 return None;
201 }
202 };
203 WatcherChangeData::Write { path, text }
204 }
205 WatcherChange::Remove(path) => WatcherChangeData::Remove { path },
206 WatcherChange::Rescan => {
207 // this should be handled by Vfs::handle_task
208 return None;
209 }
210 };
211 Some(data)
212}
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs
new file mode 100644
index 000000000..e33298477
--- /dev/null
+++ b/crates/ra_vfs/src/io/watcher.rs
@@ -0,0 +1,128 @@
1use crate::io;
2use crossbeam_channel::Sender;
3use drop_bomb::DropBomb;
4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
5use std::{
6 path::{Path, PathBuf},
7 sync::mpsc,
8 thread,
9 time::Duration,
10};
11use walkdir::{DirEntry, WalkDir};
12
13#[derive(Debug)]
14pub enum WatcherChange {
15 Create(PathBuf),
16 Write(PathBuf),
17 Remove(PathBuf),
18 Rescan,
19}
20
21fn handle_change_event(
22 ev: DebouncedEvent,
23 sender: &Sender<io::Task>,
24) -> Result<(), Box<std::error::Error>> {
25 match ev {
26 DebouncedEvent::NoticeWrite(_)
27 | DebouncedEvent::NoticeRemove(_)
28 | DebouncedEvent::Chmod(_) => {
29 // ignore
30 }
31 DebouncedEvent::Rescan => {
32 sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
33 }
34 DebouncedEvent::Create(path) => {
35 sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
36 }
37 DebouncedEvent::Write(path) => {
38 sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
39 }
40 DebouncedEvent::Remove(path) => {
41 sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
42 }
43 DebouncedEvent::Rename(src, dst) => {
44 sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
45 sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
46 }
47 DebouncedEvent::Error(err, path) => {
48 // TODO should we reload the file contents?
49 log::warn!("watcher error \"{}\", {:?}", err, path);
50 }
51 }
52 Ok(())
53}
54
55const WATCHER_DELAY: Duration = Duration::from_millis(250);
56
57pub(crate) struct Watcher {
58 watcher: RecommendedWatcher,
59 thread: thread::JoinHandle<()>,
60 bomb: DropBomb,
61 sender: Sender<io::Task>,
62}
63
64impl Watcher {
65 pub(crate) fn start(
66 output_sender: Sender<io::Task>,
67 ) -> Result<Watcher, Box<std::error::Error>> {
68 let (input_sender, input_receiver) = mpsc::channel();
69 let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
70 let sender = output_sender.clone();
71 let thread = thread::spawn(move || {
72 input_receiver
73 .into_iter()
74 // forward relevant events only
75 .try_for_each(|change| handle_change_event(change, &output_sender))
76 .unwrap()
77 });
78 Ok(Watcher {
79 watcher,
80 thread,
81 sender,
82 bomb: DropBomb::new(format!("Watcher was not shutdown")),
83 })
84 }
85
86 pub fn watch_recursive(
87 &mut self,
88 dir: &Path,
89 filter_entry: impl Fn(&DirEntry) -> bool,
90 emit_for_contents: bool,
91 ) {
92 for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
93 match res {
94 Ok(entry) => {
95 if entry.path().is_dir() {
96 match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
97 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
98 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
99 }
100 }
101 if emit_for_contents && entry.depth() > 0 {
102 // emit as create because we haven't seen it yet
103 if let Err(e) =
104 self.sender
105 .send(io::Task::HandleChange(WatcherChange::Create(
106 entry.path().to_path_buf(),
107 )))
108 {
109 log::warn!("watcher error: {}", e)
110 }
111 }
112 }
113 Err(e) => log::warn!("watcher error: {}", e),
114 }
115 }
116 }
117
118 pub fn shutdown(mut self) -> thread::Result<()> {
119 self.bomb.defuse();
120 drop(self.watcher);
121 let res = self.thread.join();
122 match &res {
123 Ok(()) => log::info!("... Watcher terminated with ok"),
124 Err(_) => log::error!("... Watcher terminated with err"),
125 }
126 res
127 }
128}