aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2020-06-20 12:06:11 +0100
committerAleksey Kladov <[email protected]>2020-06-20 12:06:11 +0100
commit93605c49baccf7d28297028f0b49be8b94701f67 (patch)
treea75822c9456351234ef3819dbebb52093c0c4f35 /crates
parent0f7961d5570f17d6c2098ab11d2a3bcbbfb84ff6 (diff)
Centralize handing of in-flight requests
Diffstat (limited to 'crates')
-rw-r--r--crates/rust-analyzer/src/global_state.rs4
-rw-r--r--crates/rust-analyzer/src/main_loop.rs156
-rw-r--r--crates/rust-analyzer/src/main_loop/pending_requests.rs75
-rw-r--r--crates/rust-analyzer/src/main_loop/req_queue.rs123
4 files changed, 188 insertions, 170 deletions
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index 1527c9947..d1897bf50 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -20,7 +20,7 @@ use stdx::format_to;
20use crate::{ 20use crate::{
21 config::{Config, FilesWatcher}, 21 config::{Config, FilesWatcher},
22 diagnostics::{CheckFixes, DiagnosticCollection}, 22 diagnostics::{CheckFixes, DiagnosticCollection},
23 main_loop::pending_requests::{CompletedRequest, LatestRequests}, 23 main_loop::req_queue::{CompletedInRequest, LatestRequests},
24 to_proto::url_from_abs_path, 24 to_proto::url_from_abs_path,
25 vfs_glob::{Glob, RustPackageFilterBuilder}, 25 vfs_glob::{Glob, RustPackageFilterBuilder},
26 LspError, Result, 26 LspError, Result,
@@ -236,7 +236,7 @@ impl GlobalState {
236 self.analysis_host.collect_garbage() 236 self.analysis_host.collect_garbage()
237 } 237 }
238 238
239 pub fn complete_request(&mut self, request: CompletedRequest) { 239 pub(crate) fn complete_request(&mut self, request: CompletedInRequest) {
240 self.latest_requests.write().record(request) 240 self.latest_requests.write().record(request)
241 } 241 }
242} 242}
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index f0aaaa21e..fd40b2443 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -3,7 +3,7 @@
3 3
4mod handlers; 4mod handlers;
5mod subscriptions; 5mod subscriptions;
6pub(crate) mod pending_requests; 6pub(crate) mod req_queue;
7 7
8use std::{ 8use std::{
9 borrow::Cow, 9 borrow::Cow,
@@ -28,7 +28,6 @@ use ra_ide::{Canceled, FileId, LineIndex};
28use ra_prof::profile; 28use ra_prof::profile;
29use ra_project_model::{PackageRoot, ProjectWorkspace}; 29use ra_project_model::{PackageRoot, ProjectWorkspace};
30use ra_vfs::VfsTask; 30use ra_vfs::VfsTask;
31use rustc_hash::FxHashSet;
32use serde::{de::DeserializeOwned, Serialize}; 31use serde::{de::DeserializeOwned, Serialize};
33use threadpool::ThreadPool; 32use threadpool::ThreadPool;
34 33
@@ -38,12 +37,10 @@ use crate::{
38 from_proto, 37 from_proto,
39 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, 38 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot},
40 lsp_ext, 39 lsp_ext,
41 main_loop::{ 40 main_loop::subscriptions::Subscriptions,
42 pending_requests::{PendingRequest, PendingRequests},
43 subscriptions::Subscriptions,
44 },
45 Result, 41 Result,
46}; 42};
43use req_queue::ReqQueue;
47 44
48#[derive(Debug)] 45#[derive(Debug)]
49pub struct LspError { 46pub struct LspError {
@@ -153,10 +150,10 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
153 register_options: Some(serde_json::to_value(registration_options).unwrap()), 150 register_options: Some(serde_json::to_value(registration_options).unwrap()),
154 }; 151 };
155 let params = lsp_types::RegistrationParams { registrations: vec![registration] }; 152 let params = lsp_types::RegistrationParams { registrations: vec![registration] };
156 let request = request_new::<lsp_types::request::RegisterCapability>( 153 let request = loop_state
157 loop_state.next_request_id(), 154 .req_queue
158 params, 155 .outgoing
159 ); 156 .register::<lsp_types::request::RegisterCapability>(params, |_, _| ());
160 connection.sender.send(request.into()).unwrap(); 157 connection.sender.send(request.into()).unwrap();
161 } 158 }
162 159
@@ -199,7 +196,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
199 global_state.analysis_host.request_cancellation(); 196 global_state.analysis_host.request_cancellation();
200 log::info!("waiting for tasks to finish..."); 197 log::info!("waiting for tasks to finish...");
201 task_receiver.into_iter().for_each(|task| { 198 task_receiver.into_iter().for_each(|task| {
202 on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut global_state) 199 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state)
203 }); 200 });
204 log::info!("...tasks have finished"); 201 log::info!("...tasks have finished");
205 log::info!("joining threadpool..."); 202 log::info!("joining threadpool...");
@@ -264,27 +261,14 @@ impl fmt::Debug for Event {
264 } 261 }
265} 262}
266 263
267#[derive(Debug, Default)] 264#[derive(Default)]
268struct LoopState { 265struct LoopState {
269 next_request_id: u64, 266 req_queue: ReqQueue<fn(&mut GlobalState, lsp_server::Response)>,
270 pending_responses: FxHashSet<RequestId>,
271 pending_requests: PendingRequests,
272 subscriptions: Subscriptions, 267 subscriptions: Subscriptions,
273 workspace_loaded: bool, 268 workspace_loaded: bool,
274 roots_progress_reported: Option<usize>, 269 roots_progress_reported: Option<usize>,
275 roots_scanned: usize, 270 roots_scanned: usize,
276 roots_total: usize, 271 roots_total: usize,
277 configuration_request_id: Option<RequestId>,
278}
279
280impl LoopState {
281 fn next_request_id(&mut self) -> RequestId {
282 self.next_request_id += 1;
283 let res: RequestId = self.next_request_id.into();
284 let inserted = self.pending_responses.insert(res.clone());
285 assert!(inserted);
286 res
287 }
288} 272}
289 273
290fn loop_turn( 274fn loop_turn(
@@ -307,7 +291,7 @@ fn loop_turn(
307 291
308 match event { 292 match event {
309 Event::Task(task) => { 293 Event::Task(task) => {
310 on_task(task, &connection.sender, &mut loop_state.pending_requests, global_state); 294 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state);
311 global_state.maybe_collect_garbage(); 295 global_state.maybe_collect_garbage();
312 } 296 }
313 Event::Vfs(task) => { 297 Event::Vfs(task) => {
@@ -317,7 +301,7 @@ fn loop_turn(
317 Event::Msg(msg) => match msg { 301 Event::Msg(msg) => match msg {
318 Message::Request(req) => on_request( 302 Message::Request(req) => on_request(
319 global_state, 303 global_state,
320 &mut loop_state.pending_requests, 304 &mut loop_state.req_queue.incoming,
321 pool, 305 pool,
322 task_sender, 306 task_sender,
323 &connection.sender, 307 &connection.sender,
@@ -328,32 +312,8 @@ fn loop_turn(
328 on_notification(&connection.sender, global_state, loop_state, not)?; 312 on_notification(&connection.sender, global_state, loop_state, not)?;
329 } 313 }
330 Message::Response(resp) => { 314 Message::Response(resp) => {
331 let removed = loop_state.pending_responses.remove(&resp.id); 315 let handler = loop_state.req_queue.outgoing.complete(resp.id.clone());
332 if !removed { 316 handler(global_state, resp)
333 log::error!("unexpected response: {:?}", resp)
334 }
335
336 if Some(&resp.id) == loop_state.configuration_request_id.as_ref() {
337 loop_state.configuration_request_id = None;
338 log::debug!("config update response: '{:?}", resp);
339 let Response { error, result, .. } = resp;
340
341 match (error, result) {
342 (Some(err), _) => {
343 log::error!("failed to fetch the server settings: {:?}", err)
344 }
345 (None, Some(configs)) => {
346 if let Some(new_config) = configs.get(0) {
347 let mut config = global_state.config.clone();
348 config.update(&new_config);
349 global_state.update_configuration(config);
350 }
351 }
352 (None, None) => {
353 log::error!("received empty server settings response from the client")
354 }
355 }
356 }
357 } 317 }
358 }, 318 },
359 }; 319 };
@@ -407,12 +367,12 @@ fn loop_turn(
407fn on_task( 367fn on_task(
408 task: Task, 368 task: Task,
409 msg_sender: &Sender<Message>, 369 msg_sender: &Sender<Message>,
410 pending_requests: &mut PendingRequests, 370 incoming_requests: &mut req_queue::Incoming,
411 state: &mut GlobalState, 371 state: &mut GlobalState,
412) { 372) {
413 match task { 373 match task {
414 Task::Respond(response) => { 374 Task::Respond(response) => {
415 if let Some(completed) = pending_requests.finish(&response.id) { 375 if let Some(completed) = incoming_requests.complete(response.id.clone()) {
416 log::info!("handled req#{} in {:?}", completed.id, completed.duration); 376 log::info!("handled req#{} in {:?}", completed.id, completed.duration);
417 state.complete_request(completed); 377 state.complete_request(completed);
418 msg_sender.send(response.into()).unwrap(); 378 msg_sender.send(response.into()).unwrap();
@@ -427,7 +387,7 @@ fn on_task(
427 387
428fn on_request( 388fn on_request(
429 global_state: &mut GlobalState, 389 global_state: &mut GlobalState,
430 pending_requests: &mut PendingRequests, 390 incoming_requests: &mut req_queue::Incoming,
431 pool: &ThreadPool, 391 pool: &ThreadPool,
432 task_sender: &Sender<Task>, 392 task_sender: &Sender<Task>,
433 msg_sender: &Sender<Message>, 393 msg_sender: &Sender<Message>,
@@ -440,7 +400,7 @@ fn on_request(
440 global_state, 400 global_state,
441 task_sender, 401 task_sender,
442 msg_sender, 402 msg_sender,
443 pending_requests, 403 incoming_requests,
444 request_received, 404 request_received,
445 }; 405 };
446 pool_dispatcher 406 pool_dispatcher
@@ -504,12 +464,7 @@ fn on_notification(
504 NumberOrString::Number(id) => id.into(), 464 NumberOrString::Number(id) => id.into(),
505 NumberOrString::String(id) => id.into(), 465 NumberOrString::String(id) => id.into(),
506 }; 466 };
507 if loop_state.pending_requests.cancel(&id) { 467 if let Some(response) = loop_state.req_queue.incoming.cancel(id) {
508 let response = Response::new_err(
509 id,
510 ErrorCode::RequestCanceled as i32,
511 "canceled by client".to_string(),
512 );
513 msg_sender.send(response.into()).unwrap() 468 msg_sender.send(response.into()).unwrap()
514 } 469 }
515 return Ok(()); 470 return Ok(());
@@ -572,18 +527,38 @@ fn on_notification(
572 Ok(_) => { 527 Ok(_) => {
573 // As stated in https://github.com/microsoft/language-server-protocol/issues/676, 528 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
574 // this notification's parameters should be ignored and the actual config queried separately. 529 // this notification's parameters should be ignored and the actual config queried separately.
575 let request_id = loop_state.next_request_id(); 530 let request = loop_state
576 let request = request_new::<lsp_types::request::WorkspaceConfiguration>( 531 .req_queue
577 request_id.clone(), 532 .outgoing
578 lsp_types::ConfigurationParams { 533 .register::<lsp_types::request::WorkspaceConfiguration>(
579 items: vec![lsp_types::ConfigurationItem { 534 lsp_types::ConfigurationParams {
580 scope_uri: None, 535 items: vec![lsp_types::ConfigurationItem {
581 section: Some("rust-analyzer".to_string()), 536 scope_uri: None,
582 }], 537 section: Some("rust-analyzer".to_string()),
583 }, 538 }],
584 ); 539 },
540 |global_state, resp| {
541 log::debug!("config update response: '{:?}", resp);
542 let Response { error, result, .. } = resp;
543
544 match (error, result) {
545 (Some(err), _) => {
546 log::error!("failed to fetch the server settings: {:?}", err)
547 }
548 (None, Some(configs)) => {
549 if let Some(new_config) = configs.get(0) {
550 let mut config = global_state.config.clone();
551 config.update(&new_config);
552 global_state.update_configuration(config);
553 }
554 }
555 (None, None) => log::error!(
556 "received empty server settings response from the client"
557 ),
558 }
559 },
560 );
585 msg_sender.send(request.into())?; 561 msg_sender.send(request.into())?;
586 loop_state.configuration_request_id = Some(request_id);
587 562
588 return Ok(()); 563 return Ok(());
589 } 564 }
@@ -752,13 +727,16 @@ fn send_startup_progress(sender: &Sender<Message>, loop_state: &mut LoopState) {
752 727
753 match (prev, loop_state.workspace_loaded) { 728 match (prev, loop_state.workspace_loaded) {
754 (None, false) => { 729 (None, false) => {
755 let work_done_progress_create = request_new::<lsp_types::request::WorkDoneProgressCreate>( 730 let request = loop_state
756 loop_state.next_request_id(), 731 .req_queue
757 WorkDoneProgressCreateParams { 732 .outgoing
758 token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()), 733 .register::<lsp_types::request::WorkDoneProgressCreate>(
759 }, 734 WorkDoneProgressCreateParams {
760 ); 735 token: lsp_types::ProgressToken::String("rustAnalyzer/startup".into()),
761 sender.send(work_done_progress_create.into()).unwrap(); 736 },
737 |_, _| (),
738 );
739 sender.send(request.into()).unwrap();
762 send_startup_progress_notif( 740 send_startup_progress_notif(
763 sender, 741 sender,
764 WorkDoneProgress::Begin(WorkDoneProgressBegin { 742 WorkDoneProgress::Begin(WorkDoneProgressBegin {
@@ -800,7 +778,7 @@ struct PoolDispatcher<'a> {
800 req: Option<Request>, 778 req: Option<Request>,
801 pool: &'a ThreadPool, 779 pool: &'a ThreadPool,
802 global_state: &'a mut GlobalState, 780 global_state: &'a mut GlobalState,
803 pending_requests: &'a mut PendingRequests, 781 incoming_requests: &'a mut req_queue::Incoming,
804 msg_sender: &'a Sender<Message>, 782 msg_sender: &'a Sender<Message>,
805 task_sender: &'a Sender<Task>, 783 task_sender: &'a Sender<Task>,
806 request_received: Instant, 784 request_received: Instant,
@@ -829,7 +807,7 @@ impl<'a> PoolDispatcher<'a> {
829 result_to_task::<R>(id, result) 807 result_to_task::<R>(id, result)
830 }) 808 })
831 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; 809 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
832 on_task(task, self.msg_sender, self.pending_requests, self.global_state); 810 on_task(task, self.msg_sender, self.incoming_requests, self.global_state);
833 Ok(self) 811 Ok(self)
834 } 812 }
835 813
@@ -876,7 +854,7 @@ impl<'a> PoolDispatcher<'a> {
876 return None; 854 return None;
877 } 855 }
878 }; 856 };
879 self.pending_requests.start(PendingRequest { 857 self.incoming_requests.register(req_queue::PendingInRequest {
880 id: id.clone(), 858 id: id.clone(),
881 method: R::METHOD.to_string(), 859 method: R::METHOD.to_string(),
882 received: self.request_received, 860 received: self.request_received,
@@ -993,14 +971,6 @@ where
993 Notification::new(N::METHOD.to_string(), params) 971 Notification::new(N::METHOD.to_string(), params)
994} 972}
995 973
996fn request_new<R>(id: RequestId, params: R::Params) -> Request
997where
998 R: lsp_types::request::Request,
999 R::Params: Serialize,
1000{
1001 Request::new(id, R::METHOD.to_string(), params)
1002}
1003
1004#[cfg(test)] 974#[cfg(test)]
1005mod tests { 975mod tests {
1006 use std::borrow::Cow; 976 use std::borrow::Cow;
diff --git a/crates/rust-analyzer/src/main_loop/pending_requests.rs b/crates/rust-analyzer/src/main_loop/pending_requests.rs
deleted file mode 100644
index 73b33e419..000000000
--- a/crates/rust-analyzer/src/main_loop/pending_requests.rs
+++ /dev/null
@@ -1,75 +0,0 @@
1//! Data structures that keep track of inflight requests.
2
3use std::time::{Duration, Instant};
4
5use lsp_server::RequestId;
6use rustc_hash::FxHashMap;
7
8#[derive(Debug)]
9pub struct CompletedRequest {
10 pub id: RequestId,
11 pub method: String,
12 pub duration: Duration,
13}
14
15#[derive(Debug)]
16pub(crate) struct PendingRequest {
17 pub(crate) id: RequestId,
18 pub(crate) method: String,
19 pub(crate) received: Instant,
20}
21
22impl From<PendingRequest> for CompletedRequest {
23 fn from(pending: PendingRequest) -> CompletedRequest {
24 CompletedRequest {
25 id: pending.id,
26 method: pending.method,
27 duration: pending.received.elapsed(),
28 }
29 }
30}
31
32#[derive(Debug, Default)]
33pub(crate) struct PendingRequests {
34 map: FxHashMap<RequestId, PendingRequest>,
35}
36
37impl PendingRequests {
38 pub(crate) fn start(&mut self, request: PendingRequest) {
39 let id = request.id.clone();
40 let prev = self.map.insert(id.clone(), request);
41 assert!(prev.is_none(), "duplicate request with id {}", id);
42 }
43 pub(crate) fn cancel(&mut self, id: &RequestId) -> bool {
44 self.map.remove(id).is_some()
45 }
46 pub(crate) fn finish(&mut self, id: &RequestId) -> Option<CompletedRequest> {
47 self.map.remove(id).map(CompletedRequest::from)
48 }
49}
50
51const N_COMPLETED_REQUESTS: usize = 10;
52
53#[derive(Debug, Default)]
54pub struct LatestRequests {
55 // hand-rolling VecDeque here to print things in a nicer way
56 buf: [Option<CompletedRequest>; N_COMPLETED_REQUESTS],
57 idx: usize,
58}
59
60impl LatestRequests {
61 pub(crate) fn record(&mut self, request: CompletedRequest) {
62 // special case: don't track status request itself
63 if request.method == "rust-analyzer/analyzerStatus" {
64 return;
65 }
66 let idx = self.idx;
67 self.buf[idx] = Some(request);
68 self.idx = (idx + 1) % N_COMPLETED_REQUESTS;
69 }
70
71 pub(crate) fn iter(&self) -> impl Iterator<Item = (bool, &CompletedRequest)> {
72 let idx = self.idx;
73 self.buf.iter().enumerate().filter_map(move |(i, req)| Some((i == idx, req.as_ref()?)))
74 }
75}
diff --git a/crates/rust-analyzer/src/main_loop/req_queue.rs b/crates/rust-analyzer/src/main_loop/req_queue.rs
new file mode 100644
index 000000000..5cf6d916b
--- /dev/null
+++ b/crates/rust-analyzer/src/main_loop/req_queue.rs
@@ -0,0 +1,123 @@
1//! Manages the set of in-flight requests in both directions.
2use std::time::{Duration, Instant};
3
4use lsp_server::RequestId;
5use rustc_hash::FxHashMap;
6use serde::Serialize;
7
8#[derive(Debug)]
9pub(crate) struct ReqQueue<H> {
10 pub(crate) incoming: Incoming,
11 pub(crate) outgoing: Outgoing<H>,
12}
13
14impl<H> Default for ReqQueue<H> {
15 fn default() -> Self {
16 ReqQueue { incoming: Incoming::default(), outgoing: Outgoing::default() }
17 }
18}
19
20#[derive(Debug)]
21pub(crate) struct Outgoing<H> {
22 next: u64,
23 pending: FxHashMap<RequestId, H>,
24}
25
26impl<H> Default for Outgoing<H> {
27 fn default() -> Self {
28 Outgoing { next: 0, pending: FxHashMap::default() }
29 }
30}
31
32impl<H> Outgoing<H> {
33 pub(crate) fn register<R>(&mut self, params: R::Params, handler: H) -> lsp_server::Request
34 where
35 R: lsp_types::request::Request,
36 R::Params: Serialize,
37 {
38 let id = RequestId::from(self.next);
39 self.next += 1;
40 self.pending.insert(id.clone(), handler);
41 lsp_server::Request::new(id, R::METHOD.to_string(), params)
42 }
43 pub(crate) fn complete(&mut self, id: RequestId) -> H {
44 self.pending.remove(&id).unwrap()
45 }
46}
47
48#[derive(Debug)]
49pub(crate) struct CompletedInRequest {
50 pub(crate) id: RequestId,
51 pub(crate) method: String,
52 pub(crate) duration: Duration,
53}
54
55#[derive(Debug)]
56pub(crate) struct PendingInRequest {
57 pub(crate) id: RequestId,
58 pub(crate) method: String,
59 pub(crate) received: Instant,
60}
61
62impl From<PendingInRequest> for CompletedInRequest {
63 fn from(pending: PendingInRequest) -> CompletedInRequest {
64 CompletedInRequest {
65 id: pending.id,
66 method: pending.method,
67 duration: pending.received.elapsed(),
68 }
69 }
70}
71
72#[derive(Debug, Default)]
73pub(crate) struct Incoming {
74 pending: FxHashMap<RequestId, PendingInRequest>,
75}
76
77impl Incoming {
78 pub(crate) fn register(&mut self, request: PendingInRequest) {
79 let id = request.id.clone();
80 let prev = self.pending.insert(id.clone(), request);
81 assert!(prev.is_none(), "duplicate request with id {}", id);
82 }
83 pub(crate) fn cancel(&mut self, id: RequestId) -> Option<lsp_server::Response> {
84 if self.pending.remove(&id).is_some() {
85 Some(lsp_server::Response::new_err(
86 id,
87 lsp_server::ErrorCode::RequestCanceled as i32,
88 "canceled by client".to_string(),
89 ))
90 } else {
91 None
92 }
93 }
94 pub(crate) fn complete(&mut self, id: RequestId) -> Option<CompletedInRequest> {
95 self.pending.remove(&id).map(CompletedInRequest::from)
96 }
97}
98
99const N_COMPLETED_REQUESTS: usize = 10;
100
101#[derive(Debug, Default)]
102pub struct LatestRequests {
103 // hand-rolling VecDeque here to print things in a nicer way
104 buf: [Option<CompletedInRequest>; N_COMPLETED_REQUESTS],
105 idx: usize,
106}
107
108impl LatestRequests {
109 pub(crate) fn record(&mut self, request: CompletedInRequest) {
110 // special case: don't track status request itself
111 if request.method == "rust-analyzer/analyzerStatus" {
112 return;
113 }
114 let idx = self.idx;
115 self.buf[idx] = Some(request);
116 self.idx = (idx + 1) % N_COMPLETED_REQUESTS;
117 }
118
119 pub(crate) fn iter(&self) -> impl Iterator<Item = (bool, &CompletedInRequest)> {
120 let idx = self.idx;
121 self.buf.iter().enumerate().filter_map(move |(i, req)| Some((i == idx, req.as_ref()?)))
122 }
123}