aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2019-02-14 17:43:45 +0000
committerAleksey Kladov <[email protected]>2019-02-14 18:11:07 +0000
commitbf352cd2511775a331d77dee261b64bd8359dacb (patch)
treeba7d988ebef437d5a9d7beac048b5ac0dbd2fe9c /crates/ra_vfs/src
parent10bf61b83b2600ed3cb7e7825f1cd0ee83e9b7e7 (diff)
automatically wait for worker threads
closes #817
Diffstat (limited to 'crates/ra_vfs/src')
-rw-r--r--crates/ra_vfs/src/io.rs106
-rw-r--r--crates/ra_vfs/src/lib.rs6
2 files changed, 50 insertions, 62 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 3952b200b..1b70cd8df 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,13 +1,11 @@
1use std::{ 1use std::{
2 fs, 2 fs,
3 thread,
4 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
5 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
6 time::Duration, 5 time::Duration,
7}; 6};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; 7use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select};
9use relative_path::RelativePathBuf; 8use relative_path::RelativePathBuf;
10use thread_worker::WorkerHandle;
11use walkdir::WalkDir; 9use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
13 11
@@ -49,8 +47,7 @@ enum ChangeKind {
49const WATCHER_DELAY: Duration = Duration::from_millis(250); 47const WATCHER_DELAY: Duration = Duration::from_millis(250);
50 48
51pub(crate) struct Worker { 49pub(crate) struct Worker {
52 worker: thread_worker::Worker<Task, TaskResult>, 50 thread_worker: thread_worker::Worker<Task, TaskResult>,
53 worker_handle: WorkerHandle,
54} 51}
55 52
56impl Worker { 53impl Worker {
@@ -62,82 +59,79 @@ impl Worker {
62 // * we want to read all files from a single thread, to guarantee that 59 // * we want to read all files from a single thread, to guarantee that
63 // we always get fresher versions and never go back in time. 60 // we always get fresher versions and never go back in time.
64 // * we want to tear down everything neatly during shutdown. 61 // * we want to tear down everything neatly during shutdown.
65 let (worker, worker_handle) = thread_worker::spawn( 62 let thread_worker = thread_worker::Worker::spawn(
66 "vfs", 63 "vfs",
67 128, 64 128,
68 // This are the channels we use to communicate with outside world. 65 // This are the channels we use to communicate with outside world.
69 // If `input_receiver` is closed we need to tear ourselves down. 66 // If `input_receiver` is closed we need to tear ourselves down.
70 // `output_sender` should not be closed unless the parent died. 67 // `output_sender` should not be closed unless the parent died.
71 move |input_receiver, output_sender| { 68 move |input_receiver, output_sender| {
72 // These are `std` channels notify will send events to 69 // Make sure that the destruction order is
73 let (notify_sender, notify_receiver) = mpsc::channel(); 70 //
71 // * notify_sender
72 // * _thread
73 // * watcher_sender
74 //
75 // this is required to avoid deadlocks.
76
74 // These are the corresponding crossbeam channels 77 // These are the corresponding crossbeam channels
75 let (watcher_sender, watcher_receiver) = unbounded(); 78 let (watcher_sender, watcher_receiver) = unbounded();
79 let _thread;
80 {
81 // These are `std` channels notify will send events to
82 let (notify_sender, notify_receiver) = mpsc::channel();
76 83
77 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 84 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
78 .map_err(|e| log::error!("failed to spawn notify {}", e)) 85 .map_err(|e| log::error!("failed to spawn notify {}", e))
79 .ok(); 86 .ok();
80 // Start a silly thread to transform between two channels 87 // Start a silly thread to transform between two channels
81 let thread = thread::spawn(move || { 88 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
82 notify_receiver 89 notify_receiver
83 .into_iter() 90 .into_iter()
84 .for_each(|event| convert_notify_event(event, &watcher_sender)) 91 .for_each(|event| convert_notify_event(event, &watcher_sender))
85 }); 92 });
86 93
87 // Process requests from the called or notifications from 94 // Process requests from the called or notifications from
88 // watcher until the caller says stop. 95 // watcher until the caller says stop.
89 loop { 96 loop {
90 select! { 97 select! {
91 // Received request from the caller. If this channel is 98 // Received request from the caller. If this channel is
92 // closed, we should shutdown everything. 99 // closed, we should shutdown everything.
93 recv(input_receiver) -> t => match t { 100 recv(input_receiver) -> t => match t {
94 Err(RecvError) => { 101 Err(RecvError) => {
95 drop(input_receiver); 102 drop(input_receiver);
96 break 103 break
104 },
105 Ok(Task::AddRoot { root, config }) => {
106 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
107 }
108 },
109 // Watcher send us changes. If **this** channel is
110 // closed, the watcher has died, which indicates a bug
111 // -- escalate!
112 recv(watcher_receiver) -> event => match event {
113 Err(RecvError) => panic!("watcher is dead"),
114 Ok((path, change)) => {
115 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
116 }
97 }, 117 },
98 Ok(Task::AddRoot { root, config }) => { 118 }
99 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
100 }
101 },
102 // Watcher send us changes. If **this** channel is
103 // closed, the watcher has died, which indicates a bug
104 // -- escalate!
105 recv(watcher_receiver) -> event => match event {
106 Err(RecvError) => panic!("watcher is dead"),
107 Ok((path, change)) => {
108 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
109 }
110 },
111 } 119 }
112 } 120 }
113 // Stopped the watcher
114 drop(watcher.take());
115 // Drain pending events: we are not interested in them anyways! 121 // Drain pending events: we are not interested in them anyways!
116 watcher_receiver.into_iter().for_each(|_| ()); 122 watcher_receiver.into_iter().for_each(|_| ());
117
118 let res = thread.join();
119 match &res {
120 Ok(()) => log::info!("... Watcher terminated with ok"),
121 Err(_) => log::error!("... Watcher terminated with err"),
122 }
123 res.unwrap();
124 }, 123 },
125 ); 124 );
126 125
127 Worker { worker, worker_handle } 126 Worker { thread_worker }
128 } 127 }
129 128
130 pub(crate) fn sender(&self) -> &Sender<Task> { 129 pub(crate) fn sender(&self) -> &Sender<Task> {
131 &self.worker.inp 130 &self.thread_worker.sender()
132 } 131 }
133 132
134 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { 133 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
135 &self.worker.out 134 &self.thread_worker.receiver()
136 }
137
138 pub(crate) fn shutdown(self) -> thread::Result<()> {
139 let _ = self.worker.shutdown();
140 self.worker_handle.shutdown()
141 } 135 }
142} 136}
143 137
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index f07657db6..1fb255365 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -22,7 +22,6 @@ use std::{
22 fmt, fs, mem, 22 fmt, fs, mem,
23 path::{Path, PathBuf}, 23 path::{Path, PathBuf},
24 sync::Arc, 24 sync::Arc,
25 thread,
26}; 25};
27 26
28use crossbeam_channel::Receiver; 27use crossbeam_channel::Receiver;
@@ -337,11 +336,6 @@ impl Vfs {
337 mem::replace(&mut self.pending_changes, Vec::new()) 336 mem::replace(&mut self.pending_changes, Vec::new())
338 } 337 }
339 338
340 /// Shutdown the VFS and terminate the background watching thread.
341 pub fn shutdown(self) -> thread::Result<()> {
342 self.worker.shutdown()
343 }
344
345 fn add_file( 339 fn add_file(
346 &mut self, 340 &mut self,
347 root: VfsRoot, 341 root: VfsRoot,