diff options
author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2020-06-20 20:39:41 +0100 |
---|---|---|
committer | GitHub <[email protected]> | 2020-06-20 20:39:41 +0100 |
commit | b575b0244982da29159d341412945b92dd57f5df (patch) | |
tree | 33819de5d4de02d100a9c78110cb51b9b7df14c6 /crates/rust-analyzer/src/main_loop.rs | |
parent | c8557b91a3ccfea5c83260bc35a59e6839784281 (diff) | |
parent | 93605c49baccf7d28297028f0b49be8b94701f67 (diff) |
Merge #4961
4961: Centralize handing of in-flight requests r=matklad a=matklad
Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/rust-analyzer/src/main_loop.rs')
-rw-r--r-- | crates/rust-analyzer/src/main_loop.rs | 156 |
1 files changed, 63 insertions, 93 deletions
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index f0aaaa21e..fd40b2443 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs | |||
@@ -3,7 +3,7 @@ | |||
3 | 3 | ||
4 | mod handlers; | 4 | mod handlers; |
5 | mod subscriptions; | 5 | mod subscriptions; |
6 | pub(crate) mod pending_requests; | 6 | pub(crate) mod req_queue; |
7 | 7 | ||
8 | use std::{ | 8 | use std::{ |
9 | borrow::Cow, | 9 | borrow::Cow, |
@@ -28,7 +28,6 @@ use ra_ide::{Canceled, FileId, LineIndex}; | |||
28 | use ra_prof::profile; | 28 | use ra_prof::profile; |
29 | use ra_project_model::{PackageRoot, ProjectWorkspace}; | 29 | use ra_project_model::{PackageRoot, ProjectWorkspace}; |
30 | use ra_vfs::VfsTask; | 30 | use ra_vfs::VfsTask; |
31 | use rustc_hash::FxHashSet; | ||
32 | use serde::{de::DeserializeOwned, Serialize}; | 31 | use serde::{de::DeserializeOwned, Serialize}; |
33 | use threadpool::ThreadPool; | 32 | use threadpool::ThreadPool; |
34 | 33 | ||
@@ -38,12 +37,10 @@ use crate::{ | |||
38 | from_proto, | 37 | from_proto, |
39 | global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, | 38 | global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, |
40 | lsp_ext, | 39 | lsp_ext, |
41 | main_loop::{ | 40 | main_loop::subscriptions::Subscriptions, |
42 | pending_requests::{PendingRequest, PendingRequests}, | ||
43 | subscriptions::Subscriptions, | ||
44 | }, | ||
45 | Result, | 41 | Result, |
46 | }; | 42 | }; |
43 | use req_queue::ReqQueue; | ||
47 | 44 | ||
48 | #[derive(Debug)] | 45 | #[derive(Debug)] |
49 | pub struct LspError { | 46 | pub struct LspError { |
@@ -153,10 +150,10 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
153 | register_options: Some(serde_json::to_value(registration_options).unwrap()), | 150 | register_options: Some(serde_json::to_value(registration_options).unwrap()), |
154 | }; | 151 | }; |
155 | let params = lsp_types::RegistrationParams { registrations: vec![registration] }; | 152 | let params = lsp_types::RegistrationParams { registrations: vec![registration] }; |
156 | let request = request_new::<lsp_types::request::RegisterCapability>( | 153 | let request = loop_state |
157 | loop_state.next_request_id(), | 154 | .req_queue |
158 | params, | 155 | .outgoing |
159 | ); | 156 | .register::<lsp_types::request::RegisterCapability>(params, |_, _| ()); |
160 | connection.sender.send(request.into()).unwrap(); | 157 | connection.sender.send(request.into()).unwrap(); |
161 | } | 158 | } |
162 | 159 | ||
@@ -199,7 +196,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
199 | global_state.analysis_host.request_cancellation(); | 196 | global_state.analysis_host.request_cancellation(); |
200 | log::info!("waiting for tasks to finish..."); | 197 | log::info!("waiting for tasks to finish..."); |
201 | task_receiver.into_iter().for_each(|task| { | 198 | task_receiver.into_iter().for_each(|task| { |
202 | on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut global_state) | 199 | on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state) |
203 | }); | 200 | }); |
204 | log::info!("...tasks have finished"); | 201 | log::info!("...tasks have finished"); |
205 | log::info!("joining threadpool..."); | 202 | log::info!("joining threadpool..."); |
@@ -264,27 +261,14 @@ impl fmt::Debug for Event { | |||
264 | } | 261 | } |
265 | } | 262 | } |
266 | 263 | ||
267 | #[derive(Debug, Default)] | 264 | #[derive(Default)] |
268 | struct LoopState { | 265 | struct LoopState { |
269 | next_request_id: u64, | 266 | req_queue: ReqQueue<fn(&mut GlobalState, lsp_server::Response)>, |
270 | pending_responses: FxHashSet<RequestId>, | ||
271 | pending_requests: PendingRequests, | ||
272 | subscriptions: Subscriptions, | 267 | subscriptions: Subscriptions, |
273 | workspace_loaded: bool, | 268 | workspace_loaded: bool, |
274 | roots_progress_reported: Option<usize>, | 269 | roots_progress_reported: Option<usize>, |
275 | roots_scanned: usize, | 270 | roots_scanned: usize, |
276 | roots_total: usize, | 271 | roots_total: usize, |
277 | configuration_request_id: Option<RequestId>, | ||
278 | } | ||
279 | |||
280 | impl LoopState { | ||
281 | fn next_request_id(&mut self) -> RequestId { | ||
282 | self.next_request_id += 1; | ||
283 | let res: RequestId = self.next_request_id.into(); | ||
284 | let inserted = self.pending_responses.insert(res.clone()); | ||
285 | assert!(inserted); | ||
286 | res | ||
287 | } | ||
288 | } | 272 | } |
289 | 273 | ||
290 | fn loop_turn( | 274 | fn loop_turn( |
@@ -307,7 +291,7 @@ fn loop_turn( | |||
307 | 291 | ||
308 | match event { | 292 | match event { |
309 | Event::Task(task) => { | 293 | Event::Task(task) => { |
310 | on_task(task, &connection.sender, &mut loop_state.pending_requests, global_state); | 294 | on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state); |
311 | global_state.maybe_collect_garbage(); | 295 | global_state.maybe_collect_garbage(); |
312 | } | 296 | } |
313 | Event::Vfs(task) => { | 297 | Event::Vfs(task) => { |
@@ -317,7 +301,7 @@ fn loop_turn( | |||
317 | Event::Msg(msg) => match msg { | 301 | Event::Msg(msg) => match msg { |
318 | Message::Request(req) => on_request( | 302 | Message::Request(req) => on_request( |
319 | global_state, | 303 | global_state, |
320 | &mut loop_state.pending_requests, | 304 | &mut loop_state.req_queue.incoming, |
321 | pool, | 305 | pool, |
322 | task_sender, | 306 | task_sender, |
323 | &connection.sender, | 307 | &connection.sender, |
@@ -328,32 +312,8 @@ fn loop_turn( | |||
328 | on_notification(&connection.sender, global_state, loop_state, not)?; | 312 | on_notification(&connection.sender, global_state, loop_state, not)?; |
329 | } | 313 | } |
330 | Message::Response(resp) => { | 314 | Message::Response(resp) => { |
331 | let removed = loop_state.pending_responses.remove(&resp.id); | 315 | let handler = loop_state.req_queue.outgoing.complete(resp.id.clone()); |
332 | if !removed { | 316 | handler(global_state, resp) |
333 | log::error!("unexpected response: {:?}", resp) | ||
334 | } | ||
335 | |||
336 | if Some(&resp.id) == loop_state.configuration_request_id.as_ref() { | ||
337 | loop_state.configuration_request_id = None; | ||
338 | log::debug!("config update response: '{:?}", resp); | ||
339 | let Response { error, result, .. } = resp; | ||
340 | |||
341 | match (error, result) { | ||
342 | (Some(err), _) => { | ||
343 | log::error!("failed to fetch the server settings: {:?}", err) | ||
344 | } | ||
345 | (None, Some(configs)) => { | ||
346 | if let Some(new_config) = configs.get(0) { | ||
347 | let mut config = global_state.config.clone(); | ||
348 | config.update(&new_config); | ||
349 | global_state.update_configuration(config); | ||
350 | } | ||
351 | } | ||
352 | (None, None) => { | ||
353 | log::error!("received empty server settings response from the client") | ||
354 | } | ||
355 | } | ||
356 | } | ||
357 | } | 317 | } |
358 | }, | 318 | }, |
359 | }; | 319 | }; |
@@ -407,12 +367,12 @@ fn loop_turn( | |||
407 | fn on_task( | 367 | fn on_task( |
408 | task: Task, | 368 | task: Task, |
409 | msg_sender: &Sender<Message>, | 369 | msg_sender: &Sender<Message>, |
410 | pending_requests: &mut PendingRequests, | 370 | incoming_requests: &mut req_queue::Incoming, |
411 | state: &mut GlobalState, | 371 | state: &mut GlobalState, |
412 | ) { | 372 | ) { |
413 | match task { | 373 | match task { |
414 | Task::Respond(response) => { | 374 | Task::Respond(response) => { |
415 | if let Some(completed) = pending_requests.finish(&response.id) { | 375 | if let Some(completed) = incoming_requests.complete(response.id.clone()) { |
416 | log::info!("handled req#{} in {:?}", completed.id, completed.duration); | 376 | log::info!("handled req#{} in {:?}", completed.id, completed.duration); |
417 | state.complete_request(completed); | 377 | state.complete_request(completed); |
418 | msg_sender.send(response.into()).unwrap(); | 378 | msg_sender.send(response.into()).unwrap(); |
@@ -427,7 +387,7 @@ fn on_task( | |||
427 | 387 | ||
428 | fn on_request( | 388 | fn on_request( |
429 | global_state: &mut GlobalState, | 389 | global_state: &mut GlobalState, |
430 | pending_requests: &mut PendingRequests, | 390 | incoming_requests: &mut req_queue::Incoming, |
431 | pool: &ThreadPool, | 391 | pool: &ThreadPool, |
432 | task_sender: &Sender<Task>, | 392 | task_sender: &Sender<Task>, |
433 | msg_sender: &Sender<Message>, | 393 | msg_sender: &Sender<Message>, |
@@ -440,7 +400,7 @@ fn on_request( | |||
440 | global_state, | 400 | global_state, |
441 | task_sender, | 401 | task_sender, |
442 | msg_sender, | 402 | msg_sender, |
443 | pending_requests, | 403 | incoming_requests, |
444 | request_received, | 404 | request_received, |
445 | }; | 405 | }; |
446 | pool_dispatcher | 406 | pool_dispatcher |
@@ -504,12 +464,7 @@ fn on_notification( | |||
504 | NumberOrString::Number(id) => id.into(), | 464 | NumberOrString::Number(id) => id.into(), |
505 | NumberOrString::String(id) => id.into(), | 465 | NumberOrString::String(id) => id.into(), |
506 | }; | 466 | }; |
507 | if loop_state.pending_requests.cancel(&id) { | 467 | if let Some(response) = loop_state.req_queue.incoming.cancel(id) { |
508 | let response = Response::new_err( | ||
509 | id, | ||
510 | ErrorCode::RequestCanceled as i32, | ||
511 | "canceled by client".to_string(), | ||
512 | ); | ||
513 | msg_sender.send(response.into()).unwrap() | 468 | msg_sender.send(response.into()).unwrap() |
514 | } | 469 | } |
515 | return Ok(()); | 470 | return Ok(()); |
@@ -572,18 +527,38 @@ fn on_notification( | |||
572 | Ok(_) => { | 527 | Ok(_) => { |
573 | // As stated in https://github.com/microsoft/language-server-protocol/issues/676, | 528 | // As stated in https://github.com/microsoft/language-server-protocol/issues/676, |
574 | // this notification's parameters should be ignored and the actual config queried separately. | 529 | // this notification's parameters should be ignored and the actual config queried separately. |
575 | let request_id = loop_state.next_request_id(); | 530 | let request = loop_state |
576 | let request = request_new::<lsp_types::request::WorkspaceConfiguration>( | 531 | .req_queue |
577 | request_id.clone(), | 532 | .outgoing |
578 | lsp_types::ConfigurationParams { | 533 | .register::<lsp_types::request::WorkspaceConfiguration>( |
579 | items: vec![lsp_types::ConfigurationItem { | 534 | lsp_types::ConfigurationParams { |
580 | scope_uri: None, | 535 | items: vec![lsp_types::ConfigurationItem { |
581 | section: Some("rust-analyzer".to_string()), | 536 | scope_uri: None, |
582 | }], | 537 | section: Some("rust-analyzer".to_string()), |
583 | }, | 538 | }], |
584 | ); | 539 | }, |
540 | |global_state, resp| { | ||
541 | log::debug!("config update response: '{:?}", resp); | ||
542 | let Response { error, result, .. } = resp; | ||
543 | |||
544 | match (error, result) { | ||
545 | (Some(err), _) => { | ||
546 | log::error!("failed to fetch the server settings: {:?}", err) | ||
547 | } | ||
548 | (None, Some(configs)) => { | ||
549 | if let Some(new_config) = configs.get(0) { | ||
550 | let mut config = global_state.config.clone(); | ||
551 | config.update(&new_config); | ||
552 | global_state.update_configuration(config); | ||
553 | } | ||
554 | } | ||
555 | (None, None) => log::error!( | ||
556 | "received empty server settings response from the client" | ||
557 | ), | ||
558 | } | ||
559 | }, | ||
560 | ); | ||
585 | msg_sender.send(request.into())?; | 561 | msg_sender.send(request.into())?; |
586 | loop_state.configuration_request_id = Some(request_id); | ||
587 | 562 | ||
588 | return Ok(()); | 563 | return Ok(()); |
589 | } | 564 | } |
@@ -752,13 +727,16 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) { | |||
752 | 727 | ||
753 | match (prev, loop_state.workspace_loaded) { | 728 | match (prev, loop_state.workspace_loaded) { |
754 | (None, false) => { | 729 | (None, false) => { |
755 | let work_done_progress_create = request_new::<lsp_types::request::WorkDoneProgressCreate>( | 730 | let request = loop_state |
756 | loop_state.next_request_id(), | 731 | .req_queue |
757 | WorkDoneProgressCreateParams { | 732 | .outgoing |
758 | token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()), | 733 | .register::<lsp_types::request::WorkDoneProgressCreate>( |
759 | }, | 734 | WorkDoneProgressCreateParams { |
760 | ); | 735 | token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()), |
761 | sender.send(work_done_progress_create.into()).unwrap(); | 736 | }, |
737 | |_, _| (), | ||
738 | ); | ||
739 | sender.send(request.into()).unwrap(); | ||
762 | send_startup_progress_notif( | 740 | send_startup_progress_notif( |
763 | sender, | 741 | sender, |
764 | WorkDoneProgress::Begin(WorkDoneProgressBegin { | 742 | WorkDoneProgress::Begin(WorkDoneProgressBegin { |
@@ -800,7 +778,7 @@ struct PoolDispatcher<'a> { | |||
800 | req: Option<Request>, | 778 | req: Option<Request>, |
801 | pool: &'a ThreadPool, | 779 | pool: &'a ThreadPool, |
802 | global_state: &'a mut GlobalState, | 780 | global_state: &'a mut GlobalState, |
803 | pending_requests: &'a mut PendingRequests, | 781 | incoming_requests: &'a mut req_queue::Incoming, |
804 | msg_sender: &'a Sender<Message>, | 782 | msg_sender: &'a Sender<Message>, |
805 | task_sender: &'a Sender<Task>, | 783 | task_sender: &'a Sender<Task>, |
806 | request_received: Instant, | 784 | request_received: Instant, |
@@ -829,7 +807,7 @@ impl<'a> PoolDispatcher<'a> { | |||
829 | result_to_task::<R>(id, result) | 807 | result_to_task::<R>(id, result) |
830 | }) | 808 | }) |
831 | .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; | 809 | .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; |
832 | on_task(task, self.msg_sender, self.pending_requests, self.global_state); | 810 | on_task(task, self.msg_sender, self.incoming_requests, self.global_state); |
833 | Ok(self) | 811 | Ok(self) |
834 | } | 812 | } |
835 | 813 | ||
@@ -876,7 +854,7 @@ impl<'a> PoolDispatcher<'a> { | |||
876 | return None; | 854 | return None; |
877 | } | 855 | } |
878 | }; | 856 | }; |
879 | self.pending_requests.start(PendingRequest { | 857 | self.incoming_requests.register(req_queue::PendingInRequest { |
880 | id: id.clone(), | 858 | id: id.clone(), |
881 | method: R::METHOD.to_string(), | 859 | method: R::METHOD.to_string(), |
882 | received: self.request_received, | 860 | received: self.request_received, |
@@ -993,14 +971,6 @@ where | |||
993 | Notification::new(N::METHOD.to_string(), params) | 971 | Notification::new(N::METHOD.to_string(), params) |
994 | } | 972 | } |
995 | 973 | ||
996 | fn request_new<R>(id: RequestId, params: R::Params) -> Request | ||
997 | where | ||
998 | R: lsp_types::request::Request, | ||
999 | R::Params: Serialize, | ||
1000 | { | ||
1001 | Request::new(id, R::METHOD.to_string(), params) | ||
1002 | } | ||
1003 | |||
1004 | #[cfg(test)] | 974 | #[cfg(test)] |
1005 | mod tests { | 975 | mod tests { |
1006 | use std::borrow::Cow; | 976 | use std::borrow::Cow; |