diff options
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 182 |
1 files changed, 113 insertions, 69 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 0cffc03f3..5969ee0d0 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -3,13 +3,14 @@ use std::{ | |||
3 | path::{Path, PathBuf}, | 3 | path::{Path, PathBuf}, |
4 | sync::{mpsc, Arc}, | 4 | sync::{mpsc, Arc}, |
5 | time::Duration, | 5 | time::Duration, |
6 | thread, | ||
6 | }; | 7 | }; |
7 | use crossbeam_channel::{Sender, unbounded, RecvError, select}; | 8 | use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select}; |
8 | use relative_path::RelativePathBuf; | 9 | use relative_path::RelativePathBuf; |
9 | use walkdir::WalkDir; | 10 | use walkdir::WalkDir; |
10 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | 11 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; |
11 | 12 | ||
12 | use crate::{Roots, VfsRoot}; | 13 | use crate::{Roots, VfsRoot, VfsTask}; |
13 | 14 | ||
14 | pub(crate) enum Task { | 15 | pub(crate) enum Task { |
15 | AddRoot { root: VfsRoot }, | 16 | AddRoot { root: VfsRoot }, |
@@ -18,7 +19,7 @@ pub(crate) enum Task { | |||
18 | /// `TaskResult` transfers files read on the IO thread to the VFS on the main | 19 | /// `TaskResult` transfers files read on the IO thread to the VFS on the main |
19 | /// thread. | 20 | /// thread. |
20 | #[derive(Debug)] | 21 | #[derive(Debug)] |
21 | pub enum TaskResult { | 22 | pub(crate) enum TaskResult { |
22 | /// Emitted when we've recursively scanned a source root during the initial | 23 | /// Emitted when we've recursively scanned a source root during the initial |
23 | /// load. | 24 | /// load. |
24 | BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> }, | 25 | BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> }, |
@@ -46,7 +47,46 @@ enum ChangeKind { | |||
46 | 47 | ||
47 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | 48 | const WATCHER_DELAY: Duration = Duration::from_millis(250); |
48 | 49 | ||
49 | pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>; | 50 | // Like thread::JoinHandle, but joins the thread on drop. |
51 | // | ||
52 | // This is useful because it guarantees the absence of run-away threads, even if | ||
53 | // code panics. This is important, because we might see panics in the test and | ||
54 | // we might be used in an IDE context, where a failed component is just | ||
55 | // restarted. | ||
56 | // | ||
57 | // Because all threads are joined, care must be taken to avoid deadlocks. That | ||
58 | // typically means ensuring that channels are dropped before the threads. | ||
59 | struct ScopedThread(Option<thread::JoinHandle<()>>); | ||
60 | |||
61 | impl ScopedThread { | ||
62 | fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread { | ||
63 | let handle = thread::Builder::new().name(name).spawn(f).unwrap(); | ||
64 | ScopedThread(Some(handle)) | ||
65 | } | ||
66 | } | ||
67 | |||
68 | impl Drop for ScopedThread { | ||
69 | fn drop(&mut self) { | ||
70 | let res = self.0.take().unwrap().join(); | ||
71 | if !thread::panicking() { | ||
72 | res.unwrap(); | ||
73 | } | ||
74 | } | ||
75 | } | ||
76 | |||
77 | pub(crate) struct Worker { | ||
78 | // XXX: field order is significant here. | ||
79 | // | ||
80 | // In Rust, fields are dropped in the declaration order, and we rely on this | ||
81 | // here. We must close sender first, so that the `thread` (who holds the | ||
82 | // opposite side of the channel) noticed shutdown. Then, we must join the | ||
83 | // thread, but we must keep receiver alive so that the thread does not | ||
84 | // panic. | ||
85 | pub(crate) sender: Sender<Task>, | ||
86 | _thread: ScopedThread, | ||
87 | pub(crate) receiver: Receiver<VfsTask>, | ||
88 | } | ||
89 | |||
50 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 90 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
51 | // This is a pretty elaborate setup of threads & channels! It is | 91 | // This is a pretty elaborate setup of threads & channels! It is |
52 | // explained by the following concerns: | 92 | // explained by the following concerns: |
@@ -55,74 +95,75 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker { | |||
55 | // * we want to read all files from a single thread, to guarantee that | 95 | // * we want to read all files from a single thread, to guarantee that |
56 | // we always get fresher versions and never go back in time. | 96 | // we always get fresher versions and never go back in time. |
57 | // * we want to tear down everything neatly during shutdown. | 97 | // * we want to tear down everything neatly during shutdown. |
58 | Worker::spawn( | 98 | let _thread; |
59 | "vfs", | 99 | // This are the channels we use to communicate with outside world. |
60 | 128, | 100 | // If `input_receiver` is closed we need to tear ourselves down. |
61 | // This are the channels we use to communicate with outside world. | 101 | // `output_sender` should not be closed unless the parent died. |
62 | // If `input_receiver` is closed we need to tear ourselves down. | 102 | let (input_sender, input_receiver) = unbounded(); |
63 | // `output_sender` should not be closed unless the parent died. | 103 | let (output_sender, output_receiver) = unbounded(); |
64 | move |input_receiver, output_sender| { | 104 | |
65 | // Make sure that the destruction order is | 105 | _thread = ScopedThread::spawn("vfs".to_string(), move || { |
66 | // | 106 | // Make sure that the destruction order is |
67 | // * notify_sender | 107 | // |
68 | // * _thread | 108 | // * notify_sender |
69 | // * watcher_sender | 109 | // * _thread |
70 | // | 110 | // * watcher_sender |
71 | // this is required to avoid deadlocks. | 111 | // |
72 | 112 | // this is required to avoid deadlocks. | |
73 | // These are the corresponding crossbeam channels | 113 | |
74 | let (watcher_sender, watcher_receiver) = unbounded(); | 114 | // These are the corresponding crossbeam channels |
75 | let _thread; | 115 | let (watcher_sender, watcher_receiver) = unbounded(); |
76 | { | 116 | let _notify_thread; |
77 | // These are `std` channels notify will send events to | 117 | { |
78 | let (notify_sender, notify_receiver) = mpsc::channel(); | 118 | // These are `std` channels notify will send events to |
79 | 119 | let (notify_sender, notify_receiver) = mpsc::channel(); | |
80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | 120 | |
81 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | 121 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) |
82 | .ok(); | 122 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
83 | // Start a silly thread to transform between two channels | 123 | .ok(); |
84 | _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { | 124 | // Start a silly thread to transform between two channels |
85 | notify_receiver | 125 | _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || { |
86 | .into_iter() | 126 | notify_receiver |
87 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | 127 | .into_iter() |
88 | }); | 128 | .for_each(|event| convert_notify_event(event, &watcher_sender)) |
89 | 129 | }); | |
90 | // Process requests from the called or notifications from | 130 | |
91 | // watcher until the caller says stop. | 131 | // Process requests from the called or notifications from |
92 | loop { | 132 | // watcher until the caller says stop. |
93 | select! { | 133 | loop { |
94 | // Received request from the caller. If this channel is | 134 | select! { |
95 | // closed, we should shutdown everything. | 135 | // Received request from the caller. If this channel is |
96 | recv(input_receiver) -> t => match t { | 136 | // closed, we should shutdown everything. |
97 | Err(RecvError) => { | 137 | recv(input_receiver) -> t => match t { |
98 | drop(input_receiver); | 138 | Err(RecvError) => { |
99 | break | 139 | drop(input_receiver); |
100 | }, | 140 | break |
101 | Ok(Task::AddRoot { root }) => { | ||
102 | watch_root(watcher.as_mut(), &output_sender, &*roots, root); | ||
103 | } | ||
104 | }, | ||
105 | // Watcher send us changes. If **this** channel is | ||
106 | // closed, the watcher has died, which indicates a bug | ||
107 | // -- escalate! | ||
108 | recv(watcher_receiver) -> event => match event { | ||
109 | Err(RecvError) => panic!("watcher is dead"), | ||
110 | Ok((path, change)) => { | ||
111 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
112 | } | ||
113 | }, | 141 | }, |
114 | } | 142 | Ok(Task::AddRoot { root }) => { |
143 | watch_root(watcher.as_mut(), &output_sender, &*roots, root); | ||
144 | } | ||
145 | }, | ||
146 | // Watcher send us changes. If **this** channel is | ||
147 | // closed, the watcher has died, which indicates a bug | ||
148 | // -- escalate! | ||
149 | recv(watcher_receiver) -> event => match event { | ||
150 | Err(RecvError) => panic!("watcher is dead"), | ||
151 | Ok((path, change)) => { | ||
152 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
153 | } | ||
154 | }, | ||
115 | } | 155 | } |
116 | } | 156 | } |
117 | // Drain pending events: we are not interested in them anyways! | 157 | } |
118 | watcher_receiver.into_iter().for_each(|_| ()); | 158 | // Drain pending events: we are not interested in them anyways! |
119 | }, | 159 | watcher_receiver.into_iter().for_each(|_| ()); |
120 | ) | 160 | }); |
161 | Worker { sender: input_sender, _thread, receiver: output_receiver } | ||
121 | } | 162 | } |
122 | 163 | ||
123 | fn watch_root( | 164 | fn watch_root( |
124 | watcher: Option<&mut RecommendedWatcher>, | 165 | watcher: Option<&mut RecommendedWatcher>, |
125 | sender: &Sender<TaskResult>, | 166 | sender: &Sender<VfsTask>, |
126 | roots: &Roots, | 167 | roots: &Roots, |
127 | root: VfsRoot, | 168 | root: VfsRoot, |
128 | ) { | 169 | ) { |
@@ -136,7 +177,8 @@ fn watch_root( | |||
136 | Some((path, text)) | 177 | Some((path, text)) |
137 | }) | 178 | }) |
138 | .collect(); | 179 | .collect(); |
139 | sender.send(TaskResult::BulkLoadRoot { root, files }).unwrap(); | 180 | let res = TaskResult::BulkLoadRoot { root, files }; |
181 | sender.send(VfsTask(res)).unwrap(); | ||
140 | log::debug!("... loaded {}", root_path.display()); | 182 | log::debug!("... loaded {}", root_path.display()); |
141 | } | 183 | } |
142 | 184 | ||
@@ -173,7 +215,7 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK | |||
173 | 215 | ||
174 | fn handle_change( | 216 | fn handle_change( |
175 | watcher: Option<&mut RecommendedWatcher>, | 217 | watcher: Option<&mut RecommendedWatcher>, |
176 | sender: &Sender<TaskResult>, | 218 | sender: &Sender<VfsTask>, |
177 | roots: &Roots, | 219 | roots: &Roots, |
178 | path: PathBuf, | 220 | path: PathBuf, |
179 | kind: ChangeKind, | 221 | kind: ChangeKind, |
@@ -195,13 +237,15 @@ fn handle_change( | |||
195 | .try_for_each(|rel_path| { | 237 | .try_for_each(|rel_path| { |
196 | let abs_path = rel_path.to_path(&roots.path(root)); | 238 | let abs_path = rel_path.to_path(&roots.path(root)); |
197 | let text = read_to_string(&abs_path); | 239 | let text = read_to_string(&abs_path); |
198 | sender.send(TaskResult::SingleFile { root, path: rel_path, text }) | 240 | let res = TaskResult::SingleFile { root, path: rel_path, text }; |
241 | sender.send(VfsTask(res)) | ||
199 | }) | 242 | }) |
200 | .unwrap() | 243 | .unwrap() |
201 | } | 244 | } |
202 | ChangeKind::Write | ChangeKind::Remove => { | 245 | ChangeKind::Write | ChangeKind::Remove => { |
203 | let text = read_to_string(&path); | 246 | let text = read_to_string(&path); |
204 | sender.send(TaskResult::SingleFile { root, path: rel_path, text }).unwrap(); | 247 | let res = TaskResult::SingleFile { root, path: rel_path, text }; |
248 | sender.send(VfsTask(res)).unwrap(); | ||
205 | } | 249 | } |
206 | } | 250 | } |
207 | } | 251 | } |