diff options
Diffstat (limited to 'crates/ra_vfs')
-rw-r--r-- | crates/ra_vfs/src/io.rs | 106 | ||||
-rw-r--r-- | crates/ra_vfs/src/lib.rs | 6 | ||||
-rw-r--r-- | crates/ra_vfs/tests/vfs.rs | 1 |
3 files changed, 50 insertions, 63 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 3952b200b..1b70cd8df 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -1,13 +1,11 @@ | |||
1 | use std::{ | 1 | use std::{ |
2 | fs, | 2 | fs, |
3 | thread, | ||
4 | path::{Path, PathBuf}, | 3 | path::{Path, PathBuf}, |
5 | sync::{mpsc, Arc}, | 4 | sync::{mpsc, Arc}, |
6 | time::Duration, | 5 | time::Duration, |
7 | }; | 6 | }; |
8 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; | 7 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; |
9 | use relative_path::RelativePathBuf; | 8 | use relative_path::RelativePathBuf; |
10 | use thread_worker::WorkerHandle; | ||
11 | use walkdir::WalkDir; | 9 | use walkdir::WalkDir; |
12 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | 10 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; |
13 | 11 | ||
@@ -49,8 +47,7 @@ enum ChangeKind { | |||
49 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | 47 | const WATCHER_DELAY: Duration = Duration::from_millis(250); |
50 | 48 | ||
51 | pub(crate) struct Worker { | 49 | pub(crate) struct Worker { |
52 | worker: thread_worker::Worker<Task, TaskResult>, | 50 | thread_worker: thread_worker::Worker<Task, TaskResult>, |
53 | worker_handle: WorkerHandle, | ||
54 | } | 51 | } |
55 | 52 | ||
56 | impl Worker { | 53 | impl Worker { |
@@ -62,82 +59,79 @@ impl Worker { | |||
62 | // * we want to read all files from a single thread, to guarantee that | 59 | // * we want to read all files from a single thread, to guarantee that |
63 | // we always get fresher versions and never go back in time. | 60 | // we always get fresher versions and never go back in time. |
64 | // * we want to tear down everything neatly during shutdown. | 61 | // * we want to tear down everything neatly during shutdown. |
65 | let (worker, worker_handle) = thread_worker::spawn( | 62 | let thread_worker = thread_worker::Worker::spawn( |
66 | "vfs", | 63 | "vfs", |
67 | 128, | 64 | 128, |
68 | // This are the channels we use to communicate with outside world. | 65 | // This are the channels we use to communicate with outside world. |
69 | // If `input_receiver` is closed we need to tear ourselves down. | 66 | // If `input_receiver` is closed we need to tear ourselves down. |
70 | // `output_sender` should not be closed unless the parent died. | 67 | // `output_sender` should not be closed unless the parent died. |
71 | move |input_receiver, output_sender| { | 68 | move |input_receiver, output_sender| { |
72 | // These are `std` channels notify will send events to | 69 | // Make sure that the destruction order is |
73 | let (notify_sender, notify_receiver) = mpsc::channel(); | 70 | // |
71 | // * notify_sender | ||
72 | // * _thread | ||
73 | // * watcher_sender | ||
74 | // | ||
75 | // this is required to avoid deadlocks. | ||
76 | |||
74 | // These are the corresponding crossbeam channels | 77 | // These are the corresponding crossbeam channels |
75 | let (watcher_sender, watcher_receiver) = unbounded(); | 78 | let (watcher_sender, watcher_receiver) = unbounded(); |
79 | let _thread; | ||
80 | { | ||
81 | // These are `std` channels notify will send events to | ||
82 | let (notify_sender, notify_receiver) = mpsc::channel(); | ||
76 | 83 | ||
77 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | 84 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) |
78 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | 85 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
79 | .ok(); | 86 | .ok(); |
80 | // Start a silly thread to transform between two channels | 87 | // Start a silly thread to transform between two channels |
81 | let thread = thread::spawn(move || { | 88 | _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { |
82 | notify_receiver | 89 | notify_receiver |
83 | .into_iter() | 90 | .into_iter() |
84 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | 91 | .for_each(|event| convert_notify_event(event, &watcher_sender)) |
85 | }); | 92 | }); |
86 | 93 | ||
87 | // Process requests from the called or notifications from | 94 | // Process requests from the called or notifications from |
88 | // watcher until the caller says stop. | 95 | // watcher until the caller says stop. |
89 | loop { | 96 | loop { |
90 | select! { | 97 | select! { |
91 | // Received request from the caller. If this channel is | 98 | // Received request from the caller. If this channel is |
92 | // closed, we should shutdown everything. | 99 | // closed, we should shutdown everything. |
93 | recv(input_receiver) -> t => match t { | 100 | recv(input_receiver) -> t => match t { |
94 | Err(RecvError) => { | 101 | Err(RecvError) => { |
95 | drop(input_receiver); | 102 | drop(input_receiver); |
96 | break | 103 | break |
104 | }, | ||
105 | Ok(Task::AddRoot { root, config }) => { | ||
106 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
107 | } | ||
108 | }, | ||
109 | // Watcher send us changes. If **this** channel is | ||
110 | // closed, the watcher has died, which indicates a bug | ||
111 | // -- escalate! | ||
112 | recv(watcher_receiver) -> event => match event { | ||
113 | Err(RecvError) => panic!("watcher is dead"), | ||
114 | Ok((path, change)) => { | ||
115 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
116 | } | ||
97 | }, | 117 | }, |
98 | Ok(Task::AddRoot { root, config }) => { | 118 | } |
99 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
100 | } | ||
101 | }, | ||
102 | // Watcher send us changes. If **this** channel is | ||
103 | // closed, the watcher has died, which indicates a bug | ||
104 | // -- escalate! | ||
105 | recv(watcher_receiver) -> event => match event { | ||
106 | Err(RecvError) => panic!("watcher is dead"), | ||
107 | Ok((path, change)) => { | ||
108 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
109 | } | ||
110 | }, | ||
111 | } | 119 | } |
112 | } | 120 | } |
113 | // Stopped the watcher | ||
114 | drop(watcher.take()); | ||
115 | // Drain pending events: we are not interested in them anyways! | 121 | // Drain pending events: we are not interested in them anyways! |
116 | watcher_receiver.into_iter().for_each(|_| ()); | 122 | watcher_receiver.into_iter().for_each(|_| ()); |
117 | |||
118 | let res = thread.join(); | ||
119 | match &res { | ||
120 | Ok(()) => log::info!("... Watcher terminated with ok"), | ||
121 | Err(_) => log::error!("... Watcher terminated with err"), | ||
122 | } | ||
123 | res.unwrap(); | ||
124 | }, | 123 | }, |
125 | ); | 124 | ); |
126 | 125 | ||
127 | Worker { worker, worker_handle } | 126 | Worker { thread_worker } |
128 | } | 127 | } |
129 | 128 | ||
130 | pub(crate) fn sender(&self) -> &Sender<Task> { | 129 | pub(crate) fn sender(&self) -> &Sender<Task> { |
131 | &self.worker.inp | 130 | &self.thread_worker.sender() |
132 | } | 131 | } |
133 | 132 | ||
134 | pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { | 133 | pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { |
135 | &self.worker.out | 134 | &self.thread_worker.receiver() |
136 | } | ||
137 | |||
138 | pub(crate) fn shutdown(self) -> thread::Result<()> { | ||
139 | let _ = self.worker.shutdown(); | ||
140 | self.worker_handle.shutdown() | ||
141 | } | 135 | } |
142 | } | 136 | } |
143 | 137 | ||
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index f07657db6..1fb255365 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs | |||
@@ -22,7 +22,6 @@ use std::{ | |||
22 | fmt, fs, mem, | 22 | fmt, fs, mem, |
23 | path::{Path, PathBuf}, | 23 | path::{Path, PathBuf}, |
24 | sync::Arc, | 24 | sync::Arc, |
25 | thread, | ||
26 | }; | 25 | }; |
27 | 26 | ||
28 | use crossbeam_channel::Receiver; | 27 | use crossbeam_channel::Receiver; |
@@ -337,11 +336,6 @@ impl Vfs { | |||
337 | mem::replace(&mut self.pending_changes, Vec::new()) | 336 | mem::replace(&mut self.pending_changes, Vec::new()) |
338 | } | 337 | } |
339 | 338 | ||
340 | /// Shutdown the VFS and terminate the background watching thread. | ||
341 | pub fn shutdown(self) -> thread::Result<()> { | ||
342 | self.worker.shutdown() | ||
343 | } | ||
344 | |||
345 | fn add_file( | 339 | fn add_file( |
346 | &mut self, | 340 | &mut self, |
347 | root: VfsRoot, | 341 | root: VfsRoot, |
diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs index 649ef96c9..c76e6ea26 100644 --- a/crates/ra_vfs/tests/vfs.rs +++ b/crates/ra_vfs/tests/vfs.rs | |||
@@ -158,6 +158,5 @@ fn test_vfs_works() -> std::io::Result<()> { | |||
158 | Err(RecvTimeoutError::Timeout) | 158 | Err(RecvTimeoutError::Timeout) |
159 | ); | 159 | ); |
160 | 160 | ||
161 | vfs.shutdown().unwrap(); | ||
162 | Ok(()) | 161 | Ok(()) |
163 | } | 162 | } |