diff options
Diffstat (limited to 'crates')
-rw-r--r-- | crates/flycheck/src/lib.rs | 124 | ||||
-rw-r--r-- | crates/rust-analyzer/src/global_state.rs | 11 | ||||
-rw-r--r-- | crates/rust-analyzer/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/rust-analyzer/src/main_loop.rs | 179 | ||||
-rw-r--r-- | crates/rust-analyzer/src/thread_pool.rs | 35 | ||||
-rw-r--r-- | crates/vfs-notify/src/lib.rs | 4 |
6 files changed, 179 insertions, 175 deletions
diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs index 3e73cf6ff..9e8205ae7 100644 --- a/crates/flycheck/src/lib.rs +++ b/crates/flycheck/src/lib.rs | |||
@@ -10,7 +10,7 @@ use std::{ | |||
10 | time::Instant, | 10 | time::Instant, |
11 | }; | 11 | }; |
12 | 12 | ||
13 | use crossbeam_channel::{never, select, unbounded, Receiver, RecvError, Sender}; | 13 | use crossbeam_channel::{never, select, unbounded, Receiver, Sender}; |
14 | 14 | ||
15 | pub use cargo_metadata::diagnostic::{ | 15 | pub use cargo_metadata::diagnostic::{ |
16 | Applicability, Diagnostic, DiagnosticLevel, DiagnosticSpan, DiagnosticSpanMacroExpansion, | 16 | Applicability, Diagnostic, DiagnosticLevel, DiagnosticSpan, DiagnosticSpanMacroExpansion, |
@@ -61,7 +61,7 @@ impl FlycheckHandle { | |||
61 | ) -> FlycheckHandle { | 61 | ) -> FlycheckHandle { |
62 | let (cmd_send, cmd_recv) = unbounded::<Restart>(); | 62 | let (cmd_send, cmd_recv) = unbounded::<Restart>(); |
63 | let handle = jod_thread::spawn(move || { | 63 | let handle = jod_thread::spawn(move || { |
64 | FlycheckActor::new(sender, config, workspace_root).run(&cmd_recv); | 64 | FlycheckActor::new(sender, config, workspace_root).run(cmd_recv); |
65 | }); | 65 | }); |
66 | FlycheckHandle { cmd_send, handle } | 66 | FlycheckHandle { cmd_send, handle } |
67 | } | 67 | } |
@@ -98,14 +98,18 @@ struct FlycheckActor { | |||
98 | config: FlycheckConfig, | 98 | config: FlycheckConfig, |
99 | workspace_root: PathBuf, | 99 | workspace_root: PathBuf, |
100 | last_update_req: Option<Instant>, | 100 | last_update_req: Option<Instant>, |
101 | // XXX: drop order is significant | ||
102 | message_recv: Receiver<CheckEvent>, | ||
103 | /// WatchThread exists to wrap around the communication needed to be able to | 101 | /// WatchThread exists to wrap around the communication needed to be able to |
104 | /// run `cargo check` without blocking. Currently the Rust standard library | 102 | /// run `cargo check` without blocking. Currently the Rust standard library |
105 | /// doesn't provide a way to read sub-process output without blocking, so we | 103 | /// doesn't provide a way to read sub-process output without blocking, so we |
106 | /// have to wrap sub-processes output handling in a thread and pass messages | 104 | /// have to wrap sub-processes output handling in a thread and pass messages |
107 | /// back over a channel. | 105 | /// back over a channel. |
108 | check_process: Option<jod_thread::JoinHandle>, | 106 | // XXX: drop order is significant |
107 | check_process: Option<(Receiver<CheckEvent>, jod_thread::JoinHandle)>, | ||
108 | } | ||
109 | |||
110 | enum Event { | ||
111 | Restart(Restart), | ||
112 | CheckEvent(Option<CheckEvent>), | ||
109 | } | 113 | } |
110 | 114 | ||
111 | impl FlycheckActor { | 115 | impl FlycheckActor { |
@@ -114,40 +118,48 @@ impl FlycheckActor { | |||
114 | config: FlycheckConfig, | 118 | config: FlycheckConfig, |
115 | workspace_root: PathBuf, | 119 | workspace_root: PathBuf, |
116 | ) -> FlycheckActor { | 120 | ) -> FlycheckActor { |
117 | FlycheckActor { | 121 | FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None } |
118 | sender, | ||
119 | config, | ||
120 | workspace_root, | ||
121 | last_update_req: None, | ||
122 | message_recv: never(), | ||
123 | check_process: None, | ||
124 | } | ||
125 | } | 122 | } |
126 | 123 | ||
127 | fn run(&mut self, cmd_recv: &Receiver<Restart>) { | 124 | fn run(&mut self, inbox: Receiver<Restart>) { |
128 | // If we rerun the thread, we need to discard the previous check results first | 125 | // If we rerun the thread, we need to discard the previous check results first |
129 | self.clean_previous_results(); | 126 | self.send(Message::ClearDiagnostics); |
130 | 127 | self.send(Message::Progress(Progress::End)); | |
131 | loop { | 128 | |
132 | select! { | 129 | while let Some(event) = self.next_event(&inbox) { |
133 | recv(&cmd_recv) -> cmd => match cmd { | 130 | match event { |
134 | Ok(Restart) => self.last_update_req = Some(Instant::now()), | 131 | Event::Restart(Restart) => self.last_update_req = Some(Instant::now()), |
135 | Err(RecvError) => { | 132 | Event::CheckEvent(None) => { |
136 | // Command channel has closed, so shut down | 133 | // Watcher finished, replace it with a never channel to |
137 | break; | 134 | // avoid busy-waiting. |
138 | }, | 135 | self.check_process = None; |
139 | }, | ||
140 | recv(self.message_recv) -> msg => match msg { | ||
141 | Ok(msg) => self.handle_message(msg), | ||
142 | Err(RecvError) => { | ||
143 | // Watcher finished, replace it with a never channel to | ||
144 | // avoid busy-waiting. | ||
145 | self.message_recv = never(); | ||
146 | self.check_process = None; | ||
147 | }, | ||
148 | } | 136 | } |
149 | }; | 137 | Event::CheckEvent(Some(event)) => match event { |
138 | CheckEvent::Begin => { | ||
139 | self.send(Message::Progress(Progress::Being)); | ||
140 | } | ||
141 | |||
142 | CheckEvent::End => { | ||
143 | self.send(Message::Progress(Progress::End)); | ||
144 | } | ||
145 | |||
146 | CheckEvent::Msg(cargo_metadata::Message::CompilerArtifact(msg)) => { | ||
147 | self.send(Message::Progress(Progress::DidCheckCrate(msg.target.name))); | ||
148 | } | ||
149 | |||
150 | CheckEvent::Msg(cargo_metadata::Message::CompilerMessage(msg)) => { | ||
151 | self.send(Message::AddDiagnostic { | ||
152 | workspace_root: self.workspace_root.clone(), | ||
153 | diagnostic: msg.message, | ||
154 | }); | ||
155 | } | ||
150 | 156 | ||
157 | CheckEvent::Msg(cargo_metadata::Message::BuildScriptExecuted(_)) | ||
158 | | CheckEvent::Msg(cargo_metadata::Message::BuildFinished(_)) | ||
159 | | CheckEvent::Msg(cargo_metadata::Message::TextLine(_)) | ||
160 | | CheckEvent::Msg(cargo_metadata::Message::Unknown) => {} | ||
161 | }, | ||
162 | } | ||
151 | if self.should_recheck() { | 163 | if self.should_recheck() { |
152 | self.last_update_req = None; | 164 | self.last_update_req = None; |
153 | self.send(Message::ClearDiagnostics); | 165 | self.send(Message::ClearDiagnostics); |
@@ -156,9 +168,12 @@ impl FlycheckActor { | |||
156 | } | 168 | } |
157 | } | 169 | } |
158 | 170 | ||
159 | fn clean_previous_results(&self) { | 171 | fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> { |
160 | self.send(Message::ClearDiagnostics); | 172 | let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan); |
161 | self.send(Message::Progress(Progress::End)); | 173 | select! { |
174 | recv(inbox) -> msg => msg.ok().map(Event::Restart), | ||
175 | recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())), | ||
176 | } | ||
162 | } | 177 | } |
163 | 178 | ||
164 | fn should_recheck(&mut self) -> bool { | 179 | fn should_recheck(&mut self) -> bool { |
@@ -171,37 +186,8 @@ impl FlycheckActor { | |||
171 | false | 186 | false |
172 | } | 187 | } |
173 | 188 | ||
174 | fn handle_message(&self, msg: CheckEvent) { | ||
175 | match msg { | ||
176 | CheckEvent::Begin => { | ||
177 | self.send(Message::Progress(Progress::Being)); | ||
178 | } | ||
179 | |||
180 | CheckEvent::End => { | ||
181 | self.send(Message::Progress(Progress::End)); | ||
182 | } | ||
183 | |||
184 | CheckEvent::Msg(cargo_metadata::Message::CompilerArtifact(msg)) => { | ||
185 | self.send(Message::Progress(Progress::DidCheckCrate(msg.target.name))); | ||
186 | } | ||
187 | |||
188 | CheckEvent::Msg(cargo_metadata::Message::CompilerMessage(msg)) => { | ||
189 | self.send(Message::AddDiagnostic { | ||
190 | workspace_root: self.workspace_root.clone(), | ||
191 | diagnostic: msg.message, | ||
192 | }); | ||
193 | } | ||
194 | |||
195 | CheckEvent::Msg(cargo_metadata::Message::BuildScriptExecuted(_)) | ||
196 | | CheckEvent::Msg(cargo_metadata::Message::BuildFinished(_)) | ||
197 | | CheckEvent::Msg(cargo_metadata::Message::TextLine(_)) | ||
198 | | CheckEvent::Msg(cargo_metadata::Message::Unknown) => {} | ||
199 | } | ||
200 | } | ||
201 | |||
202 | fn restart_check_process(&mut self) { | 189 | fn restart_check_process(&mut self) { |
203 | // First, clear and cancel the old thread | 190 | // First, clear and cancel the old thread |
204 | self.message_recv = never(); | ||
205 | self.check_process = None; | 191 | self.check_process = None; |
206 | 192 | ||
207 | let mut cmd = match &self.config { | 193 | let mut cmd = match &self.config { |
@@ -237,8 +223,7 @@ impl FlycheckActor { | |||
237 | cmd.current_dir(&self.workspace_root); | 223 | cmd.current_dir(&self.workspace_root); |
238 | 224 | ||
239 | let (message_send, message_recv) = unbounded(); | 225 | let (message_send, message_recv) = unbounded(); |
240 | self.message_recv = message_recv; | 226 | let thread = jod_thread::spawn(move || { |
241 | self.check_process = Some(jod_thread::spawn(move || { | ||
242 | // If we trigger an error here, we will do so in the loop instead, | 227 | // If we trigger an error here, we will do so in the loop instead, |
243 | // which will break out of the loop, and continue the shutdown | 228 | // which will break out of the loop, and continue the shutdown |
244 | let _ = message_send.send(CheckEvent::Begin); | 229 | let _ = message_send.send(CheckEvent::Begin); |
@@ -267,7 +252,8 @@ impl FlycheckActor { | |||
267 | // We can ignore any error here, as we are already in the progress | 252 | // We can ignore any error here, as we are already in the progress |
268 | // of shutting down. | 253 | // of shutting down. |
269 | let _ = message_send.send(CheckEvent::End); | 254 | let _ = message_send.send(CheckEvent::End); |
270 | })) | 255 | }); |
256 | self.check_process = Some((message_recv, thread)) | ||
271 | } | 257 | } |
272 | 258 | ||
273 | fn send(&self, check_task: Message) { | 259 | fn send(&self, check_task: Message) { |
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs index 446207e9e..de6b95686 100644 --- a/crates/rust-analyzer/src/global_state.rs +++ b/crates/rust-analyzer/src/global_state.rs | |||
@@ -20,8 +20,9 @@ use crate::{ | |||
20 | diagnostics::{CheckFixes, DiagnosticCollection}, | 20 | diagnostics::{CheckFixes, DiagnosticCollection}, |
21 | from_proto, | 21 | from_proto, |
22 | line_endings::LineEndings, | 22 | line_endings::LineEndings, |
23 | main_loop::ReqQueue, | 23 | main_loop::{ReqQueue, Task}, |
24 | request_metrics::{LatestRequests, RequestMetrics}, | 24 | request_metrics::{LatestRequests, RequestMetrics}, |
25 | thread_pool::TaskPool, | ||
25 | to_proto::url_from_abs_path, | 26 | to_proto::url_from_abs_path, |
26 | Result, | 27 | Result, |
27 | }; | 28 | }; |
@@ -66,6 +67,7 @@ impl Default for Status { | |||
66 | /// incremental salsa database. | 67 | /// incremental salsa database. |
67 | pub(crate) struct GlobalState { | 68 | pub(crate) struct GlobalState { |
68 | pub(crate) config: Config, | 69 | pub(crate) config: Config, |
70 | pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>), | ||
69 | pub(crate) analysis_host: AnalysisHost, | 71 | pub(crate) analysis_host: AnalysisHost, |
70 | pub(crate) loader: Box<dyn vfs::loader::Handle>, | 72 | pub(crate) loader: Box<dyn vfs::loader::Handle>, |
71 | pub(crate) task_receiver: Receiver<vfs::loader::Message>, | 73 | pub(crate) task_receiver: Receiver<vfs::loader::Message>, |
@@ -153,8 +155,15 @@ impl GlobalState { | |||
153 | 155 | ||
154 | let mut analysis_host = AnalysisHost::new(lru_capacity); | 156 | let mut analysis_host = AnalysisHost::new(lru_capacity); |
155 | analysis_host.apply_change(change); | 157 | analysis_host.apply_change(change); |
158 | |||
159 | let task_pool = { | ||
160 | let (sender, receiver) = unbounded(); | ||
161 | (TaskPool::new(sender), receiver) | ||
162 | }; | ||
163 | |||
156 | let mut res = GlobalState { | 164 | let mut res = GlobalState { |
157 | config, | 165 | config, |
166 | task_pool, | ||
158 | analysis_host, | 167 | analysis_host, |
159 | loader, | 168 | loader, |
160 | task_receiver, | 169 | task_receiver, |
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs index 794286672..ca788dd3c 100644 --- a/crates/rust-analyzer/src/lib.rs +++ b/crates/rust-analyzer/src/lib.rs | |||
@@ -30,6 +30,7 @@ mod diagnostics; | |||
30 | mod line_endings; | 30 | mod line_endings; |
31 | mod request_metrics; | 31 | mod request_metrics; |
32 | mod lsp_utils; | 32 | mod lsp_utils; |
33 | mod thread_pool; | ||
33 | pub mod lsp_ext; | 34 | pub mod lsp_ext; |
34 | pub mod config; | 35 | pub mod config; |
35 | 36 | ||
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index 3b3b83209..1a9c5ee2c 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs | |||
@@ -2,19 +2,17 @@ | |||
2 | //! requests/replies and notifications back to the client. | 2 | //! requests/replies and notifications back to the client. |
3 | use std::{ | 3 | use std::{ |
4 | env, fmt, panic, | 4 | env, fmt, panic, |
5 | sync::Arc, | ||
6 | time::{Duration, Instant}, | 5 | time::{Duration, Instant}, |
7 | }; | 6 | }; |
8 | 7 | ||
9 | use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; | 8 | use crossbeam_channel::{never, select, RecvError, Sender}; |
10 | use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; | 9 | use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; |
11 | use lsp_types::{request::Request as _, NumberOrString}; | 10 | use lsp_types::{request::Request as _, NumberOrString}; |
12 | use ra_db::VfsPath; | 11 | use ra_db::VfsPath; |
13 | use ra_ide::{Canceled, FileId}; | 12 | use ra_ide::{Canceled, FileId}; |
14 | use ra_prof::profile; | 13 | use ra_prof::profile; |
15 | use ra_project_model::{PackageRoot, ProjectWorkspace}; | 14 | use ra_project_model::{PackageRoot, ProjectWorkspace}; |
16 | use serde::{de::DeserializeOwned, Serialize}; | 15 | use serde::{de::DeserializeOwned, Serialize}; |
17 | use threadpool::ThreadPool; | ||
18 | 16 | ||
19 | use crate::{ | 17 | use crate::{ |
20 | config::{Config, FilesWatcher, LinkedProject}, | 18 | config::{Config, FilesWatcher, LinkedProject}, |
@@ -118,65 +116,51 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
118 | GlobalState::new(workspaces, config.lru_capacity, config, req_queue) | 116 | GlobalState::new(workspaces, config.lru_capacity, config, req_queue) |
119 | }; | 117 | }; |
120 | 118 | ||
121 | let pool = ThreadPool::default(); | ||
122 | let (task_sender, task_receiver) = unbounded::<Task>(); | ||
123 | |||
124 | log::info!("server initialized, serving requests"); | 119 | log::info!("server initialized, serving requests"); |
125 | { | 120 | { |
126 | let task_sender = task_sender; | ||
127 | loop { | 121 | loop { |
128 | log::trace!("selecting"); | 122 | log::trace!("selecting"); |
129 | let event = select! { | 123 | let event = select! { |
130 | recv(&connection.receiver) -> msg => match msg { | 124 | recv(&connection.receiver) -> msg => match msg { |
131 | Ok(msg) => Event::Msg(msg), | 125 | Ok(msg) => Event::Lsp(msg), |
132 | Err(RecvError) => return Err("client exited without shutdown".into()), | 126 | Err(RecvError) => return Err("client exited without shutdown".into()), |
133 | }, | 127 | }, |
134 | recv(task_receiver) -> task => Event::Task(task.unwrap()), | 128 | recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()), |
135 | recv(global_state.task_receiver) -> task => match task { | 129 | recv(global_state.task_receiver) -> task => match task { |
136 | Ok(task) => Event::Vfs(task), | 130 | Ok(task) => Event::Vfs(task), |
137 | Err(RecvError) => return Err("vfs died".into()), | 131 | Err(RecvError) => return Err("vfs died".into()), |
138 | }, | 132 | }, |
139 | recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task { | 133 | recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task { |
140 | Ok(task) => Event::CheckWatcher(task), | 134 | Ok(task) => Event::Flycheck(task), |
141 | Err(RecvError) => return Err("check watcher died".into()), | 135 | Err(RecvError) => return Err("check watcher died".into()), |
142 | }, | 136 | }, |
143 | }; | 137 | }; |
144 | if let Event::Msg(Message::Request(req)) = &event { | 138 | if let Event::Lsp(lsp_server::Message::Request(req)) = &event { |
145 | if connection.handle_shutdown(&req)? { | 139 | if connection.handle_shutdown(&req)? { |
146 | break; | 140 | break; |
147 | }; | 141 | }; |
148 | } | 142 | } |
149 | assert!(!global_state.vfs.read().0.has_changes()); | 143 | assert!(!global_state.vfs.read().0.has_changes()); |
150 | loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?; | 144 | loop_turn(&connection, &mut global_state, event)?; |
151 | assert!(!global_state.vfs.read().0.has_changes()); | 145 | assert!(!global_state.vfs.read().0.has_changes()); |
152 | } | 146 | } |
153 | } | 147 | } |
154 | global_state.analysis_host.request_cancellation(); | 148 | global_state.analysis_host.request_cancellation(); |
155 | log::info!("waiting for tasks to finish..."); | ||
156 | task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state)); | ||
157 | log::info!("...tasks have finished"); | ||
158 | log::info!("joining threadpool..."); | ||
159 | pool.join(); | ||
160 | drop(pool); | ||
161 | log::info!("...threadpool has finished"); | ||
162 | |||
163 | let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead"); | ||
164 | drop(vfs); | ||
165 | |||
166 | Ok(()) | 149 | Ok(()) |
167 | } | 150 | } |
168 | 151 | ||
169 | #[derive(Debug)] | 152 | #[derive(Debug)] |
170 | enum Task { | 153 | pub(crate) enum Task { |
171 | Respond(Response), | 154 | Respond(Response), |
172 | Diagnostic(DiagnosticTask), | 155 | Diagnostics(Vec<DiagnosticTask>), |
156 | Unit, | ||
173 | } | 157 | } |
174 | 158 | ||
175 | enum Event { | 159 | enum Event { |
176 | Msg(Message), | 160 | Lsp(lsp_server::Message), |
177 | Task(Task), | 161 | Task(Task), |
178 | Vfs(vfs::loader::Message), | 162 | Vfs(vfs::loader::Message), |
179 | CheckWatcher(flycheck::Message), | 163 | Flycheck(flycheck::Message), |
180 | } | 164 | } |
181 | 165 | ||
182 | impl fmt::Debug for Event { | 166 | impl fmt::Debug for Event { |
@@ -186,7 +170,7 @@ impl fmt::Debug for Event { | |||
186 | }; | 170 | }; |
187 | 171 | ||
188 | match self { | 172 | match self { |
189 | Event::Msg(Message::Notification(not)) => { | 173 | Event::Lsp(lsp_server::Message::Notification(not)) => { |
190 | if notification_is::<lsp_types::notification::DidOpenTextDocument>(not) | 174 | if notification_is::<lsp_types::notification::DidOpenTextDocument>(not) |
191 | || notification_is::<lsp_types::notification::DidChangeTextDocument>(not) | 175 | || notification_is::<lsp_types::notification::DidChangeTextDocument>(not) |
192 | { | 176 | { |
@@ -203,10 +187,10 @@ impl fmt::Debug for Event { | |||
203 | _ => (), | 187 | _ => (), |
204 | } | 188 | } |
205 | match self { | 189 | match self { |
206 | Event::Msg(it) => fmt::Debug::fmt(it, f), | 190 | Event::Lsp(it) => fmt::Debug::fmt(it, f), |
207 | Event::Task(it) => fmt::Debug::fmt(it, f), | 191 | Event::Task(it) => fmt::Debug::fmt(it, f), |
208 | Event::Vfs(it) => fmt::Debug::fmt(it, f), | 192 | Event::Vfs(it) => fmt::Debug::fmt(it, f), |
209 | Event::CheckWatcher(it) => fmt::Debug::fmt(it, f), | 193 | Event::Flycheck(it) => fmt::Debug::fmt(it, f), |
210 | } | 194 | } |
211 | } | 195 | } |
212 | } | 196 | } |
@@ -215,19 +199,13 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response); | |||
215 | pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; | 199 | pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; |
216 | const DO_NOTHING: ReqHandler = |_, _| (); | 200 | const DO_NOTHING: ReqHandler = |_, _| (); |
217 | 201 | ||
218 | fn loop_turn( | 202 | fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> { |
219 | pool: &ThreadPool, | ||
220 | task_sender: &Sender<Task>, | ||
221 | connection: &Connection, | ||
222 | global_state: &mut GlobalState, | ||
223 | event: Event, | ||
224 | ) -> Result<()> { | ||
225 | let loop_start = Instant::now(); | 203 | let loop_start = Instant::now(); |
226 | 204 | ||
227 | // NOTE: don't count blocking select! call as a loop-turn time | 205 | // NOTE: don't count blocking select! call as a loop-turn time |
228 | let _p = profile("main_loop_inner/loop-turn"); | 206 | let _p = profile("main_loop_inner/loop-turn"); |
229 | log::info!("loop turn = {:?}", event); | 207 | log::info!("loop turn = {:?}", event); |
230 | let queue_count = pool.queued_count(); | 208 | let queue_count = global_state.task_pool.0.len(); |
231 | if queue_count > 0 { | 209 | if queue_count > 0 { |
232 | log::info!("queued count = {}", queue_count); | 210 | log::info!("queued count = {}", queue_count); |
233 | } | 211 | } |
@@ -269,17 +247,15 @@ fn loop_turn( | |||
269 | ) | 247 | ) |
270 | } | 248 | } |
271 | }, | 249 | }, |
272 | Event::CheckWatcher(task) => { | 250 | Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?, |
273 | on_check_task(task, global_state, task_sender, &connection.sender)? | 251 | Event::Lsp(msg) => match msg { |
274 | } | 252 | lsp_server::Message::Request(req) => { |
275 | Event::Msg(msg) => match msg { | 253 | on_request(global_state, &connection.sender, loop_start, req)? |
276 | Message::Request(req) => { | ||
277 | on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)? | ||
278 | } | 254 | } |
279 | Message::Notification(not) => { | 255 | lsp_server::Message::Notification(not) => { |
280 | on_notification(&connection.sender, global_state, not)?; | 256 | on_notification(&connection.sender, global_state, not)?; |
281 | } | 257 | } |
282 | Message::Response(resp) => { | 258 | lsp_server::Message::Response(resp) => { |
283 | let handler = global_state.req_queue.outgoing.complete(resp.id.clone()); | 259 | let handler = global_state.req_queue.outgoing.complete(resp.id.clone()); |
284 | handler(global_state, resp) | 260 | handler(global_state, resp) |
285 | } | 261 | } |
@@ -301,16 +277,14 @@ fn loop_turn( | |||
301 | .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) | 277 | .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) |
302 | .collect::<Vec<_>>(); | 278 | .collect::<Vec<_>>(); |
303 | 279 | ||
304 | update_file_notifications_on_threadpool( | 280 | update_file_notifications_on_threadpool(global_state, subscriptions.clone()); |
305 | pool, | 281 | global_state.task_pool.0.spawn({ |
306 | global_state.snapshot(), | ||
307 | task_sender.clone(), | ||
308 | subscriptions.clone(), | ||
309 | ); | ||
310 | pool.execute({ | ||
311 | let subs = subscriptions; | 282 | let subs = subscriptions; |
312 | let snap = global_state.snapshot(); | 283 | let snap = global_state.snapshot(); |
313 | move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()) | 284 | move || { |
285 | snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()); | ||
286 | Task::Unit | ||
287 | } | ||
314 | }); | 288 | }); |
315 | } | 289 | } |
316 | 290 | ||
@@ -329,7 +303,7 @@ fn loop_turn( | |||
329 | Ok(()) | 303 | Ok(()) |
330 | } | 304 | } |
331 | 305 | ||
332 | fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) { | 306 | fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: &mut GlobalState) { |
333 | match task { | 307 | match task { |
334 | Task::Respond(response) => { | 308 | Task::Respond(response) => { |
335 | if let Some((method, start)) = | 309 | if let Some((method, start)) = |
@@ -345,26 +319,21 @@ fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalSt | |||
345 | msg_sender.send(response.into()).unwrap(); | 319 | msg_sender.send(response.into()).unwrap(); |
346 | } | 320 | } |
347 | } | 321 | } |
348 | Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state), | 322 | Task::Diagnostics(tasks) => { |
323 | tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state)) | ||
324 | } | ||
325 | Task::Unit => (), | ||
349 | } | 326 | } |
350 | } | 327 | } |
351 | 328 | ||
352 | fn on_request( | 329 | fn on_request( |
353 | global_state: &mut GlobalState, | 330 | global_state: &mut GlobalState, |
354 | pool: &ThreadPool, | 331 | msg_sender: &Sender<lsp_server::Message>, |
355 | task_sender: &Sender<Task>, | ||
356 | msg_sender: &Sender<Message>, | ||
357 | request_received: Instant, | 332 | request_received: Instant, |
358 | req: Request, | 333 | req: Request, |
359 | ) -> Result<()> { | 334 | ) -> Result<()> { |
360 | let mut pool_dispatcher = PoolDispatcher { | 335 | let mut pool_dispatcher = |
361 | req: Some(req), | 336 | PoolDispatcher { req: Some(req), global_state, msg_sender, request_received }; |
362 | pool, | ||
363 | global_state, | ||
364 | task_sender, | ||
365 | msg_sender, | ||
366 | request_received, | ||
367 | }; | ||
368 | pool_dispatcher | 337 | pool_dispatcher |
369 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? | 338 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? |
370 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? | 339 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? |
@@ -415,7 +384,7 @@ fn on_request( | |||
415 | } | 384 | } |
416 | 385 | ||
417 | fn on_notification( | 386 | fn on_notification( |
418 | msg_sender: &Sender<Message>, | 387 | msg_sender: &Sender<lsp_server::Message>, |
419 | global_state: &mut GlobalState, | 388 | global_state: &mut GlobalState, |
420 | not: Notification, | 389 | not: Notification, |
421 | ) -> Result<()> { | 390 | ) -> Result<()> { |
@@ -552,12 +521,11 @@ fn on_notification( | |||
552 | fn on_check_task( | 521 | fn on_check_task( |
553 | task: flycheck::Message, | 522 | task: flycheck::Message, |
554 | global_state: &mut GlobalState, | 523 | global_state: &mut GlobalState, |
555 | task_sender: &Sender<Task>, | 524 | msg_sender: &Sender<lsp_server::Message>, |
556 | msg_sender: &Sender<Message>, | ||
557 | ) -> Result<()> { | 525 | ) -> Result<()> { |
558 | match task { | 526 | match task { |
559 | flycheck::Message::ClearDiagnostics => { | 527 | flycheck::Message::ClearDiagnostics => { |
560 | task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?; | 528 | on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state) |
561 | } | 529 | } |
562 | 530 | ||
563 | flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { | 531 | flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { |
@@ -576,11 +544,15 @@ fn on_check_task( | |||
576 | } | 544 | } |
577 | }; | 545 | }; |
578 | 546 | ||
579 | task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck( | 547 | on_diagnostic_task( |
580 | file_id, | 548 | DiagnosticTask::AddCheck( |
581 | diag.diagnostic, | 549 | file_id, |
582 | diag.fixes.into_iter().map(|it| it.into()).collect(), | 550 | diag.diagnostic, |
583 | )))?; | 551 | diag.fixes.into_iter().map(|it| it.into()).collect(), |
552 | ), | ||
553 | msg_sender, | ||
554 | global_state, | ||
555 | ) | ||
584 | } | 556 | } |
585 | } | 557 | } |
586 | 558 | ||
@@ -598,7 +570,11 @@ fn on_check_task( | |||
598 | Ok(()) | 570 | Ok(()) |
599 | } | 571 | } |
600 | 572 | ||
601 | fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut GlobalState) { | 573 | fn on_diagnostic_task( |
574 | task: DiagnosticTask, | ||
575 | msg_sender: &Sender<lsp_server::Message>, | ||
576 | state: &mut GlobalState, | ||
577 | ) { | ||
602 | let subscriptions = state.diagnostics.handle_task(task); | 578 | let subscriptions = state.diagnostics.handle_task(task); |
603 | 579 | ||
604 | for file_id in subscriptions { | 580 | for file_id in subscriptions { |
@@ -623,7 +599,7 @@ fn percentage(done: usize, total: usize) -> f64 { | |||
623 | 599 | ||
624 | fn report_progress( | 600 | fn report_progress( |
625 | global_state: &mut GlobalState, | 601 | global_state: &mut GlobalState, |
626 | sender: &Sender<Message>, | 602 | sender: &Sender<lsp_server::Message>, |
627 | title: &str, | 603 | title: &str, |
628 | state: Progress, | 604 | state: Progress, |
629 | message: Option<String>, | 605 | message: Option<String>, |
@@ -670,10 +646,8 @@ fn report_progress( | |||
670 | 646 | ||
671 | struct PoolDispatcher<'a> { | 647 | struct PoolDispatcher<'a> { |
672 | req: Option<Request>, | 648 | req: Option<Request>, |
673 | pool: &'a ThreadPool, | ||
674 | global_state: &'a mut GlobalState, | 649 | global_state: &'a mut GlobalState, |
675 | msg_sender: &'a Sender<Message>, | 650 | msg_sender: &'a Sender<lsp_server::Message>, |
676 | task_sender: &'a Sender<Task>, | ||
677 | request_received: Instant, | 651 | request_received: Instant, |
678 | } | 652 | } |
679 | 653 | ||
@@ -721,13 +695,11 @@ impl<'a> PoolDispatcher<'a> { | |||
721 | } | 695 | } |
722 | }; | 696 | }; |
723 | 697 | ||
724 | self.pool.execute({ | 698 | self.global_state.task_pool.0.spawn({ |
725 | let world = self.global_state.snapshot(); | 699 | let world = self.global_state.snapshot(); |
726 | let sender = self.task_sender.clone(); | ||
727 | move || { | 700 | move || { |
728 | let result = f(world, params); | 701 | let result = f(world, params); |
729 | let task = result_to_task::<R>(id, result); | 702 | result_to_task::<R>(id, result) |
730 | sender.send(task).unwrap(); | ||
731 | } | 703 | } |
732 | }); | 704 | }); |
733 | 705 | ||
@@ -797,26 +769,27 @@ where | |||
797 | } | 769 | } |
798 | 770 | ||
799 | fn update_file_notifications_on_threadpool( | 771 | fn update_file_notifications_on_threadpool( |
800 | pool: &ThreadPool, | 772 | global_state: &mut GlobalState, |
801 | world: GlobalStateSnapshot, | ||
802 | task_sender: Sender<Task>, | ||
803 | subscriptions: Vec<FileId>, | 773 | subscriptions: Vec<FileId>, |
804 | ) { | 774 | ) { |
805 | log::trace!("updating notifications for {:?}", subscriptions); | 775 | log::trace!("updating notifications for {:?}", subscriptions); |
806 | if world.config.publish_diagnostics { | 776 | if global_state.config.publish_diagnostics { |
807 | pool.execute(move || { | 777 | let snapshot = global_state.snapshot(); |
808 | for file_id in subscriptions { | 778 | global_state.task_pool.0.spawn(move || { |
809 | match handlers::publish_diagnostics(&world, file_id) { | 779 | let diagnostics = subscriptions |
810 | Err(e) => { | 780 | .into_iter() |
811 | if !is_canceled(&*e) { | 781 | .filter_map(|file_id| { |
812 | log::error!("failed to compute diagnostics: {:?}", e); | 782 | handlers::publish_diagnostics(&snapshot, file_id) |
813 | } | 783 | .map_err(|err| { |
814 | } | 784 | if !is_canceled(&*err) { |
815 | Ok(task) => { | 785 | log::error!("failed to compute diagnostics: {:?}", err); |
816 | task_sender.send(Task::Diagnostic(task)).unwrap(); | 786 | } |
817 | } | 787 | () |
818 | } | 788 | }) |
819 | } | 789 | .ok() |
790 | }) | ||
791 | .collect::<Vec<_>>(); | ||
792 | Task::Diagnostics(diagnostics) | ||
820 | }) | 793 | }) |
821 | } | 794 | } |
822 | } | 795 | } |
diff --git a/crates/rust-analyzer/src/thread_pool.rs b/crates/rust-analyzer/src/thread_pool.rs new file mode 100644 index 000000000..4fa502925 --- /dev/null +++ b/crates/rust-analyzer/src/thread_pool.rs | |||
@@ -0,0 +1,35 @@ | |||
1 | //! A thin wrapper around `ThreadPool` to make sure that we join all things | ||
2 | //! properly. | ||
3 | use crossbeam_channel::Sender; | ||
4 | |||
5 | pub(crate) struct TaskPool<T> { | ||
6 | sender: Sender<T>, | ||
7 | inner: threadpool::ThreadPool, | ||
8 | } | ||
9 | |||
10 | impl<T> TaskPool<T> { | ||
11 | pub(crate) fn new(sender: Sender<T>) -> TaskPool<T> { | ||
12 | TaskPool { sender, inner: threadpool::ThreadPool::default() } | ||
13 | } | ||
14 | |||
15 | pub(crate) fn spawn<F>(&mut self, task: F) | ||
16 | where | ||
17 | F: FnOnce() -> T + Send + 'static, | ||
18 | T: Send + 'static, | ||
19 | { | ||
20 | self.inner.execute({ | ||
21 | let sender = self.sender.clone(); | ||
22 | move || sender.send(task()).unwrap() | ||
23 | }) | ||
24 | } | ||
25 | |||
26 | pub(crate) fn len(&self) -> usize { | ||
27 | self.inner.queued_count() | ||
28 | } | ||
29 | } | ||
30 | |||
31 | impl<T> Drop for TaskPool<T> { | ||
32 | fn drop(&mut self) { | ||
33 | self.inner.join() | ||
34 | } | ||
35 | } | ||
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs index 282cf0358..68fdb8cb0 100644 --- a/crates/vfs-notify/src/lib.rs +++ b/crates/vfs-notify/src/lib.rs | |||
@@ -83,8 +83,8 @@ impl NotifyActor { | |||
83 | } | 83 | } |
84 | } | 84 | } |
85 | 85 | ||
86 | fn run(mut self, receiver: Receiver<Message>) { | 86 | fn run(mut self, inbox: Receiver<Message>) { |
87 | while let Some(event) = self.next_event(&receiver) { | 87 | while let Some(event) = self.next_event(&inbox) { |
88 | log::debug!("vfs-notify event: {:?}", event); | 88 | log::debug!("vfs-notify event: {:?}", event); |
89 | match event { | 89 | match event { |
90 | Event::Message(msg) => match msg { | 90 | Event::Message(msg) => match msg { |