aboutsummaryrefslogtreecommitdiff
path: root/crates/rust-analyzer/src/main_loop.rs
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2020-06-20 20:39:41 +0100
committerGitHub <[email protected]>2020-06-20 20:39:41 +0100
commitb575b0244982da29159d341412945b92dd57f5df (patch)
tree33819de5d4de02d100a9c78110cb51b9b7df14c6 /crates/rust-analyzer/src/main_loop.rs
parentc8557b91a3ccfea5c83260bc35a59e6839784281 (diff)
parent93605c49baccf7d28297028f0b49be8b94701f67 (diff)
Merge #4961
4961: Centralize handing of in-flight requests r=matklad a=matklad Co-authored-by: Aleksey Kladov <[email protected]>
Diffstat (limited to 'crates/rust-analyzer/src/main_loop.rs')
-rw-r--r--crates/rust-analyzer/src/main_loop.rs156
1 files changed, 63 insertions, 93 deletions
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index f0aaaa21e..fd40b2443 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -3,7 +3,7 @@
3 3
4mod handlers; 4mod handlers;
5mod subscriptions; 5mod subscriptions;
6pub(crate) mod pending_requests; 6pub(crate) mod req_queue;
7 7
8use std::{ 8use std::{
9 borrow::Cow, 9 borrow::Cow,
@@ -28,7 +28,6 @@ use ra_ide::{Canceled, FileId, LineIndex};
28use ra_prof::profile; 28use ra_prof::profile;
29use ra_project_model::{PackageRoot, ProjectWorkspace}; 29use ra_project_model::{PackageRoot, ProjectWorkspace};
30use ra_vfs::VfsTask; 30use ra_vfs::VfsTask;
31use rustc_hash::FxHashSet;
32use serde::{de::DeserializeOwned, Serialize}; 31use serde::{de::DeserializeOwned, Serialize};
33use threadpool::ThreadPool; 32use threadpool::ThreadPool;
34 33
@@ -38,12 +37,10 @@ use crate::{
38 from_proto, 37 from_proto,
39 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, 38 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot},
40 lsp_ext, 39 lsp_ext,
41 main_loop::{ 40 main_loop::subscriptions::Subscriptions,
42 pending_requests::{PendingRequest, PendingRequests},
43 subscriptions::Subscriptions,
44 },
45 Result, 41 Result,
46}; 42};
43use req_queue::ReqQueue;
47 44
48#[derive(Debug)] 45#[derive(Debug)]
49pub struct LspError { 46pub struct LspError {
@@ -153,10 +150,10 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
153 register_options: Some(serde_json::to_value(registration_options).unwrap()), 150 register_options: Some(serde_json::to_value(registration_options).unwrap()),
154 }; 151 };
155 let params = lsp_types::RegistrationParams { registrations: vec![registration] }; 152 let params = lsp_types::RegistrationParams { registrations: vec![registration] };
156 let request = request_new::<lsp_types::request::RegisterCapability>( 153 let request = loop_state
157 loop_state.next_request_id(), 154 .req_queue
158 params, 155 .outgoing
159 ); 156 .register::<lsp_types::request::RegisterCapability>(params, |_, _| ());
160 connection.sender.send(request.into()).unwrap(); 157 connection.sender.send(request.into()).unwrap();
161 } 158 }
162 159
@@ -199,7 +196,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
199 global_state.analysis_host.request_cancellation(); 196 global_state.analysis_host.request_cancellation();
200 log::info!("waiting for tasks to finish..."); 197 log::info!("waiting for tasks to finish...");
201 task_receiver.into_iter().for_each(|task| { 198 task_receiver.into_iter().for_each(|task| {
202 on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut global_state) 199 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state)
203 }); 200 });
204 log::info!("...tasks have finished"); 201 log::info!("...tasks have finished");
205 log::info!("joining threadpool..."); 202 log::info!("joining threadpool...");
@@ -264,27 +261,14 @@ impl fmt::Debug for Event {
264 } 261 }
265} 262}
266 263
267#[derive(Debug, Default)] 264#[derive(Default)]
268struct LoopState { 265struct LoopState {
269 next_request_id: u64, 266 req_queue: ReqQueue<fn(&mut GlobalState, lsp_server::Response)>,
270 pending_responses: FxHashSet<RequestId>,
271 pending_requests: PendingRequests,
272 subscriptions: Subscriptions, 267 subscriptions: Subscriptions,
273 workspace_loaded: bool, 268 workspace_loaded: bool,
274 roots_progress_reported: Option<usize>, 269 roots_progress_reported: Option<usize>,
275 roots_scanned: usize, 270 roots_scanned: usize,
276 roots_total: usize, 271 roots_total: usize,
277 configuration_request_id: Option<RequestId>,
278}
279
280impl LoopState {
281 fn next_request_id(&mut self) -> RequestId {
282 self.next_request_id += 1;
283 let res: RequestId = self.next_request_id.into();
284 let inserted = self.pending_responses.insert(res.clone());
285 assert!(inserted);
286 res
287 }
288} 272}
289 273
290fn loop_turn( 274fn loop_turn(
@@ -307,7 +291,7 @@ fn loop_turn(
307 291
308 match event { 292 match event {
309 Event::Task(task) => { 293 Event::Task(task) => {
310 on_task(task, &connection.sender, &mut loop_state.pending_requests, global_state); 294 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state);
311 global_state.maybe_collect_garbage(); 295 global_state.maybe_collect_garbage();
312 } 296 }
313 Event::Vfs(task) => { 297 Event::Vfs(task) => {
@@ -317,7 +301,7 @@ fn loop_turn(
317 Event::Msg(msg) => match msg { 301 Event::Msg(msg) => match msg {
318 Message::Request(req) => on_request( 302 Message::Request(req) => on_request(
319 global_state, 303 global_state,
320 &mut loop_state.pending_requests, 304 &mut loop_state.req_queue.incoming,
321 pool, 305 pool,
322 task_sender, 306 task_sender,
323 &connection.sender, 307 &connection.sender,
@@ -328,32 +312,8 @@ fn loop_turn(
328 on_notification(&connection.sender, global_state, loop_state, not)?; 312 on_notification(&connection.sender, global_state, loop_state, not)?;
329 } 313 }
330 Message::Response(resp) => { 314 Message::Response(resp) => {
331 let removed = loop_state.pending_responses.remove(&resp.id); 315 let handler = loop_state.req_queue.outgoing.complete(resp.id.clone());
332 if !removed { 316 handler(global_state, resp)
333 log::error!("unexpected response: {:?}", resp)
334 }
335
336 if Some(&resp.id) == loop_state.configuration_request_id.as_ref() {
337 loop_state.configuration_request_id = None;
338 log::debug!("config update response: '{:?}", resp);
339 let Response { error, result, .. } = resp;
340
341 match (error, result) {
342 (Some(err), _) => {
343 log::error!("failed to fetch the server settings: {:?}", err)
344 }
345 (None, Some(configs)) => {
346 if let Some(new_config) = configs.get(0) {
347 let mut config = global_state.config.clone();
348 config.update(&new_config);
349 global_state.update_configuration(config);
350 }
351 }
352 (None, None) => {
353 log::error!("received empty server settings response from the client")
354 }
355 }
356 }
357 } 317 }
358 }, 318 },
359 }; 319 };
@@ -407,12 +367,12 @@ fn loop_turn(
407fn on_task( 367fn on_task(
408 task: Task, 368 task: Task,
409 msg_sender: &Sender<Message>, 369 msg_sender: &Sender<Message>,
410 pending_requests: &mut PendingRequests, 370 incoming_requests: &mut req_queue::Incoming,
411 state: &mut GlobalState, 371 state: &mut GlobalState,
412) { 372) {
413 match task { 373 match task {
414 Task::Respond(response) => { 374 Task::Respond(response) => {
415 if let Some(completed) = pending_requests.finish(&response.id) { 375 if let Some(completed) = incoming_requests.complete(response.id.clone()) {
416 log::info!("handled req#{} in {:?}", completed.id, completed.duration); 376 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
417 state.complete_request(completed); 377 state.complete_request(completed);
418 msg_sender.send(response.into()).unwrap(); 378 msg_sender.send(response.into()).unwrap();
@@ -427,7 +387,7 @@ fn on_task(
427 387
428fn on_request( 388fn on_request(
429 global_state: &mut GlobalState, 389 global_state: &mut GlobalState,
430 pending_requests: &mut PendingRequests, 390 incoming_requests: &mut req_queue::Incoming,
431 pool: &ThreadPool, 391 pool: &ThreadPool,
432 task_sender: &Sender<Task>, 392 task_sender: &Sender<Task>,
433 msg_sender: &Sender<Message>, 393 msg_sender: &Sender<Message>,
@@ -440,7 +400,7 @@ fn on_request(
440 global_state, 400 global_state,
441 task_sender, 401 task_sender,
442 msg_sender, 402 msg_sender,
443 pending_requests, 403 incoming_requests,
444 request_received, 404 request_received,
445 }; 405 };
446 pool_dispatcher 406 pool_dispatcher
@@ -504,12 +464,7 @@ fn on_notification(
504 NumberOrString::Number(id) => id.into(), 464 NumberOrString::Number(id) => id.into(),
505 NumberOrString::String(id) => id.into(), 465 NumberOrString::String(id) => id.into(),
506 }; 466 };
507 if loop_state.pending_requests.cancel(&id) { 467 if let Some(response) = loop_state.req_queue.incoming.cancel(id) {
508 let response = Response::new_err(
509 id,
510 ErrorCode::RequestCanceled as i32,
511 "canceled by client".to_string(),
512 );
513 msg_sender.send(response.into()).unwrap() 468 msg_sender.send(response.into()).unwrap()
514 } 469 }
515 return Ok(()); 470 return Ok(());
@@ -572,18 +527,38 @@ fn on_notification(
572 Ok(_) => { 527 Ok(_) => {
573 // As stated in https://github.com/microsoft/language-server-protocol/issues/676, 528 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
574 // this notification's parameters should be ignored and the actual config queried separately. 529 // this notification's parameters should be ignored and the actual config queried separately.
575 let request_id = loop_state.next_request_id(); 530 let request = loop_state
576 let request = request_new::<lsp_types::request::WorkspaceConfiguration>( 531 .req_queue
577 request_id.clone(), 532 .outgoing
578 lsp_types::ConfigurationParams { 533 .register::<lsp_types::request::WorkspaceConfiguration>(
579 items: vec![lsp_types::ConfigurationItem { 534 lsp_types::ConfigurationParams {
580 scope_uri: None, 535 items: vec![lsp_types::ConfigurationItem {
581 section: Some("rust-analyzer".to_string()), 536 scope_uri: None,
582 }], 537 section: Some("rust-analyzer".to_string()),
583 }, 538 }],
584 ); 539 },
540 |global_state, resp| {
541 log::debug!("config update response: '{:?}", resp);
542 let Response { error, result, .. } = resp;
543
544 match (error, result) {
545 (Some(err), _) => {
546 log::error!("failed to fetch the server settings: {:?}", err)
547 }
548 (None, Some(configs)) => {
549 if let Some(new_config) = configs.get(0) {
550 let mut config = global_state.config.clone();
551 config.update(&new_config);
552 global_state.update_configuration(config);
553 }
554 }
555 (None, None) => log::error!(
556 "received empty server settings response from the client"
557 ),
558 }
559 },
560 );
585 msg_sender.send(request.into())?; 561 msg_sender.send(request.into())?;
586 loop_state.configuration_request_id = Some(request_id);
587 562
588 return Ok(()); 563 return Ok(());
589 } 564 }
@@ -752,13 +727,16 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
752 727
753 match (prev, loop_state.workspace_loaded) { 728 match (prev, loop_state.workspace_loaded) {
754 (None, false) => { 729 (None, false) => {
755 let work_done_progress_create = request_new::<lsp_types::request::WorkDoneProgressCreate>( 730 let request = loop_state
756 loop_state.next_request_id(), 731 .req_queue
757 WorkDoneProgressCreateParams { 732 .outgoing
758 token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()), 733 .register::<lsp_types::request::WorkDoneProgressCreate>(
759 }, 734 WorkDoneProgressCreateParams {
760 ); 735 token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()),
761 sender.send(work_done_progress_create.into()).unwrap(); 736 },
737 |_, _| (),
738 );
739 sender.send(request.into()).unwrap();
762 send_startup_progress_notif( 740 send_startup_progress_notif(
763 sender, 741 sender,
764 WorkDoneProgress::Begin(WorkDoneProgressBegin { 742 WorkDoneProgress::Begin(WorkDoneProgressBegin {
@@ -800,7 +778,7 @@ struct PoolDispatcher<'a> {
800 req: Option<Request>, 778 req: Option<Request>,
801 pool: &'a ThreadPool, 779 pool: &'a ThreadPool,
802 global_state: &'a mut GlobalState, 780 global_state: &'a mut GlobalState,
803 pending_requests: &'a mut PendingRequests, 781 incoming_requests: &'a mut req_queue::Incoming,
804 msg_sender: &'a Sender<Message>, 782 msg_sender: &'a Sender<Message>,
805 task_sender: &'a Sender<Task>, 783 task_sender: &'a Sender<Task>,
806 request_received: Instant, 784 request_received: Instant,
@@ -829,7 +807,7 @@ impl<'a> PoolDispatcher<'a> {
829 result_to_task::<R>(id, result) 807 result_to_task::<R>(id, result)
830 }) 808 })
831 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; 809 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
832 on_task(task, self.msg_sender, self.pending_requests, self.global_state); 810 on_task(task, self.msg_sender, self.incoming_requests, self.global_state);
833 Ok(self) 811 Ok(self)
834 } 812 }
835 813
@@ -876,7 +854,7 @@ impl<'a> PoolDispatcher<'a> {
876 return None; 854 return None;
877 } 855 }
878 }; 856 };
879 self.pending_requests.start(PendingRequest { 857 self.incoming_requests.register(req_queue::PendingInRequest {
880 id: id.clone(), 858 id: id.clone(),
881 method: R::METHOD.to_string(), 859 method: R::METHOD.to_string(),
882 received: self.request_received, 860 received: self.request_received,
@@ -993,14 +971,6 @@ where
993 Notification::new(N::METHOD.to_string(), params) 971 Notification::new(N::METHOD.to_string(), params)
994} 972}
995 973
996fn request_new<R>(id: RequestId, params: R::Params) -> Request
997where
998 R: lsp_types::request::Request,
999 R::Params: Serialize,
1000{
1001 Request::new(id, R::METHOD.to_string(), params)
1002}
1003
1004#[cfg(test)] 974#[cfg(test)]
1005mod tests { 975mod tests {
1006 use std::borrow::Cow; 976 use std::borrow::Cow;