aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2019-01-26 14:17:28 +0000
committerAleksey Kladov <[email protected]>2019-01-26 14:17:28 +0000
commit9f16892b94817d144f37dfe0081b39aacec65635 (patch)
tree0aeca750f2aefb68ee1e062b41d852d8d0ca9dd4 /crates
parentbf98fc609e2b587d8455bf2bec3ca35f85cf0700 (diff)
remove watcher ctx
Diffstat (limited to 'crates')
-rw-r--r--crates/ra_vfs/src/io.rs142
1 files changed, 77 insertions, 65 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 8c719dc5d..d764c534a 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -13,8 +13,6 @@ use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watc
13 13
14use crate::{RootConfig, Roots, VfsRoot}; 14use crate::{RootConfig, Roots, VfsRoot};
15 15
16type Result<T> = std::result::Result<T, crossbeam_channel::SendError<TaskResult>>;
17
18pub(crate) enum Task { 16pub(crate) enum Task {
19 AddRoot { 17 AddRoot {
20 root: VfsRoot, 18 root: VfsRoot,
@@ -62,7 +60,6 @@ impl Worker {
62 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 60 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
63 // This is a pretty elaborate setup of threads & channels! It is 61 // This is a pretty elaborate setup of threads & channels! It is
64 // explained by the following concerns: 62 // explained by the following concerns:
65
66 // * we need to burn a thread translating from notify's mpsc to 63 // * we need to burn a thread translating from notify's mpsc to
67 // crossbeam_channel. 64 // crossbeam_channel.
68 // * we want to read all files from a single thread, to gurantee that 65 // * we want to read all files from a single thread, to gurantee that
@@ -79,48 +76,57 @@ impl Worker {
79 let (notify_sender, notify_receiver) = mpsc::channel(); 76 let (notify_sender, notify_receiver) = mpsc::channel();
80 // These are the corresponding crossbeam channels 77 // These are the corresponding crossbeam channels
81 let (watcher_sender, watcher_receiver) = unbounded(); 78 let (watcher_sender, watcher_receiver) = unbounded();
82 let watcher = notify::watcher(notify_sender, WATCHER_DELAY) 79
80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
83 .map_err(|e| log::error!("failed to spawn notify {}", e)) 81 .map_err(|e| log::error!("failed to spawn notify {}", e))
84 .ok(); 82 .ok();
85 let mut ctx = WatcherCtx { 83 // Start a silly thread to tranform between two channels
86 roots,
87 watcher,
88 sender: output_sender,
89 };
90 let thread = thread::spawn(move || { 84 let thread = thread::spawn(move || {
91 let _ = notify_receiver 85 notify_receiver
92 .into_iter() 86 .into_iter()
93 // forward relevant events only 87 .for_each(|event| convert_notify_event(event, &watcher_sender))
94 .for_each(|event| convert_notify_event(event, &watcher_sender));
95 }); 88 });
96 89
90 // Process requests from the called or notifications from
91 // watcher until the caller says stop.
97 loop { 92 loop {
98 select! { 93 select! {
99 // Received request from the caller. If this channel is 94 // Received request from the caller. If this channel is
100 // closed, we should shutdown everything. 95 // closed, we should shutdown everything.
101 recv(input_receiver) -> t => match t { 96 recv(input_receiver) -> t => match t {
102 Err(RecvError) => break, 97 Err(RecvError) => {
103 Ok(Task::AddRoot { root, config }) => watch_root(&mut ctx, root, Arc::clone(&config)), 98 drop(input_receiver);
99 break
100 },
101 Ok(Task::AddRoot { root, config }) => {
102 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
103 }
104 }, 104 },
105 // Watcher send us changes. If **this** channel is 105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug 106 // closed, the watcher has died, which indicates a bug
107 // -- escalate! 107 // -- escalate!
108 recv(watcher_receiver) -> event => match event { 108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"), 109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => WatcherCtx::handle_change(&mut ctx, path, change).unwrap(), 110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
111 }, 113 },
112 } 114 }
113 } 115 }
114 drop(ctx.watcher.take()); 116 // Stopped the watcher
115 drop(ctx); 117 drop(watcher.take());
116 let res2 = thread.join(); 118 // Drain pending events: we are not inrerested in them anyways!
117 match &res2 { 119 watcher_receiver.into_iter().for_each(|_| ());
120
121 let res = thread.join();
122 match &res {
118 Ok(()) => log::info!("... Watcher terminated with ok"), 123 Ok(()) => log::info!("... Watcher terminated with ok"),
119 Err(_) => log::error!("... Watcher terminated with err"), 124 Err(_) => log::error!("... Watcher terminated with err"),
120 } 125 }
121 res2.unwrap(); 126 res.unwrap();
122 }, 127 },
123 ); 128 );
129
124 Worker { 130 Worker {
125 worker, 131 worker,
126 worker_handle, 132 worker_handle,
@@ -141,9 +147,14 @@ impl Worker {
141 } 147 }
142} 148}
143 149
144fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) { 150fn watch_root(
151 watcher: Option<&mut RecommendedWatcher>,
152 sender: &Sender<TaskResult>,
153 root: VfsRoot,
154 config: Arc<RootConfig>,
155) {
145 log::debug!("loading {} ...", config.root.as_path().display()); 156 log::debug!("loading {} ...", config.root.as_path().display());
146 let files = watch_recursive(woker.watcher.as_mut(), config.root.as_path(), &*config) 157 let files = watch_recursive(watcher, config.root.as_path(), &*config)
147 .into_iter() 158 .into_iter()
148 .filter_map(|path| { 159 .filter_map(|path| {
149 let abs_path = path.to_path(&config.root); 160 let abs_path = path.to_path(&config.root);
@@ -151,20 +162,14 @@ fn watch_root(woker: &mut WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
151 Some((path, text)) 162 Some((path, text))
152 }) 163 })
153 .collect(); 164 .collect();
154 woker 165 sender
155 .sender
156 .send(TaskResult::BulkLoadRoot { root, files }) 166 .send(TaskResult::BulkLoadRoot { root, files })
157 .unwrap(); 167 .unwrap();
158 log::debug!("... loaded {}", config.root.as_path().display()); 168 log::debug!("... loaded {}", config.root.as_path().display());
159} 169}
160 170
161struct WatcherCtx {
162 roots: Arc<Roots>,
163 watcher: Option<RecommendedWatcher>,
164 sender: Sender<TaskResult>,
165}
166
167fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { 171fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
172 // forward relevant events only
168 match event { 173 match event {
169 DebouncedEvent::NoticeWrite(_) 174 DebouncedEvent::NoticeWrite(_)
170 | DebouncedEvent::NoticeRemove(_) 175 | DebouncedEvent::NoticeRemove(_)
@@ -194,48 +199,55 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK
194 } 199 }
195} 200}
196 201
197impl WatcherCtx { 202fn handle_change(
198 fn handle_change(&mut self, path: PathBuf, kind: ChangeKind) -> Result<()> { 203 watcher: Option<&mut RecommendedWatcher>,
199 let (root, rel_path) = match self.roots.find(&path) { 204 sender: &Sender<TaskResult>,
200 None => return Ok(()), 205 roots: &Roots,
201 Some(it) => it, 206 path: PathBuf,
202 }; 207 kind: ChangeKind,
203 let config = &self.roots[root]; 208) {
204 match kind { 209 let (root, rel_path) = match roots.find(&path) {
205 ChangeKind::Create => { 210 None => return,
206 let mut paths = Vec::new(); 211 Some(it) => it,
207 if path.is_dir() { 212 };
208 paths.extend(watch_recursive(self.watcher.as_mut(), &path, &config)); 213 let config = &roots[root];
209 } else { 214 match kind {
210 paths.push(rel_path); 215 ChangeKind::Create => {
211 } 216 let mut paths = Vec::new();
212 paths 217 if path.is_dir() {
213 .into_iter() 218 paths.extend(watch_recursive(watcher, &path, &config));
214 .filter_map(|rel_path| { 219 } else {
215 let abs_path = rel_path.to_path(&config.root); 220 paths.push(rel_path);
216 let text = read_to_string(&abs_path)?;
217 Some((rel_path, text))
218 })
219 .try_for_each(|(path, text)| {
220 self.sender
221 .send(TaskResult::AddSingleFile { root, path, text })
222 })?
223 } 221 }
224 ChangeKind::Write => { 222 paths
225 if let Some(text) = read_to_string(&path) { 223 .into_iter()
226 self.sender.send(TaskResult::ChangeSingleFile { 224 .filter_map(|rel_path| {
225 let abs_path = rel_path.to_path(&config.root);
226 let text = read_to_string(&abs_path)?;
227 Some((rel_path, text))
228 })
229 .try_for_each(|(path, text)| {
230 sender.send(TaskResult::AddSingleFile { root, path, text })
231 })
232 .unwrap()
233 }
234 ChangeKind::Write => {
235 if let Some(text) = read_to_string(&path) {
236 sender
237 .send(TaskResult::ChangeSingleFile {
227 root, 238 root,
228 path: rel_path, 239 path: rel_path,
229 text, 240 text,
230 })?; 241 })
231 } 242 .unwrap();
232 } 243 }
233 ChangeKind::Remove => self.sender.send(TaskResult::RemoveSingleFile { 244 }
245 ChangeKind::Remove => sender
246 .send(TaskResult::RemoveSingleFile {
234 root, 247 root,
235 path: rel_path, 248 path: rel_path,
236 })?, 249 })
237 } 250 .unwrap(),
238 Ok(())
239 } 251 }
240} 252}
241 253