From 4e3f5cc7293d97aef4630ea30b8e9ad6931589a8 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 31 Aug 2019 14:47:37 +0300 Subject: cleanup main loop --- crates/ra_lsp_server/src/main_loop.rs | 277 ++++++++++++++++++---------------- 1 file changed, 151 insertions(+), 126 deletions(-) (limited to 'crates/ra_lsp_server/src/main_loop.rs') diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 42ebb5cdf..80f0216e8 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -4,12 +4,13 @@ pub(crate) mod pending_requests; use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; -use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; +use crossbeam_channel::{select, unbounded, RecvError, Sender}; use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_types::{ClientCapabilities, NumberOrString}; -use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData}; +use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; use ra_prof::profile; use ra_vfs::VfsTask; +use relative_path::RelativePathBuf; use serde::{de::DeserializeOwned, Serialize}; use threadpool::ThreadPool; @@ -18,7 +19,6 @@ use crate::{ pending_requests::{PendingRequest, PendingRequests}, subscriptions::Subscriptions, }, - project_model::workspace_loader, req, world::{Options, WorldSnapshot, WorldState}, Result, ServerConfig, @@ -54,14 +54,17 @@ pub fn main_loop( connection: &Connection, ) -> Result<()> { log::info!("server_config: {:#?}", config); + // FIXME: support dynamic workspace loading. let workspaces = { - let ws_worker = workspace_loader(config.with_sysroot); let mut loaded_workspaces = Vec::new(); for ws_root in &ws_roots { - ws_worker.sender().send(ws_root.clone()).unwrap(); - match ws_worker.receiver().recv().unwrap() { - Ok(ws) => loaded_workspaces.push(ws), + let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot( + ws_root.as_path(), + config.with_sysroot, + ); + match workspace { + Ok(workspace) => loaded_workspaces.push(workspace), Err(e) => { log::error!("loading workspace failed: {}", e); @@ -75,11 +78,13 @@ pub fn main_loop( } loaded_workspaces }; + let globs = config .exclude_globs .iter() .map(|glob| ra_vfs_glob::Glob::new(glob)) .collect::, _>>()?; + let feature_flags = { let mut ff = FeatureFlags::default(); for (flag, value) in config.feature_flags { @@ -95,7 +100,8 @@ pub fn main_loop( ff }; log::info!("feature_flags: {:#?}", feature_flags); - let mut state = WorldState::new( + + let mut world_state = WorldState::new( ws_roots, workspaces, config.lru_capacity, @@ -113,31 +119,58 @@ pub fn main_loop( let pool = ThreadPool::new(THREADPOOL_SIZE); let (task_sender, task_receiver) = unbounded::(); - let mut pending_requests = PendingRequests::default(); + let (libdata_sender, libdata_receiver) = unbounded::(); + let mut loop_state = LoopState::default(); log::info!("server initialized, serving requests"); - let main_res = main_loop_inner( - &pool, - connection, - task_sender, - task_receiver.clone(), - &mut state, - &mut pending_requests, - ); + { + let task_sender = task_sender; + let libdata_sender = libdata_sender; + loop { + log::trace!("selecting"); + let event = select! { + recv(&connection.receiver) -> msg => match msg { + Ok(msg) => Event::Msg(msg), + Err(RecvError) => Err("client exited without shutdown")?, + }, + recv(task_receiver) -> task => Event::Task(task.unwrap()), + recv(world_state.task_receiver) -> task => match task { + Ok(task) => Event::Vfs(task), + Err(RecvError) => Err("vfs died")?, + }, + recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) + }; + if let Event::Msg(Message::Request(req)) = &event { + if connection.handle_shutdown(&req)? { + break; + }; + } + loop_turn( + &pool, + &task_sender, + &libdata_sender, + connection, + &mut world_state, + &mut loop_state, + event, + )?; + } + } log::info!("waiting for tasks to finish..."); - task_receiver - .into_iter() - .for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state)); + task_receiver.into_iter().for_each(|task| { + on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state) + }); + libdata_receiver.into_iter().for_each(|lib| drop(lib)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); log::info!("...threadpool has finished"); - let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); + let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead"); drop(vfs); - main_res + Ok(()) } #[derive(Debug)] @@ -192,121 +225,113 @@ impl fmt::Debug for Event { } } -fn main_loop_inner( +#[derive(Debug, Default)] +struct LoopState { + pending_requests: PendingRequests, + subscriptions: Subscriptions, + // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same + // time to always have a thread ready to react to input. + in_flight_libraries: usize, + pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc)>)>, + workspace_loaded: bool, +} + +fn loop_turn( pool: &ThreadPool, + task_sender: &Sender, + libdata_sender: &Sender, connection: &Connection, - task_sender: Sender, - task_receiver: Receiver, - state: &mut WorldState, - pending_requests: &mut PendingRequests, + world_state: &mut WorldState, + loop_state: &mut LoopState, + event: Event, ) -> Result<()> { - let mut subs = Subscriptions::default(); - // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same - // time to always have a thread ready to react to input. - let mut in_flight_libraries = 0; - let mut pending_libraries = Vec::new(); - let mut send_workspace_notification = true; - - let (libdata_sender, libdata_receiver) = unbounded(); - loop { - log::trace!("selecting"); - let event = select! { - recv(&connection.receiver) -> msg => match msg { - Ok(msg) => Event::Msg(msg), - Err(RecvError) => Err("client exited without shutdown")?, - }, - recv(task_receiver) -> task => Event::Task(task.unwrap()), - recv(state.task_receiver) -> task => match task { - Ok(task) => Event::Vfs(task), - Err(RecvError) => Err("vfs died")?, - }, - recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) - }; - let loop_start = Instant::now(); - - // NOTE: don't count blocking select! call as a loop-turn time - let _p = profile("main_loop_inner/loop-turn"); - log::info!("loop turn = {:?}", event); - let queue_count = pool.queued_count(); - if queue_count > 0 { - log::info!("queued count = {}", queue_count); - } + let loop_start = Instant::now(); + + // NOTE: don't count blocking select! call as a loop-turn time + let _p = profile("main_loop_inner/loop-turn"); + log::info!("loop turn = {:?}", event); + let queue_count = pool.queued_count(); + if queue_count > 0 { + log::info!("queued count = {}", queue_count); + } - let mut state_changed = false; - match event { - Event::Task(task) => { - on_task(task, &connection.sender, pending_requests, state); - state.maybe_collect_garbage(); - } - Event::Vfs(task) => { - state.vfs.write().handle_task(task); + let mut state_changed = false; + match event { + Event::Task(task) => { + on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state); + world_state.maybe_collect_garbage(); + } + Event::Vfs(task) => { + world_state.vfs.write().handle_task(task); + state_changed = true; + } + Event::Lib(lib) => { + world_state.add_lib(lib); + world_state.maybe_collect_garbage(); + loop_state.in_flight_libraries -= 1; + } + Event::Msg(msg) => match msg { + Message::Request(req) => on_request( + world_state, + &mut loop_state.pending_requests, + pool, + task_sender, + &connection.sender, + loop_start, + req, + )?, + Message::Notification(not) => { + on_notification( + &connection.sender, + world_state, + &mut loop_state.pending_requests, + &mut loop_state.subscriptions, + not, + )?; state_changed = true; } - Event::Lib(lib) => { - state.add_lib(lib); - state.maybe_collect_garbage(); - in_flight_libraries -= 1; - } - Event::Msg(msg) => match msg { - Message::Request(req) => { - if connection.handle_shutdown(&req)? { - return Ok(()); - }; - on_request( - state, - pending_requests, - pool, - &task_sender, - &connection.sender, - loop_start, - req, - )? - } - Message::Notification(not) => { - on_notification(&connection.sender, state, pending_requests, &mut subs, not)?; - state_changed = true; - } - Message::Response(resp) => log::error!("unexpected response: {:?}", resp), - }, - }; + Message::Response(resp) => log::error!("unexpected response: {:?}", resp), + }, + }; - pending_libraries.extend(state.process_changes()); - while in_flight_libraries < MAX_IN_FLIGHT_LIBS && !pending_libraries.is_empty() { - let (root, files) = pending_libraries.pop().unwrap(); - in_flight_libraries += 1; - let sender = libdata_sender.clone(); - pool.execute(move || { - log::info!("indexing {:?} ... ", root); - let _p = profile(&format!("indexed {:?}", root)); - let data = LibraryData::prepare(root, files); - sender.send(data).unwrap(); - }); - } + loop_state.pending_libraries.extend(world_state.process_changes()); + while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS + && !loop_state.pending_libraries.is_empty() + { + let (root, files) = loop_state.pending_libraries.pop().unwrap(); + loop_state.in_flight_libraries += 1; + let sender = libdata_sender.clone(); + pool.execute(move || { + log::info!("indexing {:?} ... ", root); + let _p = profile(&format!("indexed {:?}", root)); + let data = LibraryData::prepare(root, files); + sender.send(data).unwrap(); + }); + } - if send_workspace_notification - && state.roots_to_scan == 0 - && pending_libraries.is_empty() - && in_flight_libraries == 0 - { - let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum(); - if state.feature_flags().get("notifications.workspace-loaded") { - let msg = format!("workspace loaded, {} rust packages", n_packages); - show_message(req::MessageType::Info, msg, &connection.sender); - } - // Only send the notification first time - send_workspace_notification = false; + if !loop_state.workspace_loaded + && world_state.roots_to_scan == 0 + && loop_state.pending_libraries.is_empty() + && loop_state.in_flight_libraries == 0 + { + loop_state.workspace_loaded = true; + let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum(); + if world_state.feature_flags().get("notifications.workspace-loaded") { + let msg = format!("workspace loaded, {} rust packages", n_packages); + show_message(req::MessageType::Info, msg, &connection.sender); } + } - if state_changed { - update_file_notifications_on_threadpool( - pool, - state.snapshot(), - state.options.publish_decorations, - task_sender.clone(), - subs.subscriptions(), - ) - } + if state_changed { + update_file_notifications_on_threadpool( + pool, + world_state.snapshot(), + world_state.options.publish_decorations, + task_sender.clone(), + loop_state.subscriptions.subscriptions(), + ) } + Ok(()) } fn on_task( -- cgit v1.2.3