diff options
author | Bernardo <[email protected]> | 2019-01-21 17:59:54 +0000 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2019-01-26 08:46:27 +0000 |
commit | 277e0f1baa21b8f3e5b040b78ce2bd6beca6cd7c (patch) | |
tree | 2445703704ed75987a157302401c5d144788c8a2 /crates/ra_vfs/src/io/watcher.rs | |
parent | 7f7c4e7465f58cdbfdaaf232d571960f1b754b7c (diff) |
move watcher to io module
Diffstat (limited to 'crates/ra_vfs/src/io/watcher.rs')
-rw-r--r-- | crates/ra_vfs/src/io/watcher.rs | 128 |
1 files changed, 128 insertions, 0 deletions
diff --git a/crates/ra_vfs/src/io/watcher.rs b/crates/ra_vfs/src/io/watcher.rs new file mode 100644 index 000000000..e33298477 --- /dev/null +++ b/crates/ra_vfs/src/io/watcher.rs | |||
@@ -0,0 +1,128 @@ | |||
1 | use crate::io; | ||
2 | use crossbeam_channel::Sender; | ||
3 | use drop_bomb::DropBomb; | ||
4 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; | ||
5 | use std::{ | ||
6 | path::{Path, PathBuf}, | ||
7 | sync::mpsc, | ||
8 | thread, | ||
9 | time::Duration, | ||
10 | }; | ||
11 | use walkdir::{DirEntry, WalkDir}; | ||
12 | |||
13 | #[derive(Debug)] | ||
14 | pub enum WatcherChange { | ||
15 | Create(PathBuf), | ||
16 | Write(PathBuf), | ||
17 | Remove(PathBuf), | ||
18 | Rescan, | ||
19 | } | ||
20 | |||
21 | fn handle_change_event( | ||
22 | ev: DebouncedEvent, | ||
23 | sender: &Sender<io::Task>, | ||
24 | ) -> Result<(), Box<std::error::Error>> { | ||
25 | match ev { | ||
26 | DebouncedEvent::NoticeWrite(_) | ||
27 | | DebouncedEvent::NoticeRemove(_) | ||
28 | | DebouncedEvent::Chmod(_) => { | ||
29 | // ignore | ||
30 | } | ||
31 | DebouncedEvent::Rescan => { | ||
32 | sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; | ||
33 | } | ||
34 | DebouncedEvent::Create(path) => { | ||
35 | sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; | ||
36 | } | ||
37 | DebouncedEvent::Write(path) => { | ||
38 | sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?; | ||
39 | } | ||
40 | DebouncedEvent::Remove(path) => { | ||
41 | sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?; | ||
42 | } | ||
43 | DebouncedEvent::Rename(src, dst) => { | ||
44 | sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?; | ||
45 | sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?; | ||
46 | } | ||
47 | DebouncedEvent::Error(err, path) => { | ||
48 | // TODO should we reload the file contents? | ||
49 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
50 | } | ||
51 | } | ||
52 | Ok(()) | ||
53 | } | ||
54 | |||
55 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | ||
56 | |||
57 | pub(crate) struct Watcher { | ||
58 | watcher: RecommendedWatcher, | ||
59 | thread: thread::JoinHandle<()>, | ||
60 | bomb: DropBomb, | ||
61 | sender: Sender<io::Task>, | ||
62 | } | ||
63 | |||
64 | impl Watcher { | ||
65 | pub(crate) fn start( | ||
66 | output_sender: Sender<io::Task>, | ||
67 | ) -> Result<Watcher, Box<std::error::Error>> { | ||
68 | let (input_sender, input_receiver) = mpsc::channel(); | ||
69 | let watcher = notify::watcher(input_sender, WATCHER_DELAY)?; | ||
70 | let sender = output_sender.clone(); | ||
71 | let thread = thread::spawn(move || { | ||
72 | input_receiver | ||
73 | .into_iter() | ||
74 | // forward relevant events only | ||
75 | .try_for_each(|change| handle_change_event(change, &output_sender)) | ||
76 | .unwrap() | ||
77 | }); | ||
78 | Ok(Watcher { | ||
79 | watcher, | ||
80 | thread, | ||
81 | sender, | ||
82 | bomb: DropBomb::new(format!("Watcher was not shutdown")), | ||
83 | }) | ||
84 | } | ||
85 | |||
86 | pub fn watch_recursive( | ||
87 | &mut self, | ||
88 | dir: &Path, | ||
89 | filter_entry: impl Fn(&DirEntry) -> bool, | ||
90 | emit_for_contents: bool, | ||
91 | ) { | ||
92 | for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) { | ||
93 | match res { | ||
94 | Ok(entry) => { | ||
95 | if entry.path().is_dir() { | ||
96 | match self.watcher.watch(dir, RecursiveMode::NonRecursive) { | ||
97 | Ok(()) => log::debug!("watching \"{}\"", dir.display()), | ||
98 | Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e), | ||
99 | } | ||
100 | } | ||
101 | if emit_for_contents && entry.depth() > 0 { | ||
102 | // emit as create because we haven't seen it yet | ||
103 | if let Err(e) = | ||
104 | self.sender | ||
105 | .send(io::Task::HandleChange(WatcherChange::Create( | ||
106 | entry.path().to_path_buf(), | ||
107 | ))) | ||
108 | { | ||
109 | log::warn!("watcher error: {}", e) | ||
110 | } | ||
111 | } | ||
112 | } | ||
113 | Err(e) => log::warn!("watcher error: {}", e), | ||
114 | } | ||
115 | } | ||
116 | } | ||
117 | |||
118 | pub fn shutdown(mut self) -> thread::Result<()> { | ||
119 | self.bomb.defuse(); | ||
120 | drop(self.watcher); | ||
121 | let res = self.thread.join(); | ||
122 | match &res { | ||
123 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
124 | Err(_) => log::error!("... Watcher terminated with err"), | ||
125 | } | ||
126 | res | ||
127 | } | ||
128 | } | ||