From 19b063e055e9621a3af93b9bc6cfa20150ab0824 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 25 Jun 2020 00:17:11 +0200 Subject: Merge LoopState into GlobalState --- crates/rust-analyzer/src/global_state.rs | 31 +++++++-- crates/rust-analyzer/src/main_loop.rs | 112 +++++++++++-------------------- 2 files changed, 64 insertions(+), 79 deletions(-) diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs index 9a75cb2ab..ad5f94e87 100644 --- a/crates/rust-analyzer/src/global_state.rs +++ b/crates/rust-analyzer/src/global_state.rs @@ -20,11 +20,12 @@ use crate::{ diagnostics::{CheckFixes, DiagnosticCollection}, from_proto, line_endings::LineEndings, + main_loop::ReqQueue, request_metrics::{LatestRequests, RequestMetrics}, to_proto::url_from_abs_path, Result, }; -use rustc_hash::FxHashMap; +use rustc_hash::{FxHashMap, FxHashSet}; fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) -> Option { // FIXME: Figure out the multi-workspace situation @@ -40,12 +41,23 @@ fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) -> }) } +#[derive(Eq, PartialEq)] +pub(crate) enum Status { + Loading, + Ready, +} + +impl Default for Status { + fn default() -> Self { + Status::Loading + } +} + /// `GlobalState` is the primary mutable state of the language server /// /// The most interesting components are `vfs`, which stores a consistent /// snapshot of the file systems, and `analysis_host`, which stores our /// incremental salsa database. -#[derive(Debug)] pub(crate) struct GlobalState { pub(crate) config: Config, pub(crate) workspaces: Arc>, @@ -54,10 +66,13 @@ pub(crate) struct GlobalState { pub(crate) task_receiver: Receiver, pub(crate) flycheck: Option, pub(crate) diagnostics: DiagnosticCollection, - pub(crate) proc_macro_client: ProcMacroClient, + pub(crate) mem_docs: FxHashSet, pub(crate) vfs: Arc)>>, + pub(crate) status: Status, + pub(crate) req_queue: ReqQueue, pub(crate) latest_requests: Arc>, source_root_config: SourceRootConfig, + _proc_macro_client: ProcMacroClient, } /// An immutable snapshot of the world's state at a point in time. @@ -75,6 +90,7 @@ impl GlobalState { workspaces: Vec, lru_capacity: Option, config: Config, + req_queue: ReqQueue, ) -> GlobalState { let mut change = AnalysisChange::new(); @@ -136,13 +152,16 @@ impl GlobalState { workspaces: Arc::new(workspaces), analysis_host, loader, - vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))), task_receiver, - latest_requests: Default::default(), flycheck, diagnostics: Default::default(), - proc_macro_client, + mem_docs: FxHashSet::default(), + vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))), + status: Status::default(), + req_queue, + latest_requests: Default::default(), source_root_config: project_folders.source_root_config, + _proc_macro_client: proc_macro_client, }; res.process_changes(); res diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index c8819c3b0..02e188b02 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -11,16 +11,13 @@ use std::{ }; use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; -use lsp_server::{ - Connection, ErrorCode, Message, Notification, ReqQueue, Request, RequestId, Response, -}; +use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent}; use ra_db::VfsPath; use ra_flycheck::CheckTask; use ra_ide::{Canceled, FileId, LineIndex}; use ra_prof::profile; use ra_project_model::{PackageRoot, ProjectWorkspace}; -use rustc_hash::FxHashSet; use serde::{de::DeserializeOwned, Serialize}; use threadpool::ThreadPool; @@ -28,7 +25,7 @@ use crate::{ config::{Config, FilesWatcher, LinkedProject}, diagnostics::DiagnosticTask, from_proto, - global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, + global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status}, handlers, lsp_ext, request_metrics::RequestMetrics, Result, @@ -78,7 +75,6 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { SetThreadPriority(thread, thread_priority_above_normal); } - let mut loop_state = LoopState::default(); let mut global_state = { let workspaces = { if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found { @@ -116,6 +112,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { .collect::>() }; + let mut req_queue = ReqQueue::default(); + if let FilesWatcher::Client = config.files.watcher { let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions { watchers: workspaces @@ -132,7 +130,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { register_options: Some(serde_json::to_value(registration_options).unwrap()), }; let params = lsp_types::RegistrationParams { registrations: vec![registration] }; - let request = loop_state.req_queue.outgoing.register( + let request = req_queue.outgoing.register( lsp_types::request::RegisterCapability::METHOD.to_string(), params, DO_NOTHING, @@ -140,7 +138,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { connection.sender.send(request.into()).unwrap(); } - GlobalState::new(workspaces, config.lru_capacity, config) + GlobalState::new(workspaces, config.lru_capacity, config, req_queue) }; let pool = ThreadPool::default(); @@ -172,15 +170,13 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { }; } assert!(!global_state.vfs.read().0.has_changes()); - loop_turn(&pool, &task_sender, &connection, &mut global_state, &mut loop_state, event)?; + loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?; assert!(!global_state.vfs.read().0.has_changes()); } } global_state.analysis_host.request_cancellation(); log::info!("waiting for tasks to finish..."); - task_receiver.into_iter().for_each(|task| { - on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state) - }); + task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); pool.join(); @@ -244,35 +240,15 @@ impl fmt::Debug for Event { } } -type ReqHandler = fn(&mut GlobalState, Response); +pub(crate) type ReqHandler = fn(&mut GlobalState, Response); +pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; const DO_NOTHING: ReqHandler = |_, _| (); -type Incoming = lsp_server::Incoming<(&'static str, Instant)>; - -#[derive(Default)] -struct LoopState { - req_queue: ReqQueue<(&'static str, Instant), ReqHandler>, - mem_docs: FxHashSet, - status: Status, -} - -#[derive(Eq, PartialEq)] -enum Status { - Loading, - Ready, -} - -impl Default for Status { - fn default() -> Self { - Status::Loading - } -} fn loop_turn( pool: &ThreadPool, task_sender: &Sender, connection: &Connection, global_state: &mut GlobalState, - loop_state: &mut LoopState, event: Event, ) -> Result<()> { let loop_start = Instant::now(); @@ -288,7 +264,7 @@ fn loop_turn( let mut became_ready = false; match event { Event::Task(task) => { - on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state); + on_task(task, &connection.sender, global_state); global_state.maybe_collect_garbage(); } Event::Vfs(task) => match task { @@ -296,35 +272,29 @@ fn loop_turn( let vfs = &mut global_state.vfs.write().0; for (path, contents) in files { let path = VfsPath::from(path); - if !loop_state.mem_docs.contains(&path) { + if !global_state.mem_docs.contains(&path) { vfs.set_file_contents(path, contents) } } } vfs::loader::Message::Progress { n_total, n_done } => { if n_done == n_total { - loop_state.status = Status::Ready; + global_state.status = Status::Ready; became_ready = true; } - report_progress(loop_state, &connection.sender, n_done, n_total, "roots scanned") + report_progress(global_state, &connection.sender, n_done, n_total, "roots scanned") } }, Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?, Event::Msg(msg) => match msg { - Message::Request(req) => on_request( - global_state, - &mut loop_state.req_queue.incoming, - pool, - task_sender, - &connection.sender, - loop_start, - req, - )?, + Message::Request(req) => { + on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)? + } Message::Notification(not) => { - on_notification(&connection.sender, global_state, loop_state, not)?; + on_notification(&connection.sender, global_state, not)?; } Message::Response(resp) => { - let handler = loop_state.req_queue.outgoing.complete(resp.id.clone()); + let handler = global_state.req_queue.outgoing.complete(resp.id.clone()); handler(global_state, resp) } }, @@ -338,8 +308,8 @@ fn loop_turn( } } - if loop_state.status == Status::Ready && (state_changed || became_ready) { - let subscriptions = loop_state + if global_state.status == Status::Ready && (state_changed || became_ready) { + let subscriptions = global_state .mem_docs .iter() .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) @@ -373,18 +343,15 @@ fn loop_turn( Ok(()) } -fn on_task( - task: Task, - msg_sender: &Sender, - incoming_requests: &mut Incoming, - state: &mut GlobalState, -) { +fn on_task(task: Task, msg_sender: &Sender, global_state: &mut GlobalState) { match task { Task::Respond(response) => { - if let Some((method, start)) = incoming_requests.complete(response.id.clone()) { + if let Some((method, start)) = + global_state.req_queue.incoming.complete(response.id.clone()) + { let duration = start.elapsed(); log::info!("handled req#{} in {:?}", response.id, duration); - state.complete_request(RequestMetrics { + global_state.complete_request(RequestMetrics { id: response.id.clone(), method: method.to_string(), duration, @@ -395,13 +362,12 @@ fn on_task( Task::Notify(n) => { msg_sender.send(n.into()).unwrap(); } - Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state), + Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state), } } fn on_request( global_state: &mut GlobalState, - incoming_requests: &mut Incoming, pool: &ThreadPool, task_sender: &Sender, msg_sender: &Sender, @@ -414,7 +380,6 @@ fn on_request( global_state, task_sender, msg_sender, - incoming_requests, request_received, }; pool_dispatcher @@ -469,7 +434,6 @@ fn on_request( fn on_notification( msg_sender: &Sender, global_state: &mut GlobalState, - loop_state: &mut LoopState, not: Notification, ) -> Result<()> { let not = match notification_cast::(not) { @@ -478,7 +442,7 @@ fn on_notification( NumberOrString::Number(id) => id.into(), NumberOrString::String(id) => id.into(), }; - if let Some(response) = loop_state.req_queue.incoming.cancel(id) { + if let Some(response) = global_state.req_queue.incoming.cancel(id) { msg_sender.send(response.into()).unwrap() } return Ok(()); @@ -488,7 +452,7 @@ fn on_notification( let not = match notification_cast::(not) { Ok(params) => { if let Ok(path) = from_proto::vfs_path(¶ms.text_document.uri) { - if !loop_state.mem_docs.insert(path.clone()) { + if !global_state.mem_docs.insert(path.clone()) { log::error!("duplicate DidOpenTextDocument: {}", path) } global_state @@ -504,7 +468,7 @@ fn on_notification( let not = match notification_cast::(not) { Ok(params) => { if let Ok(path) = from_proto::vfs_path(¶ms.text_document.uri) { - assert!(loop_state.mem_docs.contains(&path)); + assert!(global_state.mem_docs.contains(&path)); let vfs = &mut global_state.vfs.write().0; let file_id = vfs.file_id(&path).unwrap(); let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap(); @@ -518,7 +482,7 @@ fn on_notification( let not = match notification_cast::(not) { Ok(params) => { if let Ok(path) = from_proto::vfs_path(¶ms.text_document.uri) { - if !loop_state.mem_docs.remove(&path) { + if !global_state.mem_docs.remove(&path) { log::error!("orphan DidCloseTextDocument: {}", path) } if let Some(path) = path.as_path() { @@ -549,7 +513,7 @@ fn on_notification( Ok(_) => { // As stated in https://github.com/microsoft/language-server-protocol/issues/676, // this notification's parameters should be ignored and the actual config queried separately. - let request = loop_state.req_queue.outgoing.register( + let request = global_state.req_queue.outgoing.register( lsp_types::request::WorkspaceConfiguration::METHOD.to_string(), lsp_types::ConfigurationParams { items: vec![lsp_types::ConfigurationItem { @@ -732,7 +696,7 @@ fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender, state: } fn report_progress( - loop_state: &mut LoopState, + global_state: &mut GlobalState, sender: &Sender, done: usize, total: usize, @@ -742,7 +706,7 @@ fn report_progress( let message = Some(format!("{}/{} {}", done, total, message)); let percentage = Some(100.0 * done as f64 / total.max(1) as f64); let work_done_progress = if done == 0 { - let work_done_progress_create = loop_state.req_queue.outgoing.register( + let work_done_progress_create = global_state.req_queue.outgoing.register( lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(), lsp_types::WorkDoneProgressCreateParams { token: token.clone() }, DO_NOTHING, @@ -777,7 +741,6 @@ struct PoolDispatcher<'a> { req: Option, pool: &'a ThreadPool, global_state: &'a mut GlobalState, - incoming_requests: &'a mut Incoming, msg_sender: &'a Sender, task_sender: &'a Sender, request_received: Instant, @@ -806,7 +769,7 @@ impl<'a> PoolDispatcher<'a> { result_to_task::(id, result) }) .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; - on_task(task, self.msg_sender, self.incoming_requests, self.global_state); + on_task(task, self.msg_sender, self.global_state); Ok(self) } @@ -853,7 +816,10 @@ impl<'a> PoolDispatcher<'a> { return None; } }; - self.incoming_requests.register(id.clone(), (R::METHOD, self.request_received)); + self.global_state + .req_queue + .incoming + .register(id.clone(), (R::METHOD, self.request_received)); Some((id, params)) } -- cgit v1.2.3