Diffstat (limited to 'crates')
-rw-r--r--  crates/ra_vfs/Cargo.toml |   2
-rw-r--r--  crates/ra_vfs/src/io.rs  | 159
-rw-r--r--  crates/ra_vfs/src/lib.rs |   4
3 files changed, 99 insertions, 66 deletions
diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml
index 2fe3102ab..fdaf31b9c 100644
--- a/crates/ra_vfs/Cargo.toml
+++ b/crates/ra_vfs/Cargo.toml
@@ -13,8 +13,6 @@ log = "0.4.6"
 notify = "4.0.9"
 parking_lot = "0.7.0"
 
-thread_worker = { path = "../thread_worker" }
-
 [dev-dependencies]
 tempfile = "3"
 flexi_logger = "0.10.0"
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 8eb148a38..b6a057697 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -3,8 +3,9 @@ use std::{
     path::{Path, PathBuf},
     sync::{mpsc, Arc},
     time::Duration,
+    thread,
 };
-use crossbeam_channel::{Sender, unbounded, RecvError, select};
+use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select};
 use relative_path::RelativePathBuf;
 use walkdir::WalkDir;
 use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
@@ -46,7 +47,40 @@ enum ChangeKind {
 
 const WATCHER_DELAY: Duration = Duration::from_millis(250);
 
-pub(crate) type Worker = thread_worker::Worker<Task, VfsTask>;
+// Like thread::JoinHandle, but joins the thread on drop.
+//
+// This is useful because it guarantees the absence of run-away threads, even if
+// code panics. This is important, because we might see panics in the tests and
+// we might be used in an IDE context, where a failed component is just
+// restarted.
+//
+// Because all threads are joined, care must be taken to avoid deadlocks. That
+// typically means ensuring that channels are dropped before the threads.
+struct ScopedThread(Option<thread::JoinHandle<()>>);
+
+impl ScopedThread {
+    fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
+        let handle = thread::Builder::new().name(name).spawn(f).unwrap();
+        ScopedThread(Some(handle))
+    }
+}
+
+impl Drop for ScopedThread {
+    fn drop(&mut self) {
+        let res = self.0.take().unwrap().join();
+        if !thread::panicking() {
+            res.unwrap();
+        }
+    }
+}
+
+pub(crate) struct Worker {
+    // XXX: it's important to drop `sender` before `_thread` to avoid deadlock.
+    pub(crate) sender: Sender<Task>,
+    _thread: ScopedThread,
+    pub(crate) receiver: Receiver<VfsTask>,
+}
+
 pub(crate) fn start(roots: Arc<Roots>) -> Worker {
     // This is a pretty elaborate setup of threads & channels! It is
     // explained by the following concerns:
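
Side note on the ScopedThread added above: because Drop joins the handle, letting the value fall out of scope blocks until the spawned closure returns, so whatever channel feeds the closure must be closed first. A minimal, self-contained sketch of that behaviour (the ScopedThread definition is copied from the hunk above; the mpsc demo around it is illustrative only):

use std::{sync::mpsc, thread};

// Copied shape of the ScopedThread introduced above: the handle is joined on drop.
struct ScopedThread(Option<thread::JoinHandle<()>>);

impl ScopedThread {
    fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
        ScopedThread(Some(thread::Builder::new().name(name).spawn(f).unwrap()))
    }
}

impl Drop for ScopedThread {
    fn drop(&mut self) {
        let res = self.0.take().unwrap().join();
        if !thread::panicking() {
            res.unwrap();
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    let _worker = ScopedThread::spawn("demo".to_string(), move || {
        for n in rx {
            println!("got {}", n);
        }
    });
    tx.send(1).unwrap();
    // Close the channel before `_worker` is dropped; otherwise the join in
    // Drop would wait forever for the receive loop to finish.
    drop(tx);
} // `_worker` is dropped here, joining (and thus waiting for) the thread.
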
@@ -55,69 +89,70 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
     //   * we want to read all files from a single thread, to guarantee that
     //     we always get fresher versions and never go back in time.
     //   * we want to tear down everything neatly during shutdown.
-    Worker::spawn(
-        "vfs",
-        128,
-        // This are the channels we use to communicate with outside world.
-        // If `input_receiver` is closed we need to tear ourselves down.
-        // `output_sender` should not be closed unless the parent died.
-        move |input_receiver, output_sender| {
-            // Make sure that the destruction order is
-            //
-            //   * notify_sender
-            //   * _thread
-            //   * watcher_sender
-            //
-            // this is required to avoid deadlocks.
-
-            // These are the corresponding crossbeam channels
-            let (watcher_sender, watcher_receiver) = unbounded();
-            let _thread;
-            {
-                // These are `std` channels notify will send events to
-                let (notify_sender, notify_receiver) = mpsc::channel();
-
-                let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
-                    .map_err(|e| log::error!("failed to spawn notify {}", e))
-                    .ok();
-                // Start a silly thread to transform between two channels
-                _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
-                    notify_receiver
-                        .into_iter()
-                        .for_each(|event| convert_notify_event(event, &watcher_sender))
-                });
-
-                // Process requests from the called or notifications from
-                // watcher until the caller says stop.
-                loop {
-                    select! {
-                        // Received request from the caller. If this channel is
-                        // closed, we should shutdown everything.
-                        recv(input_receiver) -> t => match t {
-                            Err(RecvError) => {
-                                drop(input_receiver);
-                                break
-                            },
-                            Ok(Task::AddRoot { root }) => {
-                                watch_root(watcher.as_mut(), &output_sender, &*roots, root);
-                            }
-                        },
-                        // Watcher send us changes. If **this** channel is
-                        // closed, the watcher has died, which indicates a bug
-                        // -- escalate!
-                        recv(watcher_receiver) -> event => match event {
-                            Err(RecvError) => panic!("watcher is dead"),
-                            Ok((path, change)) => {
-                                handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
-                            }
-                        },
-                    }
-                }
-            }
-            // Drain pending events: we are not interested in them anyways!
-            watcher_receiver.into_iter().for_each(|_| ());
-        },
-    )
-}
+    let _thread;
+    // This are the channels we use to communicate with outside world.
+    // If `input_receiver` is closed we need to tear ourselves down.
+    // `output_sender` should not be closed unless the parent died.
+    let (input_sender, input_receiver) = unbounded();
+    let (output_sender, output_receiver) = unbounded();
+
+    _thread = ScopedThread::spawn("vfs".to_string(), move || {
+        // Make sure that the destruction order is
+        //
+        //   * notify_sender
+        //   * _thread
+        //   * watcher_sender
+        //
+        // this is required to avoid deadlocks.
+
+        // These are the corresponding crossbeam channels
+        let (watcher_sender, watcher_receiver) = unbounded();
+        let _notify_thread;
+        {
+            // These are `std` channels notify will send events to
+            let (notify_sender, notify_receiver) = mpsc::channel();
+
+            let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
+                .map_err(|e| log::error!("failed to spawn notify {}", e))
+                .ok();
+            // Start a silly thread to transform between two channels
+            _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || {
+                notify_receiver
+                    .into_iter()
+                    .for_each(|event| convert_notify_event(event, &watcher_sender))
+            });
+
+            // Process requests from the called or notifications from
+            // watcher until the caller says stop.
+            loop {
+                select! {
+                    // Received request from the caller. If this channel is
+                    // closed, we should shutdown everything.
+                    recv(input_receiver) -> t => match t {
+                        Err(RecvError) => {
+                            drop(input_receiver);
+                            break
+                        },
+                        Ok(Task::AddRoot { root }) => {
+                            watch_root(watcher.as_mut(), &output_sender, &*roots, root);
+                        }
+                    },
+                    // Watcher send us changes. If **this** channel is
+                    // closed, the watcher has died, which indicates a bug
+                    // -- escalate!
+                    recv(watcher_receiver) -> event => match event {
+                        Err(RecvError) => panic!("watcher is dead"),
+                        Ok((path, change)) => {
+                            handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
+                        }
+                    },
+                }
+            }
+        }
+        // Drain pending events: we are not interested in them anyways!
+        watcher_receiver.into_iter().for_each(|_| ());
+    });
+    Worker { sender: input_sender, _thread, receiver: output_receiver }
+}
 
 fn watch_root(
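
The heart of the rewritten start function is the select! loop: closing the caller's channel is the shutdown signal, while a closed watcher channel is treated as a bug. A standalone sketch of the same two-channel pattern (toy &'static str messages instead of the real Task/VfsTask types):

use std::thread;
use crossbeam_channel::{select, unbounded, RecvError};

fn main() {
    // Stand-ins for `input_receiver` (caller requests) and `watcher_receiver`
    // (notify events), with the same shutdown rules as the vfs loop above.
    let (input_sender, input_receiver) = unbounded::<&'static str>();
    let (watcher_sender, watcher_receiver) = unbounded::<&'static str>();

    let handle = thread::spawn(move || loop {
        select! {
            // Caller channel closed => orderly shutdown.
            recv(input_receiver) -> msg => match msg {
                Err(RecvError) => break,
                Ok(req) => println!("request: {}", req),
            },
            // Watcher channel closed => someone dropped it too early, escalate.
            recv(watcher_receiver) -> msg => match msg {
                Err(RecvError) => panic!("watcher is dead"),
                Ok(ev) => println!("event: {}", ev),
            },
        }
    });

    input_sender.send("add root").unwrap();
    watcher_sender.send("file changed").unwrap();
    // Dropping the input sender is what ends the loop; the watcher sender
    // stays alive until the worker has exited, mirroring the drop-order
    // comment on the new Worker struct.
    drop(input_sender);
    handle.join().unwrap();
    drop(watcher_sender);
}
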
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index 3cd11c9f6..808c138df 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -92,7 +92,7 @@ impl Vfs {
 
         for root in roots.iter() {
             root2files.insert(root, Default::default());
-            worker.sender().send(io::Task::AddRoot { root }).unwrap();
+            worker.sender.send(io::Task::AddRoot { root }).unwrap();
         }
         let res = Vfs { roots, files: Vec::new(), root2files, worker, pending_changes: Vec::new() };
         let vfs_roots = res.roots.iter().collect();
@@ -170,7 +170,7 @@ impl Vfs {
     }
 
     pub fn task_receiver(&self) -> &Receiver<VfsTask> {
-        self.worker.receiver()
+        &self.worker.receiver
     }
 
     pub fn handle_task(&mut self, task: VfsTask) {
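
On the consumer side nothing changes shape: Vfs::task_receiver() still hands out a &Receiver<VfsTask> and pairs with handle_task. A rough sketch of a pump loop over that public API (the shutdown channel is a hypothetical addition for illustration, not part of ra_vfs):

use crossbeam_channel::{select, Receiver};
use ra_vfs::{Vfs, VfsTask};

// Assumes a `Vfs` was constructed elsewhere; only the two methods from the
// hunk above are used here.
fn pump(vfs: &mut Vfs, shutdown: Receiver<()>) {
    loop {
        // Finish the shared borrow from `task_receiver()` before calling the
        // `&mut self` method `handle_task`.
        let task: VfsTask = select! {
            recv(vfs.task_receiver()) -> task => match task {
                Ok(task) => task,
                Err(_) => break, // vfs worker is gone
            },
            recv(shutdown) -> _msg => break,
        };
        vfs.handle_task(task);
    }
}
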