diff options
author | Aleksey Kladov <[email protected]> | 2019-01-26 14:17:28 +0000 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2019-01-26 14:17:28 +0000 |
commit | 9f16892b94817d144f37dfe0081b39aacec65635 (patch) | |
tree | 0aeca750f2aefb68ee1e062b41d852d8d0ca9dd4 /crates/ra_vfs/src | |
parent | bf98fc609e2b587d8455bf2bec3ca35f85cf0700 (diff) |
remove watcher ctx
Diffstat (limited to 'crates/ra_vfs/src')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 142 |
1 files changed, 77 insertions, 65 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 8c719dc5d..d764c534a 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -13,8 +13,6 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc | |||
13 | 13 | ||
14 | use crate::{RootConfig, Roots, VfsRoot}; | 14 | use crate::{RootConfig, Roots, VfsRoot}; |
15 | 15 | ||
16 | type Result<T> = std::result::Result<T, crossbeam_channel::SendError<TaskResult>>; | ||
17 | |||
18 | pub(crate) enum Task { | 16 | pub(crate) enum Task { |
19 | AddRoot { | 17 | AddRoot { |
20 | root: VfsRoot, | 18 | root: VfsRoot, |
@@ -62,7 +60,6 @@ impl Worker { | |||
62 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 60 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
63 | // This is a pretty elaborate setup of threads & channels! It is | 61 | // This is a pretty elaborate setup of threads & channels! It is |
64 | // explained by the following concerns: | 62 | // explained by the following concerns: |
65 | |||
66 | // * we need to burn a thread translating from notify's mpsc to | 63 | // * we need to burn a thread translating from notify's mpsc to |
67 | // crossbeam_channel. | 64 | // crossbeam_channel. |
68 | // * we want to read all files from a single thread, to gurantee that | 65 | // * we want to read all files from a single thread, to gurantee that |
@@ -79,48 +76,57 @@ impl Worker { | |||
79 | let (notify_sender, notify_receiver) = mpsc::channel(); | 76 | let (notify_sender, notify_receiver) = mpsc::channel(); |
80 | // These are the corresponding crossbeam channels | 77 | // These are the corresponding crossbeam channels |
81 | let (watcher_sender, watcher_receiver) = unbounded(); | 78 | let (watcher_sender, watcher_receiver) = unbounded(); |
82 | let watcher = notify::watcher(notify_sender, WATCHER_DELAY) | 79 | |
80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | ||
83 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | 81 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
84 | .ok(); | 82 | .ok(); |
85 | let mut ctx = WatcherCtx { | 83 | // Start a silly thread to tranform between two channels |
86 | roots, | ||
87 | watcher, | ||
88 | sender: output_sender, | ||
89 | }; | ||
90 | let thread = thread::spawn(move || { | 84 | let thread = thread::spawn(move || { |
91 | let _ = notify_receiver | 85 | notify_receiver |
92 | .into_iter() | 86 | .into_iter() |
93 | // forward relevant events only | 87 | .for_each(|event| convert_notify_event(event, &watcher_sender)) |
94 | .for_each(|event| convert_notify_event(event, &watcher_sender)); | ||
95 | }); | 88 | }); |
96 | 89 | ||
90 | // Process requests from the called or notifications from | ||
91 | // watcher until the caller says stop. | ||
97 | loop { | 92 | loop { |
98 | select! { | 93 | select! { |
99 | // Received request from the caller. If this channel is | 94 | // Received request from the caller. If this channel is |
100 | // closed, we should shutdown everything. | 95 | // closed, we should shutdown everything. |
101 | recv(input_receiver) -> t => match t { | 96 | recv(input_receiver) -> t => match t { |
102 | Err(RecvError) => break, | 97 | Err(RecvError) => { |
103 | Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), | 98 | drop(input_receiver); |
99 | break | ||
100 | }, | ||
101 | Ok(Task::AddRoot { root, config }) => { | ||
102 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
103 | } | ||
104 | }, | 104 | }, |
105 | // Watcher send us changes. If **this** channel is | 105 | // Watcher send us changes. If **this** channel is |
106 | // closed, the watcher has died, which indicates a bug | 106 | // closed, the watcher has died, which indicates a bug |
107 | // -- escalate! | 107 | // -- escalate! |
108 | recv(watcher_receiver) -> event => match event { | 108 | recv(watcher_receiver) -> event => match event { |
109 | Err(RecvError) => panic!("watcher is dead"), | 109 | Err(RecvError) => panic!("watcher is dead"), |
110 | Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), | 110 | Ok((path, change)) => { |
111 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
112 | } | ||
111 | }, | 113 | }, |
112 | } | 114 | } |
113 | } | 115 | } |
114 | drop(ctx.watcher.take()); | 116 | // Stopped the watcher |
115 | drop(ctx); | 117 | drop(watcher.take()); |
116 | let res2 = thread.join(); | 118 | // Drain pending events: we are not inrerested in them anyways! |
117 | match &res2 { | 119 | watcher_receiver.into_iter().for_each(|_| ()); |
120 | |||
121 | let res = thread.join(); | ||
122 | match &res { | ||
118 | Ok(()) => log::info!("... Watcher terminated with ok"), | 123 | Ok(()) => log::info!("... Watcher terminated with ok"), |
119 | Err(_) => log::error!("... Watcher terminated with err"), | 124 | Err(_) => log::error!("... Watcher terminated with err"), |
120 | } | 125 | } |
121 | res2.unwrap(); | 126 | res.unwrap(); |
122 | }, | 127 | }, |
123 | ); | 128 | ); |
129 | |||
124 | Worker { | 130 | Worker { |
125 | worker, | 131 | worker, |
126 | worker_handle, | 132 | worker_handle, |
@@ -141,9 +147,14 @@ impl Worker { | |||
141 | } | 147 | } |
142 | } | 148 | } |
143 | 149 | ||
144 | fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) { | 150 | fn watch_root( |
151 | watcher: Option<&mut RecommendedWatcher>, | ||
152 | sender: &Sender<TaskResult>, | ||
153 | root: VfsRoot, | ||
154 | config: Arc<RootConfig>, | ||
155 | ) { | ||
145 | log::debug!("loading {} ...", config.root.as_path().display()); | 156 | log::debug!("loading {} ...", config.root.as_path().display()); |
146 | let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) | 157 | let files = watch_recursive(watcher, config.root.as_path(), &*config) |
147 | .into_iter() | 158 | .into_iter() |
148 | .filter_map(|path| { | 159 | .filter_map(|path| { |
149 | let abs_path = path.to_path(&config.root); | 160 | let abs_path = path.to_path(&config.root); |
@@ -151,20 +162,14 @@ fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) { | |||
151 | Some((path, text)) | 162 | Some((path, text)) |
152 | }) | 163 | }) |
153 | .collect(); | 164 | .collect(); |
154 | woker | 165 | sender |
155 | .sender | ||
156 | .send(TaskResult::BulkLoadRoot { root, files }) | 166 | .send(TaskResult::BulkLoadRoot { root, files }) |
157 | .unwrap(); | 167 | .unwrap(); |
158 | log::debug!("... loaded {}", config.root.as_path().display()); | 168 | log::debug!("... loaded {}", config.root.as_path().display()); |
159 | } | 169 | } |
160 | 170 | ||
161 | struct WatcherCtx { | ||
162 | roots: Arc<Roots>, | ||
163 | watcher: Option<RecommendedWatcher>, | ||
164 | sender: Sender<TaskResult>, | ||
165 | } | ||
166 | |||
167 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { | 171 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { |
172 | // forward relevant events only | ||
168 | match event { | 173 | match event { |
169 | DebouncedEvent::NoticeWrite(_) | 174 | DebouncedEvent::NoticeWrite(_) |
170 | | DebouncedEvent::NoticeRemove(_) | 175 | | DebouncedEvent::NoticeRemove(_) |
@@ -194,48 +199,55 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK | |||
194 | } | 199 | } |
195 | } | 200 | } |
196 | 201 | ||
197 | impl WatcherCtx { | 202 | fn handle_change( |
198 | fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { | 203 | watcher: Option<&mut RecommendedWatcher>, |
199 | let (root, rel_path) = match self.roots.find(&path) { | 204 | sender: &Sender<TaskResult>, |
200 | None => return Ok(()), | 205 | roots: &Roots, |
201 | Some(it) => it, | 206 | path: PathBuf, |
202 | }; | 207 | kind: ChangeKind, |
203 | let config = &self.roots[root]; | 208 | ) { |
204 | match kind { | 209 | let (root, rel_path) = match roots.find(&path) { |
205 | ChangeKind::Create => { | 210 | None => return, |
206 | let mut paths = Vec::new(); | 211 | Some(it) => it, |
207 | if path.is_dir() { | 212 | }; |
208 | paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); | 213 | let config = &roots[root]; |
209 | } else { | 214 | match kind { |
210 | paths.push(rel_path); | 215 | ChangeKind::Create => { |
211 | } | 216 | let mut paths = Vec::new(); |
212 | paths | 217 | if path.is_dir() { |
213 | .into_iter() | 218 | paths.extend(watch_recursive(watcher, &path, &config)); |
214 | .filter_map(|rel_path| { | 219 | } else { |
215 | let abs_path = rel_path.to_path(&config.root); | 220 | paths.push(rel_path); |
216 | let text = read_to_string(&abs_path)?; | ||
217 | Some((rel_path, text)) | ||
218 | }) | ||
219 | .try_for_each(|(path, text)| { | ||
220 | self.sender | ||
221 | .send(TaskResult::AddSingleFile { root, path, text }) | ||
222 | })? | ||
223 | } | 221 | } |
224 | ChangeKind::Write => { | 222 | paths |
225 | if let Some(text) = read_to_string(&path) { | 223 | .into_iter() |
226 | self.sender.send(TaskResult::ChangeSingleFile { | 224 | .filter_map(|rel_path| { |
225 | let abs_path = rel_path.to_path(&config.root); | ||
226 | let text = read_to_string(&abs_path)?; | ||
227 | Some((rel_path, text)) | ||
228 | }) | ||
229 | .try_for_each(|(path, text)| { | ||
230 | sender.send(TaskResult::AddSingleFile { root, path, text }) | ||
231 | }) | ||
232 | .unwrap() | ||
233 | } | ||
234 | ChangeKind::Write => { | ||
235 | if let Some(text) = read_to_string(&path) { | ||
236 | sender | ||
237 | .send(TaskResult::ChangeSingleFile { | ||
227 | root, | 238 | root, |
228 | path: rel_path, | 239 | path: rel_path, |
229 | text, | 240 | text, |
230 | })?; | 241 | }) |
231 | } | 242 | .unwrap(); |
232 | } | 243 | } |
233 | ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { | 244 | } |
245 | ChangeKind::Remove => sender | ||
246 | .send(TaskResult::RemoveSingleFile { | ||
234 | root, | 247 | root, |
235 | path: rel_path, | 248 | path: rel_path, |
236 | })?, | 249 | }) |
237 | } | 250 | .unwrap(), |
238 | Ok(()) | ||
239 | } | 251 | } |
240 | } | 252 | } |
241 | 253 | ||