aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_lsp_server/src/main_loop.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_lsp_server/src/main_loop.rs')
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs139
1 files changed, 90 insertions, 49 deletions
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index 7822be2e2..ceff82fda 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -5,21 +5,29 @@ mod handlers;
5mod subscriptions; 5mod subscriptions;
6pub(crate) mod pending_requests; 6pub(crate) mod pending_requests;
7 7
8use std::{error::Error, fmt, panic, path::PathBuf, sync::Arc, time::Instant}; 8use std::{
9 env,
10 error::Error,
11 fmt, panic,
12 path::PathBuf,
13 sync::Arc,
14 time::{Duration, Instant},
15};
9 16
10use crossbeam_channel::{select, unbounded, RecvError, Sender}; 17use crossbeam_channel::{select, unbounded, RecvError, Sender};
11use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; 18use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
12use lsp_types::{ClientCapabilities, NumberOrString, Url}; 19use lsp_types::{ClientCapabilities, NumberOrString};
13use ra_cargo_watch::{CheckOptions, CheckTask}; 20use ra_cargo_watch::{url_from_path_with_drive_lowercasing, CheckOptions, CheckTask};
14use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; 21use ra_ide::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId};
15use ra_prof::profile; 22use ra_prof::profile;
16use ra_vfs::{VfsTask, Watch}; 23use ra_vfs::{VfsFile, VfsTask, Watch};
17use relative_path::RelativePathBuf; 24use relative_path::RelativePathBuf;
18use rustc_hash::FxHashSet; 25use rustc_hash::FxHashSet;
19use serde::{de::DeserializeOwned, Serialize}; 26use serde::{de::DeserializeOwned, Serialize};
20use threadpool::ThreadPool; 27use threadpool::ThreadPool;
21 28
22use crate::{ 29use crate::{
30 diagnostics::DiagnosticTask,
23 main_loop::{ 31 main_loop::{
24 pending_requests::{PendingRequest, PendingRequests}, 32 pending_requests::{PendingRequest, PendingRequests},
25 subscriptions::Subscriptions, 33 subscriptions::Subscriptions,
@@ -29,9 +37,6 @@ use crate::{
29 Result, ServerConfig, 37 Result, ServerConfig,
30}; 38};
31 39
32const THREADPOOL_SIZE: usize = 8;
33const MAX_IN_FLIGHT_LIBS: usize = THREADPOOL_SIZE - 3;
34
35#[derive(Debug)] 40#[derive(Debug)]
36pub struct LspError { 41pub struct LspError {
37 pub code: i32, 42 pub code: i32,
@@ -60,6 +65,25 @@ pub fn main_loop(
60) -> Result<()> { 65) -> Result<()> {
61 log::info!("server_config: {:#?}", config); 66 log::info!("server_config: {:#?}", config);
62 67
68 // Windows scheduler implements priority boosts: if thread waits for an
69 // event (like a condvar), and event fires, priority of the thread is
70 // temporary bumped. This optimization backfires in our case: each time the
71 // `main_loop` schedules a task to run on a threadpool, the worker threads
72 // gets a higher priority, and (on a machine with fewer cores) displaces the
73 // main loop! We work-around this by marking the main loop as a
74 // higher-priority thread.
75 //
76 // https://docs.microsoft.com/en-us/windows/win32/procthread/scheduling-priorities
77 // https://docs.microsoft.com/en-us/windows/win32/procthread/priority-boosts
78 // https://github.com/rust-analyzer/rust-analyzer/issues/2835
79 #[cfg(windows)]
80 unsafe {
81 use winapi::um::processthreadsapi::*;
82 let thread = GetCurrentThread();
83 let thread_priority_above_normal = 1;
84 SetThreadPriority(thread, thread_priority_above_normal);
85 }
86
63 let mut loop_state = LoopState::default(); 87 let mut loop_state = LoopState::default();
64 let mut world_state = { 88 let mut world_state = {
65 let feature_flags = { 89 let feature_flags = {
@@ -168,7 +192,7 @@ pub fn main_loop(
168 ) 192 )
169 }; 193 };
170 194
171 let pool = ThreadPool::new(THREADPOOL_SIZE); 195 let pool = ThreadPool::default();
172 let (task_sender, task_receiver) = unbounded::<Task>(); 196 let (task_sender, task_receiver) = unbounded::<Task>();
173 let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>(); 197 let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>();
174 198
@@ -210,7 +234,7 @@ pub fn main_loop(
210 )?; 234 )?;
211 } 235 }
212 } 236 }
213 237 world_state.analysis_host.request_cancellation();
214 log::info!("waiting for tasks to finish..."); 238 log::info!("waiting for tasks to finish...");
215 task_receiver.into_iter().for_each(|task| { 239 task_receiver.into_iter().for_each(|task| {
216 on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state) 240 on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state)
@@ -231,6 +255,7 @@ pub fn main_loop(
231enum Task { 255enum Task {
232 Respond(Response), 256 Respond(Response),
233 Notify(Notification), 257 Notify(Notification),
258 Diagnostic(DiagnosticTask),
234} 259}
235 260
236enum Event { 261enum Event {
@@ -371,7 +396,8 @@ fn loop_turn(
371 loop_state.pending_libraries.extend(changes); 396 loop_state.pending_libraries.extend(changes);
372 } 397 }
373 398
374 while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS 399 let max_in_flight_libs = pool.max_count().saturating_sub(2).max(1);
400 while loop_state.in_flight_libraries < max_in_flight_libs
375 && !loop_state.pending_libraries.is_empty() 401 && !loop_state.pending_libraries.is_empty()
376 { 402 {
377 let (root, files) = loop_state.pending_libraries.pop().unwrap(); 403 let (root, files) = loop_state.pending_libraries.pop().unwrap();
@@ -379,7 +405,6 @@ fn loop_turn(
379 let sender = libdata_sender.clone(); 405 let sender = libdata_sender.clone();
380 pool.execute(move || { 406 pool.execute(move || {
381 log::info!("indexing {:?} ... ", root); 407 log::info!("indexing {:?} ... ", root);
382 let _p = profile(&format!("indexed {:?}", root));
383 let data = LibraryData::prepare(root, files); 408 let data = LibraryData::prepare(root, files);
384 sender.send(data).unwrap(); 409 sender.send(data).unwrap();
385 }); 410 });
@@ -408,6 +433,19 @@ fn loop_turn(
408 loop_state.subscriptions.subscriptions(), 433 loop_state.subscriptions.subscriptions(),
409 ) 434 )
410 } 435 }
436
437 let loop_duration = loop_start.elapsed();
438 if loop_duration > Duration::from_millis(100) {
439 log::error!("overly long loop turn: {:?}", loop_duration);
440 if env::var("RA_PROFILE").is_ok() {
441 show_message(
442 req::MessageType::Error,
443 format!("overly long loop turn: {:?}", loop_duration),
444 &connection.sender,
445 );
446 }
447 }
448
411 Ok(()) 449 Ok(())
412} 450}
413 451
@@ -428,6 +466,7 @@ fn on_task(
428 Task::Notify(n) => { 466 Task::Notify(n) => {
429 msg_sender.send(n.into()).unwrap(); 467 msg_sender.send(n.into()).unwrap();
430 } 468 }
469 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state),
431 } 470 }
432} 471}
433 472
@@ -435,7 +474,7 @@ fn on_request(
435 world: &mut WorldState, 474 world: &mut WorldState,
436 pending_requests: &mut PendingRequests, 475 pending_requests: &mut PendingRequests,
437 pool: &ThreadPool, 476 pool: &ThreadPool,
438 sender: &Sender<Task>, 477 task_sender: &Sender<Task>,
439 msg_sender: &Sender<Message>, 478 msg_sender: &Sender<Message>,
440 request_received: Instant, 479 request_received: Instant,
441 req: Request, 480 req: Request,
@@ -444,7 +483,7 @@ fn on_request(
444 req: Some(req), 483 req: Some(req),
445 pool, 484 pool,
446 world, 485 world,
447 sender, 486 task_sender,
448 msg_sender, 487 msg_sender,
449 pending_requests, 488 pending_requests,
450 request_received, 489 request_received,
@@ -586,30 +625,26 @@ fn on_notification(
586 625
587fn on_check_task( 626fn on_check_task(
588 task: CheckTask, 627 task: CheckTask,
589 world_state: &WorldState, 628 world_state: &mut WorldState,
590 task_sender: &Sender<Task>, 629 task_sender: &Sender<Task>,
591) -> Result<()> { 630) -> Result<()> {
592 match task { 631 match task {
593 CheckTask::ClearDiagnostics => { 632 CheckTask::ClearDiagnostics => {
594 let cleared_files = world_state.check_watcher.state.write().clear(); 633 task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?;
595
596 // Send updated diagnostics for each cleared file
597 for url in cleared_files {
598 publish_diagnostics_for_url(&url, world_state, task_sender)?;
599 }
600 } 634 }
601 635
602 CheckTask::AddDiagnostic(url, diagnostic) => { 636 CheckTask::AddDiagnostic { url, diagnostic, fixes } => {
603 world_state 637 let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
604 .check_watcher 638 let file_id = match world_state.vfs.read().path2file(&path) {
605 .state 639 Some(file) => FileId(file.0),
606 .write() 640 None => {
607 .add_diagnostic_with_fixes(url.clone(), diagnostic); 641 log::error!("File with cargo diagnostic not found in VFS: {}", path.display());
608 642 return Ok(());
609 // We manually send a diagnostic update when the watcher asks 643 }
610 // us to, to avoid the issue of having to change the file to 644 };
611 // receive updated diagnostics. 645
612 publish_diagnostics_for_url(&url, world_state, task_sender)?; 646 task_sender
647 .send(Task::Diagnostic(DiagnosticTask::AddCheck(file_id, diagnostic, fixes)))?;
613 } 648 }
614 649
615 CheckTask::Status(progress) => { 650 CheckTask::Status(progress) => {
@@ -620,22 +655,29 @@ fn on_check_task(
620 let not = notification_new::<req::Progress>(params); 655 let not = notification_new::<req::Progress>(params);
621 task_sender.send(Task::Notify(not)).unwrap(); 656 task_sender.send(Task::Notify(not)).unwrap();
622 } 657 }
623 } 658 };
659
624 Ok(()) 660 Ok(())
625} 661}
626 662
627fn publish_diagnostics_for_url( 663fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut WorldState) {
628 url: &Url, 664 let subscriptions = state.diagnostics.handle_task(task);
629 world_state: &WorldState, 665
630 task_sender: &Sender<Task>, 666 for file_id in subscriptions {
631) -> Result<()> { 667 let path = state.vfs.read().file2path(VfsFile(file_id.0));
632 let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?; 668 let uri = match url_from_path_with_drive_lowercasing(&path) {
633 if let Some(file_id) = world_state.vfs.read().path2file(&path) { 669 Ok(uri) => uri,
634 let params = handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?; 670 Err(err) => {
671 log::error!("Couldn't convert path to url ({}): {:?}", err, path.to_string_lossy());
672 continue;
673 }
674 };
675
676 let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect();
677 let params = req::PublishDiagnosticsParams { uri, diagnostics, version: None };
635 let not = notification_new::<req::PublishDiagnostics>(params); 678 let not = notification_new::<req::PublishDiagnostics>(params);
636 task_sender.send(Task::Notify(not)).unwrap(); 679 msg_sender.send(not.into()).unwrap();
637 } 680 }
638 Ok(())
639} 681}
640 682
641struct PoolDispatcher<'a> { 683struct PoolDispatcher<'a> {
@@ -644,7 +686,7 @@ struct PoolDispatcher<'a> {
644 world: &'a mut WorldState, 686 world: &'a mut WorldState,
645 pending_requests: &'a mut PendingRequests, 687 pending_requests: &'a mut PendingRequests,
646 msg_sender: &'a Sender<Message>, 688 msg_sender: &'a Sender<Message>,
647 sender: &'a Sender<Task>, 689 task_sender: &'a Sender<Task>,
648 request_received: Instant, 690 request_received: Instant,
649} 691}
650 692
@@ -691,7 +733,7 @@ impl<'a> PoolDispatcher<'a> {
691 733
692 self.pool.execute({ 734 self.pool.execute({
693 let world = self.world.snapshot(); 735 let world = self.world.snapshot();
694 let sender = self.sender.clone(); 736 let sender = self.task_sender.clone();
695 move || { 737 move || {
696 let result = f(world, params); 738 let result = f(world, params);
697 let task = result_to_task::<R>(id, result); 739 let task = result_to_task::<R>(id, result);
@@ -769,7 +811,7 @@ fn update_file_notifications_on_threadpool(
769 pool: &ThreadPool, 811 pool: &ThreadPool,
770 world: WorldSnapshot, 812 world: WorldSnapshot,
771 publish_decorations: bool, 813 publish_decorations: bool,
772 sender: Sender<Task>, 814 task_sender: Sender<Task>,
773 subscriptions: Vec<FileId>, 815 subscriptions: Vec<FileId>,
774) { 816) {
775 log::trace!("updating notifications for {:?}", subscriptions); 817 log::trace!("updating notifications for {:?}", subscriptions);
@@ -783,9 +825,8 @@ fn update_file_notifications_on_threadpool(
783 log::error!("failed to compute diagnostics: {:?}", e); 825 log::error!("failed to compute diagnostics: {:?}", e);
784 } 826 }
785 } 827 }
786 Ok(params) => { 828 Ok(task) => {
787 let not = notification_new::<req::PublishDiagnostics>(params); 829 task_sender.send(Task::Diagnostic(task)).unwrap();
788 sender.send(Task::Notify(not)).unwrap();
789 } 830 }
790 } 831 }
791 } 832 }
@@ -798,7 +839,7 @@ fn update_file_notifications_on_threadpool(
798 } 839 }
799 Ok(params) => { 840 Ok(params) => {
800 let not = notification_new::<req::PublishDecorations>(params); 841 let not = notification_new::<req::PublishDecorations>(params);
801 sender.send(Task::Notify(not)).unwrap(); 842 task_sender.send(Task::Notify(not)).unwrap();
802 } 843 }
803 } 844 }
804 } 845 }