aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2020-06-25 16:14:11 +0100
committerAleksey Kladov <[email protected]>2020-06-25 16:14:11 +0100
commit379a096de9ad06c23347b76a54d9cc22aee80f6a (patch)
tree427fd07eb770b627cfc5d7376ed3746632509650
parentdd20c2ec5bc0c5ac02149479c2b5f3746f3df505 (diff)
Refactor main_loop
-rw-r--r--crates/flycheck/src/lib.rs17
-rw-r--r--crates/rust-analyzer/src/global_state.rs19
-rw-r--r--crates/rust-analyzer/src/main_loop.rs767
-rw-r--r--crates/vfs-notify/src/lib.rs13
4 files changed, 402 insertions, 414 deletions
diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs
index 9e8205ae7..4dcab7a61 100644
--- a/crates/flycheck/src/lib.rs
+++ b/crates/flycheck/src/lib.rs
@@ -120,7 +120,13 @@ impl FlycheckActor {
120 ) -> FlycheckActor { 120 ) -> FlycheckActor {
121 FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None } 121 FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None }
122 } 122 }
123 123 fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
124 let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
125 select! {
126 recv(inbox) -> msg => msg.ok().map(Event::Restart),
127 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
128 }
129 }
124 fn run(&mut self, inbox: Receiver<Restart>) { 130 fn run(&mut self, inbox: Receiver<Restart>) {
125 // If we rerun the thread, we need to discard the previous check results first 131 // If we rerun the thread, we need to discard the previous check results first
126 self.send(Message::ClearDiagnostics); 132 self.send(Message::ClearDiagnostics);
@@ -167,15 +173,6 @@ impl FlycheckActor {
167 } 173 }
168 } 174 }
169 } 175 }
170
171 fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
172 let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
173 select! {
174 recv(inbox) -> msg => msg.ok().map(Event::Restart),
175 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
176 }
177 }
178
179 fn should_recheck(&mut self) -> bool { 176 fn should_recheck(&mut self) -> bool {
180 if let Some(_last_update_req) = &self.last_update_req { 177 if let Some(_last_update_req) = &self.last_update_req {
181 // We currently only request an update on save, as we need up to 178 // We currently only request an update on save, as we need up to
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index de6b95686..56d50c789 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -5,7 +5,7 @@
5 5
6use std::{convert::TryFrom, sync::Arc}; 6use std::{convert::TryFrom, sync::Arc};
7 7
8use crossbeam_channel::{unbounded, Receiver}; 8use crossbeam_channel::{unbounded, Receiver, Sender};
9use flycheck::{FlycheckConfig, FlycheckHandle}; 9use flycheck::{FlycheckConfig, FlycheckHandle};
10use lsp_types::Url; 10use lsp_types::Url;
11use parking_lot::RwLock; 11use parking_lot::RwLock;
@@ -22,6 +22,7 @@ use crate::{
22 line_endings::LineEndings, 22 line_endings::LineEndings,
23 main_loop::{ReqQueue, Task}, 23 main_loop::{ReqQueue, Task},
24 request_metrics::{LatestRequests, RequestMetrics}, 24 request_metrics::{LatestRequests, RequestMetrics},
25 show_message,
25 thread_pool::TaskPool, 26 thread_pool::TaskPool,
26 to_proto::url_from_abs_path, 27 to_proto::url_from_abs_path,
27 Result, 28 Result,
@@ -66,6 +67,7 @@ impl Default for Status {
66/// snapshot of the file systems, and `analysis_host`, which stores our 67/// snapshot of the file systems, and `analysis_host`, which stores our
67/// incremental salsa database. 68/// incremental salsa database.
68pub(crate) struct GlobalState { 69pub(crate) struct GlobalState {
70 sender: Sender<lsp_server::Message>,
69 pub(crate) config: Config, 71 pub(crate) config: Config,
70 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>), 72 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>),
71 pub(crate) analysis_host: AnalysisHost, 73 pub(crate) analysis_host: AnalysisHost,
@@ -95,6 +97,7 @@ pub(crate) struct GlobalStateSnapshot {
95 97
96impl GlobalState { 98impl GlobalState {
97 pub(crate) fn new( 99 pub(crate) fn new(
100 sender: Sender<lsp_server::Message>,
98 workspaces: Vec<ProjectWorkspace>, 101 workspaces: Vec<ProjectWorkspace>,
99 lru_capacity: Option<usize>, 102 lru_capacity: Option<usize>,
100 config: Config, 103 config: Config,
@@ -162,6 +165,7 @@ impl GlobalState {
162 }; 165 };
163 166
164 let mut res = GlobalState { 167 let mut res = GlobalState {
168 sender,
165 config, 169 config,
166 task_pool, 170 task_pool,
167 analysis_host, 171 analysis_host,
@@ -252,6 +256,19 @@ impl GlobalState {
252 pub(crate) fn complete_request(&mut self, request: RequestMetrics) { 256 pub(crate) fn complete_request(&mut self, request: RequestMetrics) {
253 self.latest_requests.write().record(request) 257 self.latest_requests.write().record(request)
254 } 258 }
259
260 pub(crate) fn send(&mut self, message: lsp_server::Message) {
261 self.sender.send(message).unwrap()
262 }
263 pub(crate) fn show_message(&mut self, typ: lsp_types::MessageType, message: String) {
264 show_message(typ, message, &self.sender)
265 }
266}
267
268impl Drop for GlobalState {
269 fn drop(&mut self) {
270 self.analysis_host.request_cancellation()
271 }
255} 272}
256 273
257impl GlobalStateSnapshot { 274impl GlobalStateSnapshot {
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index 1a9c5ee2c..f3c8b5978 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -5,9 +5,9 @@ use std::{
5 time::{Duration, Instant}, 5 time::{Duration, Instant},
6}; 6};
7 7
8use crossbeam_channel::{never, select, RecvError, Sender}; 8use crossbeam_channel::{never, select, Receiver};
9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; 9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response};
10use lsp_types::{request::Request as _, NumberOrString}; 10use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString};
11use ra_db::VfsPath; 11use ra_db::VfsPath;
12use ra_ide::{Canceled, FileId}; 12use ra_ide::{Canceled, FileId};
13use ra_prof::profile; 13use ra_prof::profile;
@@ -50,7 +50,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
50 SetThreadPriority(thread, thread_priority_above_normal); 50 SetThreadPriority(thread, thread_priority_above_normal);
51 } 51 }
52 52
53 let mut global_state = { 53 let global_state = {
54 let workspaces = { 54 let workspaces = {
55 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found { 55 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
56 show_message( 56 show_message(
@@ -113,40 +113,371 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
113 connection.sender.send(request.into()).unwrap(); 113 connection.sender.send(request.into()).unwrap();
114 } 114 }
115 115
116 GlobalState::new(workspaces, config.lru_capacity, config, req_queue) 116 GlobalState::new(
117 connection.sender.clone(),
118 workspaces,
119 config.lru_capacity,
120 config,
121 req_queue,
122 )
117 }; 123 };
118 124
119 log::info!("server initialized, serving requests"); 125 log::info!("server initialized, serving requests");
120 { 126 global_state.run(connection.receiver)?;
121 loop { 127 Ok(())
122 log::trace!("selecting"); 128}
123 let event = select! { 129
124 recv(&connection.receiver) -> msg => match msg { 130impl GlobalState {
125 Ok(msg) => Event::Lsp(msg), 131 fn next_event(&self, inbox: &Receiver<lsp_server::Message>) -> Option<Event> {
126 Err(RecvError) => return Err("client exited without shutdown".into()), 132 select! {
127 }, 133 recv(inbox) -> msg =>
128 recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()), 134 msg.ok().map(Event::Lsp),
129 recv(global_state.task_receiver) -> task => match task { 135
130 Ok(task) => Event::Vfs(task), 136 recv(self.task_pool.1) -> task =>
131 Err(RecvError) => return Err("vfs died".into()), 137 Some(Event::Task(task.unwrap())),
138
139 recv(self.task_receiver) -> task =>
140 Some(Event::Vfs(task.unwrap())),
141
142 recv(self.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task =>
143 Some(Event::Flycheck(task.unwrap())),
144 }
145 }
146
147 fn run(mut self, inbox: Receiver<lsp_server::Message>) -> Result<()> {
148 while let Some(event) = self.next_event(&inbox) {
149 let loop_start = Instant::now();
150 // NOTE: don't count blocking select! call as a loop-turn time
151 let _p = profile("main_loop_inner/loop-turn");
152
153 log::info!("loop turn = {:?}", event);
154 let queue_count = self.task_pool.0.len();
155 if queue_count > 0 {
156 log::info!("queued count = {}", queue_count);
157 }
158
159 let mut became_ready = false;
160 match event {
161 Event::Lsp(msg) => match msg {
162 lsp_server::Message::Request(req) => self.on_request(loop_start, req)?,
163 lsp_server::Message::Notification(not) => {
164 if not.method == lsp_types::notification::Exit::METHOD {
165 return Ok(());
166 }
167 self.on_notification(not)?;
168 }
169 lsp_server::Message::Response(resp) => {
170 let handler = self.req_queue.outgoing.complete(resp.id.clone());
171 handler(&mut self, resp)
172 }
132 }, 173 },
133 recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task { 174 Event::Task(task) => {
134 Ok(task) => Event::Flycheck(task), 175 self.on_task(task);
135 Err(RecvError) => return Err("check watcher died".into()), 176 self.maybe_collect_garbage();
177 }
178 Event::Vfs(task) => match task {
179 vfs::loader::Message::Loaded { files } => {
180 let vfs = &mut self.vfs.write().0;
181 for (path, contents) in files {
182 let path = VfsPath::from(path);
183 if !self.mem_docs.contains(&path) {
184 vfs.set_file_contents(path, contents)
185 }
186 }
187 }
188 vfs::loader::Message::Progress { n_total, n_done } => {
189 let state = if n_done == 0 {
190 Progress::Begin
191 } else if n_done < n_total {
192 Progress::Report
193 } else {
194 assert_eq!(n_done, n_total);
195 self.status = Status::Ready;
196 became_ready = true;
197 Progress::End
198 };
199 report_progress(
200 &mut self,
201 "roots scanned",
202 state,
203 Some(format!("{}/{}", n_done, n_total)),
204 Some(percentage(n_done, n_total)),
205 )
206 }
136 }, 207 },
137 }; 208 Event::Flycheck(task) => on_check_task(task, &mut self)?,
138 if let Event::Lsp(lsp_server::Message::Request(req)) = &event { 209 }
139 if connection.handle_shutdown(&req)? { 210
140 break; 211 let state_changed = self.process_changes();
212 if became_ready {
213 if let Some(flycheck) = &self.flycheck {
214 flycheck.0.update();
215 }
216 }
217
218 if self.status == Status::Ready && (state_changed || became_ready) {
219 let subscriptions = self
220 .mem_docs
221 .iter()
222 .map(|path| self.vfs.read().0.file_id(&path).unwrap())
223 .collect::<Vec<_>>();
224
225 self.update_file_notifications_on_threadpool(subscriptions);
226 }
227
228 let loop_duration = loop_start.elapsed();
229 if loop_duration > Duration::from_millis(100) {
230 log::error!("overly long loop turn: {:?}", loop_duration);
231 if env::var("RA_PROFILE").is_ok() {
232 self.show_message(
233 lsp_types::MessageType::Error,
234 format!("overly long loop turn: {:?}", loop_duration),
235 )
236 }
237 }
238 }
239 Err("client exited without proper shutdown sequence")?
240 }
241
242 fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> {
243 let mut pool_dispatcher =
244 PoolDispatcher { req: Some(req), global_state: self, request_received };
245 pool_dispatcher
246 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
247 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
248 .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
249 .on_sync::<lsp_types::request::Shutdown>(|_, ()| Ok(()))?
250 .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
251 handlers::handle_selection_range(s.snapshot(), p)
252 })?
253 .on_sync::<lsp_ext::MatchingBrace>(|s, p| {
254 handlers::handle_matching_brace(s.snapshot(), p)
255 })?
256 .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
257 .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
258 .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
259 .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
260 .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
261 .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
262 .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
263 .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
264 .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
265 .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
266 .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
267 .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
268 .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
269 .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
270 .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
271 .on::<lsp_types::request::Completion>(handlers::handle_completion)?
272 .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
273 .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
274 .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
275 .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
276 .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
277 .on::<lsp_types::request::Rename>(handlers::handle_rename)?
278 .on::<lsp_types::request::References>(handlers::handle_references)?
279 .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
280 .on::<lsp_types::request::DocumentHighlightRequest>(
281 handlers::handle_document_highlight,
282 )?
283 .on::<lsp_types::request::CallHierarchyPrepare>(
284 handlers::handle_call_hierarchy_prepare,
285 )?
286 .on::<lsp_types::request::CallHierarchyIncomingCalls>(
287 handlers::handle_call_hierarchy_incoming,
288 )?
289 .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
290 handlers::handle_call_hierarchy_outgoing,
291 )?
292 .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
293 .on::<lsp_types::request::SemanticTokensRangeRequest>(
294 handlers::handle_semantic_tokens_range,
295 )?
296 .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
297 .finish();
298 Ok(())
299 }
300 fn on_notification(&mut self, not: Notification) -> Result<()> {
301 let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
302 Ok(params) => {
303 let id: RequestId = match params.id {
304 NumberOrString::Number(id) => id.into(),
305 NumberOrString::String(id) => id.into(),
306 };
307 if let Some(response) = self.req_queue.incoming.cancel(id) {
308 self.send(response.into())
309 }
310 return Ok(());
311 }
312 Err(not) => not,
313 };
314 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
315 Ok(params) => {
316 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
317 if !self.mem_docs.insert(path.clone()) {
318 log::error!("duplicate DidOpenTextDocument: {}", path)
319 }
320 self.vfs
321 .write()
322 .0
323 .set_file_contents(path, Some(params.text_document.text.into_bytes()));
324 }
325 return Ok(());
326 }
327 Err(not) => not,
328 };
329 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
330 Ok(params) => {
331 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
332 assert!(self.mem_docs.contains(&path));
333 let vfs = &mut self.vfs.write().0;
334 let file_id = vfs.file_id(&path).unwrap();
335 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
336 apply_document_changes(&mut text, params.content_changes);
337 vfs.set_file_contents(path, Some(text.into_bytes()))
338 }
339 return Ok(());
340 }
341 Err(not) => not,
342 };
343 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
344 Ok(params) => {
345 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
346 if !self.mem_docs.remove(&path) {
347 log::error!("orphan DidCloseTextDocument: {}", path)
348 }
349 if let Some(path) = path.as_path() {
350 self.loader.invalidate(path.to_path_buf());
351 }
352 }
353 let params = lsp_types::PublishDiagnosticsParams {
354 uri: params.text_document.uri,
355 diagnostics: Vec::new(),
356 version: None,
141 }; 357 };
358 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
359 self.send(not.into());
360 return Ok(());
361 }
362 Err(not) => not,
363 };
364 let not = match notification_cast::<lsp_types::notification::DidSaveTextDocument>(not) {
365 Ok(_params) => {
366 if let Some(flycheck) = &self.flycheck {
367 flycheck.0.update();
368 }
369 return Ok(());
142 } 370 }
143 assert!(!global_state.vfs.read().0.has_changes()); 371 Err(not) => not,
144 loop_turn(&connection, &mut global_state, event)?; 372 };
145 assert!(!global_state.vfs.read().0.has_changes()); 373 let not = match notification_cast::<lsp_types::notification::DidChangeConfiguration>(not) {
374 Ok(_) => {
375 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
376 // this notification's parameters should be ignored and the actual config queried separately.
377 let request = self.req_queue.outgoing.register(
378 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
379 lsp_types::ConfigurationParams {
380 items: vec![lsp_types::ConfigurationItem {
381 scope_uri: None,
382 section: Some("rust-analyzer".to_string()),
383 }],
384 },
385 |this, resp| {
386 log::debug!("config update response: '{:?}", resp);
387 let Response { error, result, .. } = resp;
388
389 match (error, result) {
390 (Some(err), _) => {
391 log::error!("failed to fetch the server settings: {:?}", err)
392 }
393 (None, Some(configs)) => {
394 if let Some(new_config) = configs.get(0) {
395 let mut config = this.config.clone();
396 config.update(&new_config);
397 this.update_configuration(config);
398 }
399 }
400 (None, None) => log::error!(
401 "received empty server settings response from the client"
402 ),
403 }
404 },
405 );
406 self.send(request.into());
407
408 return Ok(());
409 }
410 Err(not) => not,
411 };
412 let not = match notification_cast::<lsp_types::notification::DidChangeWatchedFiles>(not) {
413 Ok(params) => {
414 for change in params.changes {
415 if let Ok(path) = from_proto::abs_path(&change.uri) {
416 self.loader.invalidate(path)
417 }
418 }
419 return Ok(());
420 }
421 Err(not) => not,
422 };
423 if not.method.starts_with("$/") {
424 return Ok(());
146 } 425 }
426 log::error!("unhandled notification: {:?}", not);
427 Ok(())
428 }
429 fn on_task(&mut self, task: Task) {
430 match task {
431 Task::Respond(response) => {
432 if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone())
433 {
434 let duration = start.elapsed();
435 log::info!("handled req#{} in {:?}", response.id, duration);
436 self.complete_request(RequestMetrics {
437 id: response.id.clone(),
438 method: method.to_string(),
439 duration,
440 });
441 self.send(response.into());
442 }
443 }
444 Task::Diagnostics(tasks) => {
445 tasks.into_iter().for_each(|task| on_diagnostic_task(task, self))
446 }
447 Task::Unit => (),
448 }
449 }
450 fn update_file_notifications_on_threadpool(&mut self, subscriptions: Vec<FileId>) {
451 log::trace!("updating notifications for {:?}", subscriptions);
452 if self.config.publish_diagnostics {
453 let snapshot = self.snapshot();
454 let subscriptions = subscriptions.clone();
455 self.task_pool.0.spawn(move || {
456 let diagnostics = subscriptions
457 .into_iter()
458 .filter_map(|file_id| {
459 handlers::publish_diagnostics(&snapshot, file_id)
460 .map_err(|err| {
461 if !is_canceled(&*err) {
462 log::error!("failed to compute diagnostics: {:?}", err);
463 }
464 ()
465 })
466 .ok()
467 })
468 .collect::<Vec<_>>();
469 Task::Diagnostics(diagnostics)
470 })
471 }
472 self.task_pool.0.spawn({
473 let subs = subscriptions;
474 let snap = self.snapshot();
475 move || {
476 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
477 Task::Unit
478 }
479 });
147 } 480 }
148 global_state.analysis_host.request_cancellation();
149 Ok(())
150} 481}
151 482
152#[derive(Debug)] 483#[derive(Debug)]
@@ -199,333 +530,10 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
199pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; 530pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
200const DO_NOTHING: ReqHandler = |_, _| (); 531const DO_NOTHING: ReqHandler = |_, _| ();
201 532
202fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> { 533fn on_check_task(task: flycheck::Message, global_state: &mut GlobalState) -> Result<()> {
203 let loop_start = Instant::now();
204
205 // NOTE: don't count blocking select! call as a loop-turn time
206 let _p = profile("main_loop_inner/loop-turn");
207 log::info!("loop turn = {:?}", event);
208 let queue_count = global_state.task_pool.0.len();
209 if queue_count > 0 {
210 log::info!("queued count = {}", queue_count);
211 }
212
213 let mut became_ready = false;
214 match event {
215 Event::Task(task) => {
216 on_task(task, &connection.sender, global_state);
217 global_state.maybe_collect_garbage();
218 }
219 Event::Vfs(task) => match task {
220 vfs::loader::Message::Loaded { files } => {
221 let vfs = &mut global_state.vfs.write().0;
222 for (path, contents) in files {
223 let path = VfsPath::from(path);
224 if !global_state.mem_docs.contains(&path) {
225 vfs.set_file_contents(path, contents)
226 }
227 }
228 }
229 vfs::loader::Message::Progress { n_total, n_done } => {
230 let state = if n_done == 0 {
231 Progress::Begin
232 } else if n_done < n_total {
233 Progress::Report
234 } else {
235 assert_eq!(n_done, n_total);
236 global_state.status = Status::Ready;
237 became_ready = true;
238 Progress::End
239 };
240 report_progress(
241 global_state,
242 &connection.sender,
243 "roots scanned",
244 state,
245 Some(format!("{}/{}", n_done, n_total)),
246 Some(percentage(n_done, n_total)),
247 )
248 }
249 },
250 Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?,
251 Event::Lsp(msg) => match msg {
252 lsp_server::Message::Request(req) => {
253 on_request(global_state, &connection.sender, loop_start, req)?
254 }
255 lsp_server::Message::Notification(not) => {
256 on_notification(&connection.sender, global_state, not)?;
257 }
258 lsp_server::Message::Response(resp) => {
259 let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
260 handler(global_state, resp)
261 }
262 },
263 };
264
265 let state_changed = global_state.process_changes();
266
267 if became_ready {
268 if let Some(flycheck) = &global_state.flycheck {
269 flycheck.0.update();
270 }
271 }
272
273 if global_state.status == Status::Ready && (state_changed || became_ready) {
274 let subscriptions = global_state
275 .mem_docs
276 .iter()
277 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
278 .collect::<Vec<_>>();
279
280 update_file_notifications_on_threadpool(global_state, subscriptions.clone());
281 global_state.task_pool.0.spawn({
282 let subs = subscriptions;
283 let snap = global_state.snapshot();
284 move || {
285 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
286 Task::Unit
287 }
288 });
289 }
290
291 let loop_duration = loop_start.elapsed();
292 if loop_duration > Duration::from_millis(100) {
293 log::error!("overly long loop turn: {:?}", loop_duration);
294 if env::var("RA_PROFILE").is_ok() {
295 show_message(
296 lsp_types::MessageType::Error,
297 format!("overly long loop turn: {:?}", loop_duration),
298 &connection.sender,
299 );
300 }
301 }
302
303 Ok(())
304}
305
306fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: &mut GlobalState) {
307 match task {
308 Task::Respond(response) => {
309 if let Some((method, start)) =
310 global_state.req_queue.incoming.complete(response.id.clone())
311 {
312 let duration = start.elapsed();
313 log::info!("handled req#{} in {:?}", response.id, duration);
314 global_state.complete_request(RequestMetrics {
315 id: response.id.clone(),
316 method: method.to_string(),
317 duration,
318 });
319 msg_sender.send(response.into()).unwrap();
320 }
321 }
322 Task::Diagnostics(tasks) => {
323 tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state))
324 }
325 Task::Unit => (),
326 }
327}
328
329fn on_request(
330 global_state: &mut GlobalState,
331 msg_sender: &Sender<lsp_server::Message>,
332 request_received: Instant,
333 req: Request,
334) -> Result<()> {
335 let mut pool_dispatcher =
336 PoolDispatcher { req: Some(req), global_state, msg_sender, request_received };
337 pool_dispatcher
338 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
339 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
340 .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
341 .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
342 handlers::handle_selection_range(s.snapshot(), p)
343 })?
344 .on_sync::<lsp_ext::MatchingBrace>(|s, p| handlers::handle_matching_brace(s.snapshot(), p))?
345 .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
346 .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
347 .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
348 .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
349 .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
350 .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
351 .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
352 .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
353 .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
354 .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
355 .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
356 .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
357 .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
358 .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
359 .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
360 .on::<lsp_types::request::Completion>(handlers::handle_completion)?
361 .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
362 .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
363 .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
364 .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
365 .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
366 .on::<lsp_types::request::Rename>(handlers::handle_rename)?
367 .on::<lsp_types::request::References>(handlers::handle_references)?
368 .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
369 .on::<lsp_types::request::DocumentHighlightRequest>(handlers::handle_document_highlight)?
370 .on::<lsp_types::request::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
371 .on::<lsp_types::request::CallHierarchyIncomingCalls>(
372 handlers::handle_call_hierarchy_incoming,
373 )?
374 .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
375 handlers::handle_call_hierarchy_outgoing,
376 )?
377 .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
378 .on::<lsp_types::request::SemanticTokensRangeRequest>(
379 handlers::handle_semantic_tokens_range,
380 )?
381 .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
382 .finish();
383 Ok(())
384}
385
386fn on_notification(
387 msg_sender: &Sender<lsp_server::Message>,
388 global_state: &mut GlobalState,
389 not: Notification,
390) -> Result<()> {
391 let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
392 Ok(params) => {
393 let id: RequestId = match params.id {
394 NumberOrString::Number(id) => id.into(),
395 NumberOrString::String(id) => id.into(),
396 };
397 if let Some(response) = global_state.req_queue.incoming.cancel(id) {
398 msg_sender.send(response.into()).unwrap()
399 }
400 return Ok(());
401 }
402 Err(not) => not,
403 };
404 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
405 Ok(params) => {
406 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
407 if !global_state.mem_docs.insert(path.clone()) {
408 log::error!("duplicate DidOpenTextDocument: {}", path)
409 }
410 global_state
411 .vfs
412 .write()
413 .0
414 .set_file_contents(path, Some(params.text_document.text.into_bytes()));
415 }
416 return Ok(());
417 }
418 Err(not) => not,
419 };
420 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
421 Ok(params) => {
422 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
423 assert!(global_state.mem_docs.contains(&path));
424 let vfs = &mut global_state.vfs.write().0;
425 let file_id = vfs.file_id(&path).unwrap();
426 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
427 apply_document_changes(&mut text, params.content_changes);
428 vfs.set_file_contents(path, Some(text.into_bytes()))
429 }
430 return Ok(());
431 }
432 Err(not) => not,
433 };
434 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
435 Ok(params) => {
436 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
437 if !global_state.mem_docs.remove(&path) {
438 log::error!("orphan DidCloseTextDocument: {}", path)
439 }
440 if let Some(path) = path.as_path() {
441 global_state.loader.invalidate(path.to_path_buf());
442 }
443 }
444 let params = lsp_types::PublishDiagnosticsParams {
445 uri: params.text_document.uri,
446 diagnostics: Vec::new(),
447 version: None,
448 };
449 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
450 msg_sender.send(not.into()).unwrap();
451 return Ok(());
452 }
453 Err(not) => not,
454 };
455 let not = match notification_cast::<lsp_types::notification::DidSaveTextDocument>(not) {
456 Ok(_params) => {
457 if let Some(flycheck) = &global_state.flycheck {
458 flycheck.0.update();
459 }
460 return Ok(());
461 }
462 Err(not) => not,
463 };
464 let not = match notification_cast::<lsp_types::notification::DidChangeConfiguration>(not) {
465 Ok(_) => {
466 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
467 // this notification's parameters should be ignored and the actual config queried separately.
468 let request = global_state.req_queue.outgoing.register(
469 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
470 lsp_types::ConfigurationParams {
471 items: vec![lsp_types::ConfigurationItem {
472 scope_uri: None,
473 section: Some("rust-analyzer".to_string()),
474 }],
475 },
476 |global_state, resp| {
477 log::debug!("config update response: '{:?}", resp);
478 let Response { error, result, .. } = resp;
479
480 match (error, result) {
481 (Some(err), _) => {
482 log::error!("failed to fetch the server settings: {:?}", err)
483 }
484 (None, Some(configs)) => {
485 if let Some(new_config) = configs.get(0) {
486 let mut config = global_state.config.clone();
487 config.update(&new_config);
488 global_state.update_configuration(config);
489 }
490 }
491 (None, None) => {
492 log::error!("received empty server settings response from the client")
493 }
494 }
495 },
496 );
497 msg_sender.send(request.into())?;
498
499 return Ok(());
500 }
501 Err(not) => not,
502 };
503 let not = match notification_cast::<lsp_types::notification::DidChangeWatchedFiles>(not) {
504 Ok(params) => {
505 for change in params.changes {
506 if let Ok(path) = from_proto::abs_path(&change.uri) {
507 global_state.loader.invalidate(path)
508 }
509 }
510 return Ok(());
511 }
512 Err(not) => not,
513 };
514 if not.method.starts_with("$/") {
515 return Ok(());
516 }
517 log::error!("unhandled notification: {:?}", not);
518 Ok(())
519}
520
521fn on_check_task(
522 task: flycheck::Message,
523 global_state: &mut GlobalState,
524 msg_sender: &Sender<lsp_server::Message>,
525) -> Result<()> {
526 match task { 534 match task {
527 flycheck::Message::ClearDiagnostics => { 535 flycheck::Message::ClearDiagnostics => {
528 on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state) 536 on_diagnostic_task(DiagnosticTask::ClearCheck, global_state)
529 } 537 }
530 538
531 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { 539 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => {
@@ -550,7 +558,6 @@ fn on_check_task(
550 diag.diagnostic, 558 diag.diagnostic,
551 diag.fixes.into_iter().map(|it| it.into()).collect(), 559 diag.fixes.into_iter().map(|it| it.into()).collect(),
552 ), 560 ),
553 msg_sender,
554 global_state, 561 global_state,
555 ) 562 )
556 } 563 }
@@ -563,26 +570,22 @@ fn on_check_task(
563 flycheck::Progress::End => (Progress::End, None), 570 flycheck::Progress::End => (Progress::End, None),
564 }; 571 };
565 572
566 report_progress(global_state, msg_sender, "cargo check", state, message, None); 573 report_progress(global_state, "cargo check", state, message, None);
567 } 574 }
568 }; 575 };
569 576
570 Ok(()) 577 Ok(())
571} 578}
572 579
573fn on_diagnostic_task( 580fn on_diagnostic_task(task: DiagnosticTask, global_state: &mut GlobalState) {
574 task: DiagnosticTask, 581 let subscriptions = global_state.diagnostics.handle_task(task);
575 msg_sender: &Sender<lsp_server::Message>,
576 state: &mut GlobalState,
577) {
578 let subscriptions = state.diagnostics.handle_task(task);
579 582
580 for file_id in subscriptions { 583 for file_id in subscriptions {
581 let url = file_id_to_url(&state.vfs.read().0, file_id); 584 let url = file_id_to_url(&global_state.vfs.read().0, file_id);
582 let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect(); 585 let diagnostics = global_state.diagnostics.diagnostics_for(file_id).cloned().collect();
583 let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None }; 586 let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None };
584 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params); 587 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
585 msg_sender.send(not.into()).unwrap(); 588 global_state.send(not.into());
586 } 589 }
587} 590}
588 591
@@ -599,7 +602,6 @@ fn percentage(done: usize, total: usize) -> f64 {
599 602
600fn report_progress( 603fn report_progress(
601 global_state: &mut GlobalState, 604 global_state: &mut GlobalState,
602 sender: &Sender<lsp_server::Message>,
603 title: &str, 605 title: &str,
604 state: Progress, 606 state: Progress,
605 message: Option<String>, 607 message: Option<String>,
@@ -616,7 +618,7 @@ fn report_progress(
616 lsp_types::WorkDoneProgressCreateParams { token: token.clone() }, 618 lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
617 DO_NOTHING, 619 DO_NOTHING,
618 ); 620 );
619 sender.send(work_done_progress_create.into()).unwrap(); 621 global_state.send(work_done_progress_create.into());
620 622
621 lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin { 623 lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
622 title: title.into(), 624 title: title.into(),
@@ -641,13 +643,12 @@ fn report_progress(
641 token, 643 token,
642 value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress), 644 value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress),
643 }); 645 });
644 sender.send(notification.into()).unwrap(); 646 global_state.send(notification.into());
645} 647}
646 648
647struct PoolDispatcher<'a> { 649struct PoolDispatcher<'a> {
648 req: Option<Request>, 650 req: Option<Request>,
649 global_state: &'a mut GlobalState, 651 global_state: &'a mut GlobalState,
650 msg_sender: &'a Sender<lsp_server::Message>,
651 request_received: Instant, 652 request_received: Instant,
652} 653}
653 654
@@ -674,7 +675,7 @@ impl<'a> PoolDispatcher<'a> {
674 result_to_task::<R>(id, result) 675 result_to_task::<R>(id, result)
675 }) 676 })
676 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; 677 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
677 on_task(task, self.msg_sender, self.global_state); 678 self.global_state.on_task(task);
678 Ok(self) 679 Ok(self)
679 } 680 }
680 681
@@ -736,7 +737,7 @@ impl<'a> PoolDispatcher<'a> {
736 ErrorCode::MethodNotFound as i32, 737 ErrorCode::MethodNotFound as i32,
737 "unknown request".to_string(), 738 "unknown request".to_string(),
738 ); 739 );
739 self.msg_sender.send(resp.into()).unwrap(); 740 self.global_state.send(resp.into());
740 } 741 }
741 } 742 }
742 } 743 }
@@ -767,29 +768,3 @@ where
767 }; 768 };
768 Task::Respond(response) 769 Task::Respond(response)
769} 770}
770
771fn update_file_notifications_on_threadpool(
772 global_state: &mut GlobalState,
773 subscriptions: Vec<FileId>,
774) {
775 log::trace!("updating notifications for {:?}", subscriptions);
776 if global_state.config.publish_diagnostics {
777 let snapshot = global_state.snapshot();
778 global_state.task_pool.0.spawn(move || {
779 let diagnostics = subscriptions
780 .into_iter()
781 .filter_map(|file_id| {
782 handlers::publish_diagnostics(&snapshot, file_id)
783 .map_err(|err| {
784 if !is_canceled(&*err) {
785 log::error!("failed to compute diagnostics: {:?}", err);
786 }
787 ()
788 })
789 .ok()
790 })
791 .collect::<Vec<_>>();
792 Task::Diagnostics(diagnostics)
793 })
794 }
795}
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index 68fdb8cb0..25ba8d798 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -82,7 +82,12 @@ impl NotifyActor {
82 watcher_receiver, 82 watcher_receiver,
83 } 83 }
84 } 84 }
85 85 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
86 select! {
87 recv(receiver) -> it => it.ok().map(Event::Message),
88 recv(&self.watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
89 }
90 }
86 fn run(mut self, inbox: Receiver<Message>) { 91 fn run(mut self, inbox: Receiver<Message>) {
87 while let Some(event) = self.next_event(&inbox) { 92 while let Some(event) = self.next_event(&inbox) {
88 log::debug!("vfs-notify event: {:?}", event); 93 log::debug!("vfs-notify event: {:?}", event);
@@ -154,12 +159,6 @@ impl NotifyActor {
154 } 159 }
155 } 160 }
156 } 161 }
157 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
158 select! {
159 recv(receiver) -> it => it.ok().map(Event::Message),
160 recv(&self.watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
161 }
162 }
163 fn load_entry( 162 fn load_entry(
164 &mut self, 163 &mut self,
165 entry: loader::Entry, 164 entry: loader::Entry,