aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--crates/ra_vfs/src/io.rs123
1 files changed, 75 insertions, 48 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 279fa5da8..98b107b35 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -5,7 +5,7 @@ use std::{
5 sync::{mpsc, Arc}, 5 sync::{mpsc, Arc},
6 time::Duration, 6 time::Duration,
7}; 7};
8use crossbeam_channel::{Receiver, Sender}; 8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
9use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
10use thread_worker::WorkerHandle; 10use thread_worker::WorkerHandle;
11use walkdir::WalkDir; 11use walkdir::WalkDir;
@@ -61,9 +61,25 @@ pub(crate) struct Worker {
61 61
62impl Worker { 62impl Worker {
63 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 63 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
64 let (worker, worker_handle) = 64 // This is a pretty elaborate setup of threads & channels! It is
65 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| { 65 // explained by the following concerns:
66
67 // * we need to burn a thread translating from notify's mpsc to
68 // crossbeam_channel.
69 // * we want to read all files from a single thread, to gurantee that
70 // we always get fresher versions and never go back in time.
71 // * we want to tear down everything neatly during shutdown.
72 let (worker, worker_handle) = thread_worker::spawn(
73 "vfs",
74 128,
75 // This are the channels we use to communicate with outside world.
76 // If `input_receiver` is closed we need to tear ourselves down.
77 // `output_sender` should not be closed unless the parent died.
78 move |input_receiver, output_sender| {
79 // These are `std` channels notify will send events to
66 let (notify_sender, notify_receiver) = mpsc::channel(); 80 let (notify_sender, notify_receiver) = mpsc::channel();
81 // These are the corresponding crossbeam channels
82 let (watcher_sender, watcher_receiver) = unbounded();
67 let watcher = notify::watcher(notify_sender, WATCHER_DELAY) 83 let watcher = notify::watcher(notify_sender, WATCHER_DELAY)
68 .map_err(|e| log::error!("failed to spawn notify {}", e)) 84 .map_err(|e| log::error!("failed to spawn notify {}", e))
69 .ok(); 85 .ok();
@@ -72,18 +88,30 @@ impl Worker {
72 watcher: Arc::new(Mutex::new(watcher)), 88 watcher: Arc::new(Mutex::new(watcher)),
73 sender: output_sender, 89 sender: output_sender,
74 }; 90 };
75 let thread = thread::spawn({ 91 let thread = thread::spawn(move || {
76 let ctx = ctx.clone(); 92 let _ = notify_receiver
77 move || { 93 .into_iter()
78 let _ = notify_receiver 94 // forward relevant events only
79 .into_iter() 95 .for_each(|event| convert_notify_event(event, &watcher_sender));
80 // forward relevant events only
81 .try_for_each(|change| ctx.handle_debounced_event(change));
82 }
83 });
84 let res1 = input_receiver.into_iter().try_for_each(|t| match t {
85 Task::AddRoot { root, config } => watch_root(&ctx, root, Arc::clone(&config)),
86 }); 96 });
97
98 loop {
99 select! {
100 // Received request from the caller. If this channel is
101 // closed, we should shutdown everything.
102 recv(input_receiver) -> t => match t {
103 Err(RecvError) => break,
104 Ok(Task::AddRoot { root, config }) => watch_root(&ctx, root, Arc::clone(&config)),
105 },
106 // Watcher send us changes. If **this** channel is
107 // closed, the watcher has died, which indicates a bug
108 // -- escalate!
109 recv(watcher_receiver) -> event => match event {
110 Err(RecvError) => panic!("watcher is dead"),
111 Ok((path, change)) => WatcherCtx::handle_change(&ctx, path, change).unwrap(),
112 },
113 }
114 }
87 drop(ctx.watcher.lock().take()); 115 drop(ctx.watcher.lock().take());
88 drop(ctx); 116 drop(ctx);
89 let res2 = thread.join(); 117 let res2 = thread.join();
@@ -91,9 +119,9 @@ impl Worker {
91 Ok(()) => log::info!("... Watcher terminated with ok"), 119 Ok(()) => log::info!("... Watcher terminated with ok"),
92 Err(_) => log::error!("... Watcher terminated with err"), 120 Err(_) => log::error!("... Watcher terminated with err"),
93 } 121 }
94 res1.unwrap();
95 res2.unwrap(); 122 res2.unwrap();
96 }); 123 },
124 );
97 Worker { 125 Worker {
98 worker, 126 worker,
99 worker_handle, 127 worker_handle,
@@ -114,7 +142,7 @@ impl Worker {
114 } 142 }
115} 143}
116 144
117fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Result<()> { 145fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) {
118 let mut guard = woker.watcher.lock(); 146 let mut guard = woker.watcher.lock();
119 log::debug!("loading {} ...", config.root.as_path().display()); 147 log::debug!("loading {} ...", config.root.as_path().display());
120 let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config) 148 let files = watch_recursive(guard.as_mut(), config.root.as_path(), &*config)
@@ -127,9 +155,9 @@ fn watch_root(woker: &WatcherCtx, root: VfsRoot, config: Arc<RootConfig>) -> Res
127 .collect(); 155 .collect();
128 woker 156 woker
129 .sender 157 .sender
130 .send(TaskResult::BulkLoadRoot { root, files })?; 158 .send(TaskResult::BulkLoadRoot { root, files })
159 .unwrap();
131 log::debug!("... loaded {}", config.root.as_path().display()); 160 log::debug!("... loaded {}", config.root.as_path().display());
132 Ok(())
133} 161}
134 162
135#[derive(Clone)] 163#[derive(Clone)]
@@ -139,38 +167,37 @@ struct WatcherCtx {
139 sender: Sender<TaskResult>, 167 sender: Sender<TaskResult>,
140} 168}
141 169
142impl WatcherCtx { 170fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeKind)>) {
143 fn handle_debounced_event(&self, ev: DebouncedEvent) -> Result<()> { 171 match event {
144 match ev { 172 DebouncedEvent::NoticeWrite(_)
145 DebouncedEvent::NoticeWrite(_) 173 | DebouncedEvent::NoticeRemove(_)
146 | DebouncedEvent::NoticeRemove(_) 174 | DebouncedEvent::Chmod(_) => {
147 | DebouncedEvent::Chmod(_) => { 175 // ignore
148 // ignore 176 }
149 } 177 DebouncedEvent::Rescan => {
150 DebouncedEvent::Rescan => { 178 // TODO rescan all roots
151 // TODO rescan all roots 179 }
152 } 180 DebouncedEvent::Create(path) => {
153 DebouncedEvent::Create(path) => { 181 sender.send((path, ChangeKind::Create)).unwrap();
154 self.handle_change(path, ChangeKind::Create)?; 182 }
155 } 183 DebouncedEvent::Write(path) => {
156 DebouncedEvent::Write(path) => { 184 sender.send((path, ChangeKind::Write)).unwrap();
157 self.handle_change(path, ChangeKind::Write)?; 185 }
158 } 186 DebouncedEvent::Remove(path) => {
159 DebouncedEvent::Remove(path) => { 187 sender.send((path, ChangeKind::Remove)).unwrap();
160 self.handle_change(path, ChangeKind::Remove)?; 188 }
161 } 189 DebouncedEvent::Rename(src, dst) => {
162 DebouncedEvent::Rename(src, dst) => { 190 sender.send((src, ChangeKind::Remove)).unwrap();
163 self.handle_change(src, ChangeKind::Remove)?; 191 sender.send((dst, ChangeKind::Create)).unwrap();
164 self.handle_change(dst, ChangeKind::Create)?; 192 }
165 } 193 DebouncedEvent::Error(err, path) => {
166 DebouncedEvent::Error(err, path) => { 194 // TODO should we reload the file contents?
167 // TODO should we reload the file contents? 195 log::warn!("watcher error \"{}\", {:?}", err, path);
168 log::warn!("watcher error \"{}\", {:?}", err, path);
169 }
170 } 196 }
171 Ok(())
172 } 197 }
198}
173 199
200impl WatcherCtx {
174 fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> { 201 fn handle_change(&self, path: PathBuf, kind: ChangeKind) -> Result<()> {
175 let (root, rel_path) = match self.roots.find(&path) { 202 let (root, rel_path) = match self.roots.find(&path) {
176 None => return Ok(()), 203 None => return Ok(()),