aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src/io.rs')
-rw-r--r--crates/ra_vfs/src/io.rs159
1 files changed, 97 insertions, 62 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 8eb148a38..b6a057697 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -3,8 +3,9 @@ use std::{
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
5 time::Duration, 5 time::Duration,
6 thread,
6}; 7};
7use crossbeam_channel::{Sender, unbounded, RecvError, select}; 8use crossbeam_channel::{Sender, Receiver, unbounded, RecvError, select};
8use relative_path::RelativePathBuf; 9use relative_path::RelativePathBuf;
9use walkdir::WalkDir; 10use walkdir::WalkDir;
10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 11use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
@@ -46,7 +47,40 @@ enum ChangeKind {
46 47
47const WATCHER_DELAY: Duration = Duration::from_millis(250); 48const WATCHER_DELAY: Duration = Duration::from_millis(250);
48 49
49pub(crate) type Worker = thread_worker::Worker<Task, VfsTask>; 50// Like thread::JoinHandle, but joins the thread on drop.
51//
52// This is useful because it guarantees the absence of run-away threads, even if
53// code panics. This is important, because we might seem panics in the test and
54// we might be used in an IDE context, where a failed component is just
55// restarted.
56//
57// Because all threads are joined, care must be taken to avoid deadlocks. That
58// typically means ensuring that channels are dropped before the threads.
59struct ScopedThread(Option<thread::JoinHandle<()>>);
60
61impl ScopedThread {
62 fn spawn(name: String, f: impl FnOnce() + Send + 'static) -> ScopedThread {
63 let handle = thread::Builder::new().name(name).spawn(f).unwrap();
64 ScopedThread(Some(handle))
65 }
66}
67
68impl Drop for ScopedThread {
69 fn drop(&mut self) {
70 let res = self.0.take().unwrap().join();
71 if !thread::panicking() {
72 res.unwrap();
73 }
74 }
75}
76
77pub(crate) struct Worker {
78 // XXX: it's important to drop `sender` before `_thread` to avoid deadlock.
79 pub(crate) sender: Sender<Task>,
80 _thread: ScopedThread,
81 pub(crate) receiver: Receiver<VfsTask>,
82}
83
50pub(crate) fn start(roots: Arc<Roots>) -> Worker { 84pub(crate) fn start(roots: Arc<Roots>) -> Worker {
51 // This is a pretty elaborate setup of threads & channels! It is 85 // This is a pretty elaborate setup of threads & channels! It is
52 // explained by the following concerns: 86 // explained by the following concerns:
@@ -55,69 +89,70 @@ pub(crate) fn start(roots: Arc<Roots>) -> Worker {
55 // * we want to read all files from a single thread, to guarantee that 89 // * we want to read all files from a single thread, to guarantee that
56 // we always get fresher versions and never go back in time. 90 // we always get fresher versions and never go back in time.
57 // * we want to tear down everything neatly during shutdown. 91 // * we want to tear down everything neatly during shutdown.
58 Worker::spawn( 92 let _thread;
59 "vfs", 93 // This are the channels we use to communicate with outside world.
60 128, 94 // If `input_receiver` is closed we need to tear ourselves down.
61 // This are the channels we use to communicate with outside world. 95 // `output_sender` should not be closed unless the parent died.
62 // If `input_receiver` is closed we need to tear ourselves down. 96 let (input_sender, input_receiver) = unbounded();
63 // `output_sender` should not be closed unless the parent died. 97 let (output_sender, output_receiver) = unbounded();
64 move |input_receiver, output_sender| { 98
65 // Make sure that the destruction order is 99 _thread = ScopedThread::spawn("vfs".to_string(), move || {
66 // 100 // Make sure that the destruction order is
67 // * notify_sender 101 //
68 // * _thread 102 // * notify_sender
69 // * watcher_sender 103 // * _thread
70 // 104 // * watcher_sender
71 // this is required to avoid deadlocks. 105 //
72 106 // this is required to avoid deadlocks.
73 // These are the corresponding crossbeam channels 107
74 let (watcher_sender, watcher_receiver) = unbounded(); 108 // These are the corresponding crossbeam channels
75 let _thread; 109 let (watcher_sender, watcher_receiver) = unbounded();
76 { 110 let _notify_thread;
77 // These are `std` channels notify will send events to 111 {
78 let (notify_sender, notify_receiver) = mpsc::channel(); 112 // These are `std` channels notify will send events to
79 113 let (notify_sender, notify_receiver) = mpsc::channel();
80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 114
81 .map_err(|e| log::error!("failed to spawn notify {}", e)) 115 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
82 .ok(); 116 .map_err(|e| log::error!("failed to spawn notify {}", e))
83 // Start a silly thread to transform between two channels 117 .ok();
84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { 118 // Start a silly thread to transform between two channels
85 notify_receiver 119 _notify_thread = ScopedThread::spawn("notify-convertor".to_string(), move || {
86 .into_iter() 120 notify_receiver
87 .for_each(|event| convert_notify_event(event, &watcher_sender)) 121 .into_iter()
88 }); 122 .for_each(|event| convert_notify_event(event, &watcher_sender))
89 123 });
90 // Process requests from the called or notifications from 124
91 // watcher until the caller says stop. 125 // Process requests from the called or notifications from
92 loop { 126 // watcher until the caller says stop.
93 select! { 127 loop {
94 // Received request from the caller. If this channel is 128 select! {
95 // closed, we should shutdown everything. 129 // Received request from the caller. If this channel is
96 recv(input_receiver) -> t => match t { 130 // closed, we should shutdown everything.
97 Err(RecvError) => { 131 recv(input_receiver) -> t => match t {
98 drop(input_receiver); 132 Err(RecvError) => {
99 break 133 drop(input_receiver);
100 }, 134 break
101 Ok(Task::AddRoot { root }) => {
102 watch_root(watcher.as_mut(), &output_sender, &*roots, root);
103 }
104 },
105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug
107 // -- escalate!
108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
113 }, 135 },
114 } 136 Ok(Task::AddRoot { root }) => {
137 watch_root(watcher.as_mut(), &output_sender, &*roots, root);
138 }
139 },
140 // Watcher send us changes. If **this** channel is
141 // closed, the watcher has died, which indicates a bug
142 // -- escalate!
143 recv(watcher_receiver) -> event => match event {
144 Err(RecvError) => panic!("watcher is dead"),
145 Ok((path, change)) => {
146 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
147 }
148 },
115 } 149 }
116 } 150 }
117 // Drain pending events: we are not interested in them anyways! 151 }
118 watcher_receiver.into_iter().for_each(|_| ()); 152 // Drain pending events: we are not interested in them anyways!
119 }, 153 watcher_receiver.into_iter().for_each(|_| ());
120 ) 154 });
155 Worker { sender: input_sender, _thread, receiver: output_receiver }
121} 156}
122 157
123fn watch_root( 158fn watch_root(