aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs')
-rw-r--r--crates/ra_vfs/src/io.rs91
-rw-r--r--crates/ra_vfs/src/lib.rs8
-rw-r--r--crates/ra_vfs/tests/vfs.rs1
3 files changed, 36 insertions, 64 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 3952b200b..f64b4c532 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,13 +1,11 @@
1use std::{ 1use std::{
2 fs, 2 fs,
3 thread,
4 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
5 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
6 time::Duration, 5 time::Duration,
7}; 6};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; 7use crossbeam_channel::{Sender, unbounded, RecvError, select};
9use relative_path::RelativePathBuf; 8use relative_path::RelativePathBuf;
10use thread_worker::WorkerHandle;
11use walkdir::WalkDir; 9use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
13 11
@@ -48,37 +46,42 @@ enum ChangeKind {
48 46
49const WATCHER_DELAY: Duration = Duration::from_millis(250); 47const WATCHER_DELAY: Duration = Duration::from_millis(250);
50 48
51pub(crate) struct Worker { 49pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
52 worker: thread_worker::Worker<Task, TaskResult>, 50pub(crate) fn start(roots: Arc<Roots>) -> Worker {
53 worker_handle: WorkerHandle, 51 // This is a pretty elaborate setup of threads & channels! It is
54} 52 // explained by the following concerns:
55 53 // * we need to burn a thread translating from notify's mpsc to
56impl Worker { 54 // crossbeam_channel.
57 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 55 // * we want to read all files from a single thread, to guarantee that
58 // This is a pretty elaborate setup of threads & channels! It is 56 // we always get fresher versions and never go back in time.
59 // explained by the following concerns: 57 // * we want to tear down everything neatly during shutdown.
60 // * we need to burn a thread translating from notify's mpsc to 58 Worker::spawn(
61 // crossbeam_channel. 59 "vfs",
62 // * we want to read all files from a single thread, to guarantee that 60 128,
63 // we always get fresher versions and never go back in time. 61 // This are the channels we use to communicate with outside world.
64 // * we want to tear down everything neatly during shutdown. 62 // If `input_receiver` is closed we need to tear ourselves down.
65 let (worker, worker_handle) = thread_worker::spawn( 63 // `output_sender` should not be closed unless the parent died.
66 "vfs", 64 move |input_receiver, output_sender| {
67 128, 65 // Make sure that the destruction order is
68 // This are the channels we use to communicate with outside world. 66 //
69 // If `input_receiver` is closed we need to tear ourselves down. 67 // * notify_sender
70 // `output_sender` should not be closed unless the parent died. 68 // * _thread
71 move |input_receiver, output_sender| { 69 // * watcher_sender
70 //
71 // this is required to avoid deadlocks.
72
73 // These are the corresponding crossbeam channels
74 let (watcher_sender, watcher_receiver) = unbounded();
75 let _thread;
76 {
72 // These are `std` channels notify will send events to 77 // These are `std` channels notify will send events to
73 let (notify_sender, notify_receiver) = mpsc::channel(); 78 let (notify_sender, notify_receiver) = mpsc::channel();
74 // These are the corresponding crossbeam channels
75 let (watcher_sender, watcher_receiver) = unbounded();
76 79
77 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
78 .map_err(|e| log::error!("failed to spawn notify {}", e)) 81 .map_err(|e| log::error!("failed to spawn notify {}", e))
79 .ok(); 82 .ok();
80 // Start a silly thread to transform between two channels 83 // Start a silly thread to transform between two channels
81 let thread = thread::spawn(move || { 84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
82 notify_receiver 85 notify_receiver
83 .into_iter() 86 .into_iter()
84 .for_each(|event| convert_notify_event(event, &watcher_sender)) 87 .for_each(|event| convert_notify_event(event, &watcher_sender))
@@ -110,35 +113,11 @@ impl Worker {
110 }, 113 },
111 } 114 }
112 } 115 }
113 // Stopped the watcher 116 }
114 drop(watcher.take()); 117 // Drain pending events: we are not interested in them anyways!
115 // Drain pending events: we are not interested in them anyways! 118 watcher_receiver.into_iter().for_each(|_| ());
116 watcher_receiver.into_iter().for_each(|_| ()); 119 },
117 120 )
118 let res = thread.join();
119 match &res {
120 Ok(()) => log::info!("... Watcher terminated with ok"),
121 Err(_) => log::error!("... Watcher terminated with err"),
122 }
123 res.unwrap();
124 },
125 );
126
127 Worker { worker, worker_handle }
128 }
129
130 pub(crate) fn sender(&self) -> &Sender<Task> {
131 &self.worker.inp
132 }
133
134 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
135 &self.worker.out
136 }
137
138 pub(crate) fn shutdown(self) -> thread::Result<()> {
139 let _ = self.worker.shutdown();
140 self.worker_handle.shutdown()
141 }
142} 121}
143 122
144fn watch_root( 123fn watch_root(
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index f07657db6..cfdc1275f 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -22,7 +22,6 @@ use std::{
22 fmt, fs, mem, 22 fmt, fs, mem,
23 path::{Path, PathBuf}, 23 path::{Path, PathBuf},
24 sync::Arc, 24 sync::Arc,
25 thread,
26}; 25};
27 26
28use crossbeam_channel::Receiver; 27use crossbeam_channel::Receiver;
@@ -160,7 +159,7 @@ impl fmt::Debug for Vfs {
160impl Vfs { 159impl Vfs {
161 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { 160 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
162 let roots = Arc::new(Roots::new(roots)); 161 let roots = Arc::new(Roots::new(roots));
163 let worker = io::Worker::start(Arc::clone(&roots)); 162 let worker = io::start(Arc::clone(&roots));
164 let mut root2files = ArenaMap::default(); 163 let mut root2files = ArenaMap::default();
165 164
166 for (root, config) in roots.iter() { 165 for (root, config) in roots.iter() {
@@ -337,11 +336,6 @@ impl Vfs {
337 mem::replace(&mut self.pending_changes, Vec::new()) 336 mem::replace(&mut self.pending_changes, Vec::new())
338 } 337 }
339 338
340 /// Shutdown the VFS and terminate the background watching thread.
341 pub fn shutdown(self) -> thread::Result<()> {
342 self.worker.shutdown()
343 }
344
345 fn add_file( 339 fn add_file(
346 &mut self, 340 &mut self,
347 root: VfsRoot, 341 root: VfsRoot,
diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs
index 0ed59bb19..b31e0f288 100644
--- a/crates/ra_vfs/tests/vfs.rs
+++ b/crates/ra_vfs/tests/vfs.rs
@@ -160,6 +160,5 @@ fn test_vfs_works() -> std::io::Result<()> {
160 Err(RecvTimeoutError::Timeout) 160 Err(RecvTimeoutError::Timeout)
161 ); 161 );
162 162
163 vfs.shutdown().unwrap();
164 Ok(()) 163 Ok(())
165} 164}