diff options
author | Aleksey Kladov <[email protected]> | 2019-01-26 14:01:58 +0000 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2019-01-26 14:01:58 +0000 |
commit | 012ea3fac62df26abefa6d64b81570ed58118dea (patch) | |
tree | 11052942db7df5d596ba290a546add213de2ac83 /crates/ra_vfs | |
parent | 3ce531f95dec87a1f59e9347fdd6c250e36b489d (diff) |
handle all the reads on the "main" watcher thread
Diffstat (limited to 'crates/ra_vfs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 123 |
1 files changed, 75 insertions, 48 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 279fa5da8..98b107b35 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -5,7 +5,7 @@ use std::{ | |||
5 | sync::{mpsc, Arc}, | 5 | sync::{mpsc, Arc}, |
6 | time::Duration, | 6 | time::Duration, |
7 | }; | 7 | }; |
8 | use crossbeam_channel::{Receiver, Sender}; | 8 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; |
9 | use relative_path::RelativePathBuf; | 9 | use relative_path::RelativePathBuf; |
10 | use thread_worker::WorkerHandle; | 10 | use thread_worker::WorkerHandle; |
11 | use walkdir::WalkDir; | 11 | use walkdir::WalkDir; |
@@ -61,9 +61,25 @@ pub(crate) struct Worker { | |||
61 | 61 | ||
62 | impl Worker { | 62 | impl Worker { |
63 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 63 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
64 | let (worker, worker_handle) = | 64 | // This is a pretty elaborate setup of threads & channels! It is |
65 | thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { | 65 | // explained by the following concerns: |
66 | |||
67 | // * we need to burn a thread translating from notify's mpsc to | ||
68 | // crossbeam_channel. | ||
69 | // * we want to read all files from a single thread, to gurantee that | ||
70 | // we always get fresher versions and never go back in time. | ||
71 | // * we want to tear down everything neatly during shutdown. | ||
72 | let (worker, worker_handle) = thread_worker::spawn( | ||
73 | "vfs", | ||
74 | 128, | ||
75 | // This are the channels we use to communicate with outside world. | ||
76 | // If `input_receiver` is closed we need to tear ourselves down. | ||
77 | // `output_sender` should not be closed unless the parent died. | ||
78 | move |input_receiver, output_sender| { | ||
79 | // These are `std` channels notify will send events to | ||
66 | let (notify_sender, notify_receiver) = mpsc::channel(); | 80 | let (notify_sender, notify_receiver) = mpsc::channel(); |
81 | // These are the corresponding crossbeam channels | ||
82 | let (watcher_sender, watcher_receiver) = unbounded(); | ||
67 | let watcher = notify::watcher(notify_sender, WATCHER_DELAY) | 83 | let watcher = notify::watcher(notify_sender, WATCHER_DELAY) |
68 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | 84 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
69 | .ok(); | 85 | .ok(); |
@@ -72,18 +88,30 @@ impl Worker { | |||
72 | watcher: Arc::new(Mutex::new(watcher)), | 88 | watcher: Arc::new(Mutex::new(watcher)), |
73 | sender: output_sender, | 89 | sender: output_sender, |
74 | }; | 90 | }; |
75 | let thread = thread::spawn({ | 91 | let thread = thread::spawn(move || { |
76 | let ctx = ctx.clone(); | 92 | let _ = notify_receiver |
77 | move || { | 93 | .into_iter() |
78 | let _ = notify_receiver | 94 | // forward relevant events only |
79 | .into_iter() | 95 | .for_each(|event| convert_notify_event(event, &watcher_sender)); |
80 | // forward relevant events only | ||
81 | .try_for_each(|change| ctx.handle_debounced_event(change)); | ||
82 | } | ||
83 | }); | ||
84 | let res1 = input_receiver.into_iter().try_for_each(|t| match t { | ||
85 | Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)), | ||
86 | }); | 96 | }); |
97 | |||
98 | loop { | ||
99 | select! { | ||
100 | // Received request from the caller. If this channel is | ||
101 | // closed, we should shutdown everything. | ||
102 | recv(input_receiver) -> t => match t { | ||
103 | Err(RecvError) => break, | ||
104 | Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)), | ||
105 | }, | ||
106 | // Watcher send us changes. If **this** channel is | ||
107 | // closed, the watcher has died, which indicates a bug | ||
108 | // -- escalate! | ||
109 | recv(watcher_receiver) -> event => match event { | ||
110 | Err(RecvError) => panic!("watcher is dead"), | ||
111 | Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(), | ||
112 | }, | ||
113 | } | ||
114 | } | ||
87 | drop(ctx.watcher.lock().take()); | 115 | drop(ctx.watcher.lock().take()); |
88 | drop(ctx); | 116 | drop(ctx); |
89 | let res2 = thread.join(); | 117 | let res2 = thread.join(); |
@@ -91,9 +119,9 @@ impl Worker { | |||
91 | Ok(()) => log::info!("... Watcher terminated with ok"), | 119 | Ok(()) => log::info!("... Watcher terminated with ok"), |
92 | Err(_) => log::error!("... Watcher terminated with err"), | 120 | Err(_) => log::error!("... Watcher terminated with err"), |
93 | } | 121 | } |
94 | res1.unwrap(); | ||
95 | res2.unwrap(); | 122 | res2.unwrap(); |
96 | }); | 123 | }, |
124 | ); | ||
97 | Worker { | 125 | Worker { |
98 | worker, | 126 | worker, |
99 | worker_handle, | 127 | worker_handle, |
@@ -114,7 +142,7 @@ impl Worker { | |||
114 | } | 142 | } |
115 | } | 143 | } |
116 | 144 | ||
117 | fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Result<()> { | 145 | fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) { |
118 | let mut guard = woker.watcher.lock(); | 146 | let mut guard = woker.watcher.lock(); |
119 | log::debug!("loading {} ...", config.root.as_path().display()); | 147 | log::debug!("loading {} ...", config.root.as_path().display()); |
120 | let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) | 148 | let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) |
@@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Res | |||
127 | .collect(); | 155 | .collect(); |
128 | woker | 156 | woker |
129 | .sender | 157 | .sender |
130 | .send(TaskResult::BulkLoadRoot { root, files })?; | 158 | .send(TaskResult::BulkLoadRoot { root, files }) |
159 | .unwrap(); | ||
131 | log::debug!("... loaded {}", config.root.as_path().display()); | 160 | log::debug!("... loaded {}", config.root.as_path().display()); |
132 | Ok(()) | ||
133 | } | 161 | } |
134 | 162 | ||
135 | #[derive(Clone)] | 163 | #[derive(Clone)] |
@@ -139,38 +167,37 @@ struct WatcherCtx { | |||
139 | sender: Sender<TaskResult>, | 167 | sender: Sender<TaskResult>, |
140 | } | 168 | } |
141 | 169 | ||
142 | impl WatcherCtx { | 170 | fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) { |
143 | fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> { | 171 | match event { |
144 | match ev { | 172 | DebouncedEvent::NoticeWrite(_) |
145 | DebouncedEvent::NoticeWrite(_) | 173 | | DebouncedEvent::NoticeRemove(_) |
146 | | DebouncedEvent::NoticeRemove(_) | 174 | | DebouncedEvent::Chmod(_) => { |
147 | | DebouncedEvent::Chmod(_) => { | 175 | // ignore |
148 | // ignore | 176 | } |
149 | } | 177 | DebouncedEvent::Rescan => { |
150 | DebouncedEvent::Rescan => { | 178 | // TODO rescan all roots |
151 | // TODO rescan all roots | 179 | } |
152 | } | 180 | DebouncedEvent::Create(path) => { |
153 | DebouncedEvent::Create(path) => { | 181 | sender.send((path, ChangeKind::Create)).unwrap(); |
154 | self.handle_change(path, ChangeKind::Create)?; | 182 | } |
155 | } | 183 | DebouncedEvent::Write(path) => { |
156 | DebouncedEvent::Write(path) => { | 184 | sender.send((path, ChangeKind::Write)).unwrap(); |
157 | self.handle_change(path, ChangeKind::Write)?; | 185 | } |
158 | } | 186 | DebouncedEvent::Remove(path) => { |
159 | DebouncedEvent::Remove(path) => { | 187 | sender.send((path, ChangeKind::Remove)).unwrap(); |
160 | self.handle_change(path, ChangeKind::Remove)?; | 188 | } |
161 | } | 189 | DebouncedEvent::Rename(src, dst) => { |
162 | DebouncedEvent::Rename(src, dst) => { | 190 | sender.send((src, ChangeKind::Remove)).unwrap(); |
163 | self.handle_change(src, ChangeKind::Remove)?; | 191 | sender.send((dst, ChangeKind::Create)).unwrap(); |
164 | self.handle_change(dst, ChangeKind::Create)?; | 192 | } |
165 | } | 193 | DebouncedEvent::Error(err, path) => { |
166 | DebouncedEvent::Error(err, path) => { | 194 | // TODO should we reload the file contents? |
167 | // TODO should we reload the file contents? | 195 | log::warn!("watcher error \"{}\", {:?}", err, path); |
168 | log::warn!("watcher error \"{}\", {:?}", err, path); | ||
169 | } | ||
170 | } | 196 | } |
171 | Ok(()) | ||
172 | } | 197 | } |
198 | } | ||
173 | 199 | ||
200 | impl WatcherCtx { | ||
174 | fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { | 201 | fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { |
175 | let (root, rel_path) = match self.roots.find(&path) { | 202 | let (root, rel_path) = match self.roots.find(&path) { |
176 | None => return Ok(()), | 203 | None => return Ok(()), |