aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs182
1 files changed, 113 insertions, 69 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 0cffc03f3..5969ee0d0 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -3,13 +3,14 @@ use std::{
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
5 time::Duration, 5 time::Duration,
6 thread,
6}; 7};
7use crossbeam_channel::{Sender, unbounded, RecvError, select}; 8use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select};
8use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
9use walkdir::WalkDir; 10use walkdir::WalkDir;
10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 11use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
11 12
12use crate::{Roots, VfsRoot}; 13use crate::{Roots, VfsRoot, VfsTask};
13 14
14pub(crate) enum Task { 15pub(crate) enum Task {
15 AddRoot { root: VfsRoot }, 16 AddRoot { root: VfsRoot },
@@ -18,7 +19,7 @@ pub(crate) enum Task {
18/// `TaskResult` transfers files read on the IO thread to the VFS on the main 19/// `TaskResult` transfers files read on the IO thread to the VFS on the main
19/// thread. 20/// thread.
20#[derive(Debug)] 21#[derive(Debug)]
21pub enum TaskResult { 22pub(crate) enum TaskResult {
22 /// Emitted when we've recursively scanned a source root during the initial 23 /// Emitted when we've recursively scanned a source root during the initial
23 /// load. 24 /// load.
24 BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> }, 25 BulkLoadRoot { root: VfsRoot, files: Vec<(RelativePathBuf, String)> },
@@ -46,7 +47,46 @@ enum ChangeKind {
46 47
47const WATCHER_DELAY: Duration = Duration::from_millis(250); 48const WATCHER_DELAY: Duration = Duration::from_millis(250);
48 49
49pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>; 50// Like thread::JoinHandle, but joins the thread on drop.
51//
52// This is useful because it guarantees the absence of run-away threads, even if
53// code panics. This is important, because we might see panics in the test and
54// we might be used in an IDE context, where a failed component is just
55// restarted.
56//
57// Because all threads are joined, care must be taken to avoid deadlocks. That
58// typically means ensuring that channels are dropped before the threads.
59struct ScopedThread(Option<thread::JoinHandle<()>>);
60
61impl ScopedThread {
62 fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
63 let handle = thread::Builder::new().name(name).spawn(f).unwrap();
64 ScopedThread(Some(handle))
65 }
66}
67
68impl Drop for ScopedThread {
69 fn drop(&mut self) {
70 let res = self.0.take().unwrap().join();
71 if !thread::panicking() {
72 res.unwrap();
73 }
74 }
75}
76
77pub(crate) struct Worker {
78 // XXX: field order is significant here.
79 //
80 // In Rust, fields are dropped in the declaration order, and we rely on this
81 // here. We must close sender first, so that the `thread` (who holds the
82 // opposite side of the channel) noticed shutdown. Then, we must join the
83 // thread, but we must keep receiver alive so that the thread does not
84 // panic.
85 pub(crate) sender: Sender<Task>,
86 _thread: ScopedThread,
87 pub(crate) receiver: Receiver<VfsTask>,
88}
89
50pub(crate) fn start(roots: Arc<Roots>) -> Worker { 90pub(crate) fn start(roots: Arc<Roots>) -> Worker {
51 // This is a pretty elaborate setup of threads & channels! It is 91 // This is a pretty elaborate setup of threads & channels! It is
52 // explained by the following concerns: 92 // explained by the following concerns:
@@ -55,74 +95,75 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
55 // * we want to read all files from a single thread, to guarantee that 95 // * we want to read all files from a single thread, to guarantee that
56 // we always get fresher versions and never go back in time. 96 // we always get fresher versions and never go back in time.
57 // * we want to tear down everything neatly during shutdown. 97 // * we want to tear down everything neatly during shutdown.
58 Worker::spawn( 98 let _thread;
59 "vfs", 99 // This are the channels we use to communicate with outside world.
60 128, 100 // If `input_receiver` is closed we need to tear ourselves down.
61 // This are the channels we use to communicate with outside world. 101 // `output_sender` should not be closed unless the parent died.
62 // If `input_receiver` is closed we need to tear ourselves down. 102 let (input_sender, input_receiver) = unbounded();
63 // `output_sender` should not be closed unless the parent died. 103 let (output_sender, output_receiver) = unbounded();
64 move |input_receiver, output_sender| { 104
65 // Make sure that the destruction order is 105 _thread = ScopedThread::spawn("vfs".to_string(), move || {
66 // 106 // Make sure that the destruction order is
67 // * notify_sender 107 //
68 // * _thread 108 // * notify_sender
69 // * watcher_sender 109 // * _thread
70 // 110 // * watcher_sender
71 // this is required to avoid deadlocks. 111 //
72 112 // this is required to avoid deadlocks.
73 // These are the corresponding crossbeam channels 113
74 let (watcher_sender, watcher_receiver) = unbounded(); 114 // These are the corresponding crossbeam channels
75 let _thread; 115 let (watcher_sender, watcher_receiver) = unbounded();
76 { 116 let _notify_thread;
77 // These are `std` channels notify will send events to 117 {
78 let (notify_sender, notify_receiver) = mpsc::channel(); 118 // These are `std` channels notify will send events to
79 119 let (notify_sender, notify_receiver) = mpsc::channel();
80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 120
81 .map_err(|e| log::error!("failed to spawn notify {}", e)) 121 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
82 .ok(); 122 .map_err(|e| log::error!("failed to spawn notify {}", e))
83 // Start a silly thread to transform between two channels 123 .ok();
84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { 124 // Start a silly thread to transform between two channels
85 notify_receiver 125 _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || {
86 .into_iter() 126 notify_receiver
87 .for_each(|event| convert_notify_event(event, &watcher_sender)) 127 .into_iter()
88 }); 128 .for_each(|event| convert_notify_event(event, &watcher_sender))
89 129 });
90 // Process requests from the called or notifications from 130
91 // watcher until the caller says stop. 131 // Process requests from the called or notifications from
92 loop { 132 // watcher until the caller says stop.
93 select! { 133 loop {
94 // Received request from the caller. If this channel is 134 select! {
95 // closed, we should shutdown everything. 135 // Received request from the caller. If this channel is
96 recv(input_receiver) -> t => match t { 136 // closed, we should shutdown everything.
97 Err(RecvError) => { 137 recv(input_receiver) -> t => match t {
98 drop(input_receiver); 138 Err(RecvError) => {
99 break 139 drop(input_receiver);
100 }, 140 break
101 Ok(Task::AddRoot { root }) => {
102 watch_root(watcher.as_mut(), &output_sender, &*roots, root);
103 }
104 },
105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug
107 // -- escalate!
108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
113 }, 141 },
114 } 142 Ok(Task::AddRoot { root }) => {
143 watch_root(watcher.as_mut(), &output_sender, &*roots, root);
144 }
145 },
146 // Watcher send us changes. If **this** channel is
147 // closed, the watcher has died, which indicates a bug
148 // -- escalate!
149 recv(watcher_receiver) -> event => match event {
150 Err(RecvError) => panic!("watcher is dead"),
151 Ok((path, change)) => {
152 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
153 }
154 },
115 } 155 }
116 } 156 }
117 // Drain pending events: we are not interested in them anyways! 157 }
118 watcher_receiver.into_iter().for_each(|_| ()); 158 // Drain pending events: we are not interested in them anyways!
119 }, 159 watcher_receiver.into_iter().for_each(|_| ());
120 ) 160 });
161 Worker { sender: input_sender, _thread, receiver: output_receiver }
121} 162}
122 163
123fn watch_root( 164fn watch_root(
124 watcher: Option<&mut RecommendedWatcher>, 165 watcher: Option<&mut RecommendedWatcher>,
125 sender: &Sender<TaskResult>, 166 sender: &Sender<VfsTask>,
126 roots: &Roots, 167 roots: &Roots,
127 root: VfsRoot, 168 root: VfsRoot,
128) { 169) {
@@ -136,7 +177,8 @@ fn watch_root(
136 Some((path, text)) 177 Some((path, text))
137 }) 178 })
138 .collect(); 179 .collect();
139 sender.send(TaskResult::BulkLoadRoot { root, files }).unwrap(); 180 let res = TaskResult::BulkLoadRoot { root, files };
181 sender.send(VfsTask(res)).unwrap();
140 log::debug!("... loaded {}", root_path.display()); 182 log::debug!("... loaded {}", root_path.display());
141} 183}
142 184
@@ -173,7 +215,7 @@ fn convert_notify_event(event: DebouncedEvent, sender: &Sender<(PathBuf, ChangeK
173 215
174fn handle_change( 216fn handle_change(
175 watcher: Option<&mut RecommendedWatcher>, 217 watcher: Option<&mut RecommendedWatcher>,
176 sender: &Sender<TaskResult>, 218 sender: &Sender<VfsTask>,
177 roots: &Roots, 219 roots: &Roots,
178 path: PathBuf, 220 path: PathBuf,
179 kind: ChangeKind, 221 kind: ChangeKind,
@@ -195,13 +237,15 @@ fn handle_change(
195 .try_for_each(|rel_path| { 237 .try_for_each(|rel_path| {
196 let abs_path = rel_path.to_path(&roots.path(root)); 238 let abs_path = rel_path.to_path(&roots.path(root));
197 let text = read_to_string(&abs_path); 239 let text = read_to_string(&abs_path);
198 sender.send(TaskResult::SingleFile { root, path: rel_path, text }) 240 let res = TaskResult::SingleFile { root, path: rel_path, text };
241 sender.send(VfsTask(res))
199 }) 242 })
200 .unwrap() 243 .unwrap()
201 } 244 }
202 ChangeKind::Write | ChangeKind::Remove => { 245 ChangeKind::Write | ChangeKind::Remove => {
203 let text = read_to_string(&path); 246 let text = read_to_string(&path);
204 sender.send(TaskResult::SingleFile { root, path: rel_path, text }).unwrap(); 247 let res = TaskResult::SingleFile { root, path: rel_path, text };
248 sender.send(VfsTask(res)).unwrap();
205 } 249 }
206 } 250 }
207} 251}