diff options
-rw-r--r-- | crates/ra_vfs/src/io.rs | 155 | ||||
-rw-r--r-- | crates/ra_vfs/src/lib.rs | 2 |
2 files changed, 71 insertions, 86 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 1b70cd8df..f64b4c532 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs | |||
@@ -4,7 +4,7 @@ use std::{ | |||
4 | sync::{mpsc, Arc}, | 4 | sync::{mpsc, Arc}, |
5 | time::Duration, | 5 | time::Duration, |
6 | }; | 6 | }; |
7 | use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; | 7 | use crossbeam_channel::{Sender, unbounded, RecvError, select}; |
8 | use relative_path::RelativePathBuf; | 8 | use relative_path::RelativePathBuf; |
9 | use walkdir::WalkDir; | 9 | use walkdir::WalkDir; |
10 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; | 10 | use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; |
@@ -46,93 +46,78 @@ enum ChangeKind { | |||
46 | 46 | ||
47 | const WATCHER_DELAY: Duration = Duration::from_millis(250); | 47 | const WATCHER_DELAY: Duration = Duration::from_millis(250); |
48 | 48 | ||
49 | pub(crate) struct Worker { | 49 | pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>; |
50 | thread_worker: thread_worker::Worker<Task, TaskResult>, | 50 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { |
51 | } | 51 | // This is a pretty elaborate setup of threads & channels! It is |
52 | 52 | // explained by the following concerns: | |
53 | impl Worker { | 53 | // * we need to burn a thread translating from notify's mpsc to |
54 | pub(crate) fn start(roots: Arc<Roots>) -> Worker { | 54 | // crossbeam_channel. |
55 | // This is a pretty elaborate setup of threads & channels! It is | 55 | // * we want to read all files from a single thread, to guarantee that |
56 | // explained by the following concerns: | 56 | // we always get fresher versions and never go back in time. |
57 | // * we need to burn a thread translating from notify's mpsc to | 57 | // * we want to tear down everything neatly during shutdown. |
58 | // crossbeam_channel. | 58 | Worker::spawn( |
59 | // * we want to read all files from a single thread, to guarantee that | 59 | "vfs", |
60 | // we always get fresher versions and never go back in time. | 60 | 128, |
61 | // * we want to tear down everything neatly during shutdown. | 61 | // This are the channels we use to communicate with outside world. |
62 | let thread_worker = thread_worker::Worker::spawn( | 62 | // If `input_receiver` is closed we need to tear ourselves down. |
63 | "vfs", | 63 | // `output_sender` should not be closed unless the parent died. |
64 | 128, | 64 | move |input_receiver, output_sender| { |
65 | // This are the channels we use to communicate with outside world. | 65 | // Make sure that the destruction order is |
66 | // If `input_receiver` is closed we need to tear ourselves down. | 66 | // |
67 | // `output_sender` should not be closed unless the parent died. | 67 | // * notify_sender |
68 | move |input_receiver, output_sender| { | 68 | // * _thread |
69 | // Make sure that the destruction order is | 69 | // * watcher_sender |
70 | // | 70 | // |
71 | // * notify_sender | 71 | // this is required to avoid deadlocks. |
72 | // * _thread | 72 | |
73 | // * watcher_sender | 73 | // These are the corresponding crossbeam channels |
74 | // | 74 | let (watcher_sender, watcher_receiver) = unbounded(); |
75 | // this is required to avoid deadlocks. | 75 | let _thread; |
76 | 76 | { | |
77 | // These are the corresponding crossbeam channels | 77 | // These are `std` channels notify will send events to |
78 | let (watcher_sender, watcher_receiver) = unbounded(); | 78 | let (notify_sender, notify_receiver) = mpsc::channel(); |
79 | let _thread; | 79 | |
80 | { | 80 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) |
81 | // These are `std` channels notify will send events to | 81 | .map_err(|e| log::error!("failed to spawn notify {}", e)) |
82 | let (notify_sender, notify_receiver) = mpsc::channel(); | 82 | .ok(); |
83 | 83 | // Start a silly thread to transform between two channels | |
84 | let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) | 84 | _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { |
85 | .map_err(|e| log::error!("failed to spawn notify {}", e)) | 85 | notify_receiver |
86 | .ok(); | 86 | .into_iter() |
87 | // Start a silly thread to transform between two channels | 87 | .for_each(|event| convert_notify_event(event, &watcher_sender)) |
88 | _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { | 88 | }); |
89 | notify_receiver | 89 | |
90 | .into_iter() | 90 | // Process requests from the called or notifications from |
91 | .for_each(|event| convert_notify_event(event, &watcher_sender)) | 91 | // watcher until the caller says stop. |
92 | }); | 92 | loop { |
93 | 93 | select! { | |
94 | // Process requests from the called or notifications from | 94 | // Received request from the caller. If this channel is |
95 | // watcher until the caller says stop. | 95 | // closed, we should shutdown everything. |
96 | loop { | 96 | recv(input_receiver) -> t => match t { |
97 | select! { | 97 | Err(RecvError) => { |
98 | // Received request from the caller. If this channel is | 98 | drop(input_receiver); |
99 | // closed, we should shutdown everything. | 99 | break |
100 | recv(input_receiver) -> t => match t { | ||
101 | Err(RecvError) => { | ||
102 | drop(input_receiver); | ||
103 | break | ||
104 | }, | ||
105 | Ok(Task::AddRoot { root, config }) => { | ||
106 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
107 | } | ||
108 | }, | ||
109 | // Watcher send us changes. If **this** channel is | ||
110 | // closed, the watcher has died, which indicates a bug | ||
111 | // -- escalate! | ||
112 | recv(watcher_receiver) -> event => match event { | ||
113 | Err(RecvError) => panic!("watcher is dead"), | ||
114 | Ok((path, change)) => { | ||
115 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
116 | } | ||
117 | }, | 100 | }, |
118 | } | 101 | Ok(Task::AddRoot { root, config }) => { |
102 | watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config)); | ||
103 | } | ||
104 | }, | ||
105 | // Watcher send us changes. If **this** channel is | ||
106 | // closed, the watcher has died, which indicates a bug | ||
107 | // -- escalate! | ||
108 | recv(watcher_receiver) -> event => match event { | ||
109 | Err(RecvError) => panic!("watcher is dead"), | ||
110 | Ok((path, change)) => { | ||
111 | handle_change(watcher.as_mut(), &output_sender, &*roots, path, change); | ||
112 | } | ||
113 | }, | ||
119 | } | 114 | } |
120 | } | 115 | } |
121 | // Drain pending events: we are not interested in them anyways! | 116 | } |
122 | watcher_receiver.into_iter().for_each(|_| ()); | 117 | // Drain pending events: we are not interested in them anyways! |
123 | }, | 118 | watcher_receiver.into_iter().for_each(|_| ()); |
124 | ); | 119 | }, |
125 | 120 | ) | |
126 | Worker { thread_worker } | ||
127 | } | ||
128 | |||
129 | pub(crate) fn sender(&self) -> &Sender<Task> { | ||
130 | &self.thread_worker.sender() | ||
131 | } | ||
132 | |||
133 | pub(crate) fn receiver(&self) -> &Receiver<TaskResult> { | ||
134 | &self.thread_worker.receiver() | ||
135 | } | ||
136 | } | 121 | } |
137 | 122 | ||
138 | fn watch_root( | 123 | fn watch_root( |
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 1fb255365..cfdc1275f 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs | |||
@@ -159,7 +159,7 @@ impl fmt::Debug for Vfs { | |||
159 | impl Vfs { | 159 | impl Vfs { |
160 | pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { | 160 | pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { |
161 | let roots = Arc::new(Roots::new(roots)); | 161 | let roots = Arc::new(Roots::new(roots)); |
162 | let worker = io::Worker::start(Arc::clone(&roots)); | 162 | let worker = io::start(Arc::clone(&roots)); |
163 | let mut root2files = ArenaMap::default(); | 163 | let mut root2files = ArenaMap::default(); |
164 | 164 | ||
165 | for (root, config) in roots.iter() { | 165 | for (root, config) in roots.iter() { |