aboutsummaryrefslogtreecommitdiff
path: root/crates/rust-analyzer/src/main_loop.rs
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2020-06-25 01:16:32 +0100
committerGitHub <noreply@github.com>2020-06-25 01:16:32 +0100
commit78e94e4570f09c8cbe1f8c6802df9b112ca37f08 (patch)
tree395a23d51c043660ec50ee289d695694037af158 /crates/rust-analyzer/src/main_loop.rs
parent104fad65daaa6ab103ba8815244afa8243421594 (diff)
parent10ee6eb7333d1978eac4c70039162f61d6275ba3 (diff)
Merge #5044
5044: Tweak visibility r=matklad a=matklad bors r+ 🤖 Co-authored-by: Aleksey Kladov <aleksey.kladov@gmail.com>
Diffstat (limited to 'crates/rust-analyzer/src/main_loop.rs')
-rw-r--r--crates/rust-analyzer/src/main_loop.rs151
1 files changed, 43 insertions, 108 deletions
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index c8819c3b0..eb9e7f913 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -1,9 +1,7 @@
1//! The main loop of `rust-analyzer` responsible for dispatching LSP 1//! The main loop of `rust-analyzer` responsible for dispatching LSP
2//! requests/replies and notifications back to the client. 2//! requests/replies and notifications back to the client.
3use std::{ 3use std::{
4 env, 4 env, fmt,
5 error::Error,
6 fmt,
7 ops::Range, 5 ops::Range,
8 panic, 6 panic,
9 sync::Arc, 7 sync::Arc,
@@ -11,16 +9,13 @@ use std::{
11}; 9};
12 10
13use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; 11use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
14use lsp_server::{ 12use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
15 Connection, ErrorCode, Message, Notification, ReqQueue, Request, RequestId, Response,
16};
17use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent}; 13use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent};
18use ra_db::VfsPath; 14use ra_db::VfsPath;
19use ra_flycheck::CheckTask; 15use ra_flycheck::CheckTask;
20use ra_ide::{Canceled, FileId, LineIndex}; 16use ra_ide::{Canceled, FileId, LineIndex};
21use ra_prof::profile; 17use ra_prof::profile;
22use ra_project_model::{PackageRoot, ProjectWorkspace}; 18use ra_project_model::{PackageRoot, ProjectWorkspace};
23use rustc_hash::FxHashSet;
24use serde::{de::DeserializeOwned, Serialize}; 19use serde::{de::DeserializeOwned, Serialize};
25use threadpool::ThreadPool; 20use threadpool::ThreadPool;
26 21
@@ -28,34 +23,12 @@ use crate::{
28 config::{Config, FilesWatcher, LinkedProject}, 23 config::{Config, FilesWatcher, LinkedProject},
29 diagnostics::DiagnosticTask, 24 diagnostics::DiagnosticTask,
30 from_proto, 25 from_proto,
31 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, 26 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status},
32 handlers, lsp_ext, 27 handlers, lsp_ext,
33 request_metrics::RequestMetrics, 28 request_metrics::RequestMetrics,
34 Result, 29 LspError, Result,
35}; 30};
36 31
37#[derive(Debug)]
38pub struct LspError {
39 pub code: i32,
40 pub message: String,
41}
42
43impl LspError {
44 pub const UNKNOWN_FILE: i32 = -32900;
45
46 pub fn new(code: i32, message: String) -> LspError {
47 LspError { code, message }
48 }
49}
50
51impl fmt::Display for LspError {
52 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
53 write!(f, "Language Server request failed with {}. ({})", self.code, self.message)
54 }
55}
56
57impl Error for LspError {}
58
59pub fn main_loop(config: Config, connection: Connection) -> Result<()> { 32pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
60 log::info!("initial config: {:#?}", config); 33 log::info!("initial config: {:#?}", config);
61 34
@@ -78,7 +51,6 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
78 SetThreadPriority(thread, thread_priority_above_normal); 51 SetThreadPriority(thread, thread_priority_above_normal);
79 } 52 }
80 53
81 let mut loop_state = LoopState::default();
82 let mut global_state = { 54 let mut global_state = {
83 let workspaces = { 55 let workspaces = {
84 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found { 56 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
@@ -116,6 +88,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
116 .collect::<Vec<_>>() 88 .collect::<Vec<_>>()
117 }; 89 };
118 90
91 let mut req_queue = ReqQueue::default();
92
119 if let FilesWatcher::Client = config.files.watcher { 93 if let FilesWatcher::Client = config.files.watcher {
120 let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions { 94 let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions {
121 watchers: workspaces 95 watchers: workspaces
@@ -132,7 +106,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
132 register_options: Some(serde_json::to_value(registration_options).unwrap()), 106 register_options: Some(serde_json::to_value(registration_options).unwrap()),
133 }; 107 };
134 let params = lsp_types::RegistrationParams { registrations: vec![registration] }; 108 let params = lsp_types::RegistrationParams { registrations: vec![registration] };
135 let request = loop_state.req_queue.outgoing.register( 109 let request = req_queue.outgoing.register(
136 lsp_types::request::RegisterCapability::METHOD.to_string(), 110 lsp_types::request::RegisterCapability::METHOD.to_string(),
137 params, 111 params,
138 DO_NOTHING, 112 DO_NOTHING,
@@ -140,7 +114,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
140 connection.sender.send(request.into()).unwrap(); 114 connection.sender.send(request.into()).unwrap();
141 } 115 }
142 116
143 GlobalState::new(workspaces, config.lru_capacity, config) 117 GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
144 }; 118 };
145 119
146 let pool = ThreadPool::default(); 120 let pool = ThreadPool::default();
@@ -172,15 +146,13 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
172 }; 146 };
173 } 147 }
174 assert!(!global_state.vfs.read().0.has_changes()); 148 assert!(!global_state.vfs.read().0.has_changes());
175 loop_turn(&pool, &task_sender, &connection, &mut global_state, &mut loop_state, event)?; 149 loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?;
176 assert!(!global_state.vfs.read().0.has_changes()); 150 assert!(!global_state.vfs.read().0.has_changes());
177 } 151 }
178 } 152 }
179 global_state.analysis_host.request_cancellation(); 153 global_state.analysis_host.request_cancellation();
180 log::info!("waiting for tasks to finish..."); 154 log::info!("waiting for tasks to finish...");
181 task_receiver.into_iter().for_each(|task| { 155 task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
182 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state)
183 });
184 log::info!("...tasks have finished"); 156 log::info!("...tasks have finished");
185 log::info!("joining threadpool..."); 157 log::info!("joining threadpool...");
186 pool.join(); 158 pool.join();
@@ -244,35 +216,15 @@ impl fmt::Debug for Event {
244 } 216 }
245} 217}
246 218
247type ReqHandler = fn(&mut GlobalState, Response); 219pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
220pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
248const DO_NOTHING: ReqHandler = |_, _| (); 221const DO_NOTHING: ReqHandler = |_, _| ();
249type Incoming = lsp_server::Incoming<(&'static str, Instant)>;
250
251#[derive(Default)]
252struct LoopState {
253 req_queue: ReqQueue<(&'static str, Instant), ReqHandler>,
254 mem_docs: FxHashSet<VfsPath>,
255 status: Status,
256}
257
258#[derive(Eq, PartialEq)]
259enum Status {
260 Loading,
261 Ready,
262}
263
264impl Default for Status {
265 fn default() -> Self {
266 Status::Loading
267 }
268}
269 222
270fn loop_turn( 223fn loop_turn(
271 pool: &ThreadPool, 224 pool: &ThreadPool,
272 task_sender: &Sender<Task>, 225 task_sender: &Sender<Task>,
273 connection: &Connection, 226 connection: &Connection,
274 global_state: &mut GlobalState, 227 global_state: &mut GlobalState,
275 loop_state: &mut LoopState,
276 event: Event, 228 event: Event,
277) -> Result<()> { 229) -> Result<()> {
278 let loop_start = Instant::now(); 230 let loop_start = Instant::now();
@@ -288,7 +240,7 @@ fn loop_turn(
288 let mut became_ready = false; 240 let mut became_ready = false;
289 match event { 241 match event {
290 Event::Task(task) => { 242 Event::Task(task) => {
291 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state); 243 on_task(task, &connection.sender, global_state);
292 global_state.maybe_collect_garbage(); 244 global_state.maybe_collect_garbage();
293 } 245 }
294 Event::Vfs(task) => match task { 246 Event::Vfs(task) => match task {
@@ -296,35 +248,29 @@ fn loop_turn(
296 let vfs = &mut global_state.vfs.write().0; 248 let vfs = &mut global_state.vfs.write().0;
297 for (path, contents) in files { 249 for (path, contents) in files {
298 let path = VfsPath::from(path); 250 let path = VfsPath::from(path);
299 if !loop_state.mem_docs.contains(&path) { 251 if !global_state.mem_docs.contains(&path) {
300 vfs.set_file_contents(path, contents) 252 vfs.set_file_contents(path, contents)
301 } 253 }
302 } 254 }
303 } 255 }
304 vfs::loader::Message::Progress { n_total, n_done } => { 256 vfs::loader::Message::Progress { n_total, n_done } => {
305 if n_done == n_total { 257 if n_done == n_total {
306 loop_state.status = Status::Ready; 258 global_state.status = Status::Ready;
307 became_ready = true; 259 became_ready = true;
308 } 260 }
309 report_progress(loop_state, &connection.sender, n_done, n_total, "roots scanned") 261 report_progress(global_state, &connection.sender, n_done, n_total, "roots scanned")
310 } 262 }
311 }, 263 },
312 Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?, 264 Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
313 Event::Msg(msg) => match msg { 265 Event::Msg(msg) => match msg {
314 Message::Request(req) => on_request( 266 Message::Request(req) => {
315 global_state, 267 on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)?
316 &mut loop_state.req_queue.incoming, 268 }
317 pool,
318 task_sender,
319 &connection.sender,
320 loop_start,
321 req,
322 )?,
323 Message::Notification(not) => { 269 Message::Notification(not) => {
324 on_notification(&connection.sender, global_state, loop_state, not)?; 270 on_notification(&connection.sender, global_state, not)?;
325 } 271 }
326 Message::Response(resp) => { 272 Message::Response(resp) => {
327 let handler = loop_state.req_queue.outgoing.complete(resp.id.clone()); 273 let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
328 handler(global_state, resp) 274 handler(global_state, resp)
329 } 275 }
330 }, 276 },
@@ -338,8 +284,8 @@ fn loop_turn(
338 } 284 }
339 } 285 }
340 286
341 if loop_state.status == Status::Ready && (state_changed || became_ready) { 287 if global_state.status == Status::Ready && (state_changed || became_ready) {
342 let subscriptions = loop_state 288 let subscriptions = global_state
343 .mem_docs 289 .mem_docs
344 .iter() 290 .iter()
345 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) 291 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
@@ -354,7 +300,7 @@ fn loop_turn(
354 pool.execute({ 300 pool.execute({
355 let subs = subscriptions; 301 let subs = subscriptions;
356 let snap = global_state.snapshot(); 302 let snap = global_state.snapshot();
357 move || snap.analysis().prime_caches(subs).unwrap_or_else(|_: Canceled| ()) 303 move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ())
358 }); 304 });
359 } 305 }
360 306
@@ -373,18 +319,15 @@ fn loop_turn(
373 Ok(()) 319 Ok(())
374} 320}
375 321
376fn on_task( 322fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) {
377 task: Task,
378 msg_sender: &Sender<Message>,
379 incoming_requests: &mut Incoming,
380 state: &mut GlobalState,
381) {
382 match task { 323 match task {
383 Task::Respond(response) => { 324 Task::Respond(response) => {
384 if let Some((method, start)) = incoming_requests.complete(response.id.clone()) { 325 if let Some((method, start)) =
326 global_state.req_queue.incoming.complete(response.id.clone())
327 {
385 let duration = start.elapsed(); 328 let duration = start.elapsed();
386 log::info!("handled req#{} in {:?}", response.id, duration); 329 log::info!("handled req#{} in {:?}", response.id, duration);
387 state.complete_request(RequestMetrics { 330 global_state.complete_request(RequestMetrics {
388 id: response.id.clone(), 331 id: response.id.clone(),
389 method: method.to_string(), 332 method: method.to_string(),
390 duration, 333 duration,
@@ -395,13 +338,12 @@ fn on_task(
395 Task::Notify(n) => { 338 Task::Notify(n) => {
396 msg_sender.send(n.into()).unwrap(); 339 msg_sender.send(n.into()).unwrap();
397 } 340 }
398 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state), 341 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state),
399 } 342 }
400} 343}
401 344
402fn on_request( 345fn on_request(
403 global_state: &mut GlobalState, 346 global_state: &mut GlobalState,
404 incoming_requests: &mut Incoming,
405 pool: &ThreadPool, 347 pool: &ThreadPool,
406 task_sender: &Sender<Task>, 348 task_sender: &Sender<Task>,
407 msg_sender: &Sender<Message>, 349 msg_sender: &Sender<Message>,
@@ -414,7 +356,6 @@ fn on_request(
414 global_state, 356 global_state,
415 task_sender, 357 task_sender,
416 msg_sender, 358 msg_sender,
417 incoming_requests,
418 request_received, 359 request_received,
419 }; 360 };
420 pool_dispatcher 361 pool_dispatcher
@@ -469,7 +410,6 @@ fn on_request(
469fn on_notification( 410fn on_notification(
470 msg_sender: &Sender<Message>, 411 msg_sender: &Sender<Message>,
471 global_state: &mut GlobalState, 412 global_state: &mut GlobalState,
472 loop_state: &mut LoopState,
473 not: Notification, 413 not: Notification,
474) -> Result<()> { 414) -> Result<()> {
475 let not = match notification_cast::<lsp_types::notification::Cancel>(not) { 415 let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
@@ -478,7 +418,7 @@ fn on_notification(
478 NumberOrString::Number(id) => id.into(), 418 NumberOrString::Number(id) => id.into(),
479 NumberOrString::String(id) => id.into(), 419 NumberOrString::String(id) => id.into(),
480 }; 420 };
481 if let Some(response) = loop_state.req_queue.incoming.cancel(id) { 421 if let Some(response) = global_state.req_queue.incoming.cancel(id) {
482 msg_sender.send(response.into()).unwrap() 422 msg_sender.send(response.into()).unwrap()
483 } 423 }
484 return Ok(()); 424 return Ok(());
@@ -488,7 +428,7 @@ fn on_notification(
488 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) { 428 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
489 Ok(params) => { 429 Ok(params) => {
490 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 430 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
491 if !loop_state.mem_docs.insert(path.clone()) { 431 if !global_state.mem_docs.insert(path.clone()) {
492 log::error!("duplicate DidOpenTextDocument: {}", path) 432 log::error!("duplicate DidOpenTextDocument: {}", path)
493 } 433 }
494 global_state 434 global_state
@@ -504,7 +444,7 @@ fn on_notification(
504 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) { 444 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
505 Ok(params) => { 445 Ok(params) => {
506 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 446 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
507 assert!(loop_state.mem_docs.contains(&path)); 447 assert!(global_state.mem_docs.contains(&path));
508 let vfs = &mut global_state.vfs.write().0; 448 let vfs = &mut global_state.vfs.write().0;
509 let file_id = vfs.file_id(&path).unwrap(); 449 let file_id = vfs.file_id(&path).unwrap();
510 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap(); 450 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
@@ -518,7 +458,7 @@ fn on_notification(
518 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) { 458 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
519 Ok(params) => { 459 Ok(params) => {
520 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 460 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
521 if !loop_state.mem_docs.remove(&path) { 461 if !global_state.mem_docs.remove(&path) {
522 log::error!("orphan DidCloseTextDocument: {}", path) 462 log::error!("orphan DidCloseTextDocument: {}", path)
523 } 463 }
524 if let Some(path) = path.as_path() { 464 if let Some(path) = path.as_path() {
@@ -549,7 +489,7 @@ fn on_notification(
549 Ok(_) => { 489 Ok(_) => {
550 // As stated in https://github.com/microsoft/language-server-protocol/issues/676, 490 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
551 // this notification's parameters should be ignored and the actual config queried separately. 491 // this notification's parameters should be ignored and the actual config queried separately.
552 let request = loop_state.req_queue.outgoing.register( 492 let request = global_state.req_queue.outgoing.register(
553 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(), 493 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
554 lsp_types::ConfigurationParams { 494 lsp_types::ConfigurationParams {
555 items: vec![lsp_types::ConfigurationItem { 495 items: vec![lsp_types::ConfigurationItem {
@@ -732,7 +672,7 @@ fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state:
732} 672}
733 673
734fn report_progress( 674fn report_progress(
735 loop_state: &mut LoopState, 675 global_state: &mut GlobalState,
736 sender: &Sender<Message>, 676 sender: &Sender<Message>,
737 done: usize, 677 done: usize,
738 total: usize, 678 total: usize,
@@ -742,7 +682,7 @@ fn report_progress(
742 let message = Some(format!("{}/{} {}", done, total, message)); 682 let message = Some(format!("{}/{} {}", done, total, message));
743 let percentage = Some(100.0 * done as f64 / total.max(1) as f64); 683 let percentage = Some(100.0 * done as f64 / total.max(1) as f64);
744 let work_done_progress = if done == 0 { 684 let work_done_progress = if done == 0 {
745 let work_done_progress_create = loop_state.req_queue.outgoing.register( 685 let work_done_progress_create = global_state.req_queue.outgoing.register(
746 lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(), 686 lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(),
747 lsp_types::WorkDoneProgressCreateParams { token: token.clone() }, 687 lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
748 DO_NOTHING, 688 DO_NOTHING,
@@ -777,7 +717,6 @@ struct PoolDispatcher<'a> {
777 req: Option<Request>, 717 req: Option<Request>,
778 pool: &'a ThreadPool, 718 pool: &'a ThreadPool,
779 global_state: &'a mut GlobalState, 719 global_state: &'a mut GlobalState,
780 incoming_requests: &'a mut Incoming,
781 msg_sender: &'a Sender<Message>, 720 msg_sender: &'a Sender<Message>,
782 task_sender: &'a Sender<Task>, 721 task_sender: &'a Sender<Task>,
783 request_received: Instant, 722 request_received: Instant,
@@ -806,7 +745,7 @@ impl<'a> PoolDispatcher<'a> {
806 result_to_task::<R>(id, result) 745 result_to_task::<R>(id, result)
807 }) 746 })
808 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; 747 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
809 on_task(task, self.msg_sender, self.incoming_requests, self.global_state); 748 on_task(task, self.msg_sender, self.global_state);
810 Ok(self) 749 Ok(self)
811 } 750 }
812 751
@@ -853,7 +792,10 @@ impl<'a> PoolDispatcher<'a> {
853 return None; 792 return None;
854 } 793 }
855 }; 794 };
856 self.incoming_requests.register(id.clone(), (R::METHOD, self.request_received)); 795 self.global_state
796 .req_queue
797 .incoming
798 .register(id.clone(), (R::METHOD, self.request_received));
857 Some((id, params)) 799 Some((id, params))
858 } 800 }
859 801
@@ -882,14 +824,7 @@ where
882 let response = match result { 824 let response = match result {
883 Ok(resp) => Response::new_ok(id, &resp), 825 Ok(resp) => Response::new_ok(id, &resp),
884 Err(e) => match e.downcast::<LspError>() { 826 Err(e) => match e.downcast::<LspError>() {
885 Ok(lsp_error) => { 827 Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
886 if lsp_error.code == LspError::UNKNOWN_FILE {
887 // Work-around for https://github.com/rust-analyzer/rust-analyzer/issues/1521
888 Response::new_ok(id, ())
889 } else {
890 Response::new_err(id, lsp_error.code, lsp_error.message)
891 }
892 }
893 Err(e) => { 828 Err(e) => {
894 if is_canceled(&e) { 829 if is_canceled(&e) {
895 Response::new_err( 830 Response::new_err(