From 4e3f5cc7293d97aef4630ea30b8e9ad6931589a8 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 31 Aug 2019 14:47:37 +0300 Subject: cleanup main loop --- crates/ra_lsp_server/src/cargo_target_spec.rs | 13 +- crates/ra_lsp_server/src/lib.rs | 2 - crates/ra_lsp_server/src/main.rs | 26 +- crates/ra_lsp_server/src/main_loop.rs | 277 +++++++++++---------- .../ra_lsp_server/src/main_loop/subscriptions.rs | 2 +- crates/ra_lsp_server/src/project_model.rs | 21 -- crates/ra_lsp_server/src/thread_worker.rs | 49 ---- crates/ra_lsp_server/src/world.rs | 3 +- 8 files changed, 172 insertions(+), 221 deletions(-) delete mode 100644 crates/ra_lsp_server/src/project_model.rs delete mode 100644 crates/ra_lsp_server/src/thread_worker.rs (limited to 'crates/ra_lsp_server') diff --git a/crates/ra_lsp_server/src/cargo_target_spec.rs b/crates/ra_lsp_server/src/cargo_target_spec.rs index 050c5fd95..a083bb311 100644 --- a/crates/ra_lsp_server/src/cargo_target_spec.rs +++ b/crates/ra_lsp_server/src/cargo_target_spec.rs @@ -1,10 +1,7 @@ -use crate::{ - project_model::{self, TargetKind}, - world::WorldSnapshot, - Result, -}; - use ra_ide_api::{FileId, RunnableKind}; +use ra_project_model::{self, ProjectWorkspace, TargetKind}; + +use crate::{world::WorldSnapshot, Result}; pub(crate) fn runnable_args( world: &WorldSnapshot, @@ -66,7 +63,7 @@ impl CargoTargetSpec { let file_id = world.analysis().crate_root(crate_id)?; let path = world.vfs.read().file2path(ra_vfs::VfsFile(file_id.0)); let res = world.workspaces.iter().find_map(|ws| match ws { - project_model::ProjectWorkspace::Cargo { cargo, .. } => { + ProjectWorkspace::Cargo { cargo, .. } => { let tgt = cargo.target_by_root(&path)?; Some(CargoTargetSpec { package: tgt.package(&cargo).name(&cargo).to_string(), @@ -74,7 +71,7 @@ impl CargoTargetSpec { target_kind: tgt.kind(&cargo), }) } - project_model::ProjectWorkspace::Json { .. } => None, + ProjectWorkspace::Json { .. } => None, }); Ok(res) } diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 69a577b3e..fa3d88abd 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs @@ -4,11 +4,9 @@ mod cargo_target_spec; mod conv; mod main_loop; mod markdown; -mod project_model; pub mod req; pub mod config; mod world; -mod thread_worker; pub type Result = std::result::Result>; pub use crate::{ diff --git a/crates/ra_lsp_server/src/main.rs b/crates/ra_lsp_server/src/main.rs index 88504bb89..d40fed947 100644 --- a/crates/ra_lsp_server/src/main.rs +++ b/crates/ra_lsp_server/src/main.rs @@ -5,37 +5,37 @@ use ra_lsp_server::{show_message, Result, ServerConfig}; use ra_prof; fn main() -> Result<()> { + setup_logging()?; + run_server()?; + Ok(()) +} + +fn setup_logging() -> Result<()> { std::env::set_var("RUST_BACKTRACE", "short"); + let logger = Logger::with_env_or_str("error").duplicate_to_stderr(Duplicate::All); match std::env::var("RA_LOG_DIR") { Ok(ref v) if v == "1" => logger.log_to_file().directory("log").start()?, _ => logger.start()?, }; + ra_prof::set_filter(match std::env::var("RA_PROFILE") { Ok(spec) => ra_prof::Filter::from_spec(&spec), Err(_) => ra_prof::Filter::disabled(), }); - log::info!("lifecycle: server started"); - match std::panic::catch_unwind(main_inner) { - Ok(res) => { - log::info!("lifecycle: terminating process with {:?}", res); - res - } - Err(_) => { - log::error!("server panicked"); - Err("server panicked")? - } - } + Ok(()) } -fn main_inner() -> Result<()> { - let cwd = std::env::current_dir()?; +fn run_server() -> Result<()> { + log::info!("lifecycle: server started"); + let (connection, io_threads) = Connection::stdio(); let server_capabilities = serde_json::to_value(ra_lsp_server::server_capabilities()).unwrap(); let initialize_params = connection.initialize(server_capabilities)?; let initialize_params: lsp_types::InitializeParams = serde_json::from_value(initialize_params)?; + let cwd = std::env::current_dir()?; let root = initialize_params.root_uri.and_then(|it| it.to_file_path().ok()).unwrap_or(cwd); let workspace_roots = initialize_params diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 42ebb5cdf..80f0216e8 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -4,12 +4,13 @@ pub(crate) mod pending_requests; use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; -use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; +use crossbeam_channel::{select, unbounded, RecvError, Sender}; use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_types::{ClientCapabilities, NumberOrString}; -use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData}; +use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; use ra_prof::profile; use ra_vfs::VfsTask; +use relative_path::RelativePathBuf; use serde::{de::DeserializeOwned, Serialize}; use threadpool::ThreadPool; @@ -18,7 +19,6 @@ use crate::{ pending_requests::{PendingRequest, PendingRequests}, subscriptions::Subscriptions, }, - project_model::workspace_loader, req, world::{Options, WorldSnapshot, WorldState}, Result, ServerConfig, @@ -54,14 +54,17 @@ pub fn main_loop( connection: &Connection, ) -> Result<()> { log::info!("server_config: {:#?}", config); + // FIXME: support dynamic workspace loading. let workspaces = { - let ws_worker = workspace_loader(config.with_sysroot); let mut loaded_workspaces = Vec::new(); for ws_root in &ws_roots { - ws_worker.sender().send(ws_root.clone()).unwrap(); - match ws_worker.receiver().recv().unwrap() { - Ok(ws) => loaded_workspaces.push(ws), + let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot( + ws_root.as_path(), + config.with_sysroot, + ); + match workspace { + Ok(workspace) => loaded_workspaces.push(workspace), Err(e) => { log::error!("loading workspace failed: {}", e); @@ -75,11 +78,13 @@ pub fn main_loop( } loaded_workspaces }; + let globs = config .exclude_globs .iter() .map(|glob| ra_vfs_glob::Glob::new(glob)) .collect::, _>>()?; + let feature_flags = { let mut ff = FeatureFlags::default(); for (flag, value) in config.feature_flags { @@ -95,7 +100,8 @@ pub fn main_loop( ff }; log::info!("feature_flags: {:#?}", feature_flags); - let mut state = WorldState::new( + + let mut world_state = WorldState::new( ws_roots, workspaces, config.lru_capacity, @@ -113,31 +119,58 @@ pub fn main_loop( let pool = ThreadPool::new(THREADPOOL_SIZE); let (task_sender, task_receiver) = unbounded::(); - let mut pending_requests = PendingRequests::default(); + let (libdata_sender, libdata_receiver) = unbounded::(); + let mut loop_state = LoopState::default(); log::info!("server initialized, serving requests"); - let main_res = main_loop_inner( - &pool, - connection, - task_sender, - task_receiver.clone(), - &mut state, - &mut pending_requests, - ); + { + let task_sender = task_sender; + let libdata_sender = libdata_sender; + loop { + log::trace!("selecting"); + let event = select! { + recv(&connection.receiver) -> msg => match msg { + Ok(msg) => Event::Msg(msg), + Err(RecvError) => Err("client exited without shutdown")?, + }, + recv(task_receiver) -> task => Event::Task(task.unwrap()), + recv(world_state.task_receiver) -> task => match task { + Ok(task) => Event::Vfs(task), + Err(RecvError) => Err("vfs died")?, + }, + recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) + }; + if let Event::Msg(Message::Request(req)) = &event { + if connection.handle_shutdown(&req)? { + break; + }; + } + loop_turn( + &pool, + &task_sender, + &libdata_sender, + connection, + &mut world_state, + &mut loop_state, + event, + )?; + } + } log::info!("waiting for tasks to finish..."); - task_receiver - .into_iter() - .for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state)); + task_receiver.into_iter().for_each(|task| { + on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state) + }); + libdata_receiver.into_iter().for_each(|lib| drop(lib)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); log::info!("...threadpool has finished"); - let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); + let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead"); drop(vfs); - main_res + Ok(()) } #[derive(Debug)] @@ -192,121 +225,113 @@ impl fmt::Debug for Event { } } -fn main_loop_inner( +#[derive(Debug, Default)] +struct LoopState { + pending_requests: PendingRequests, + subscriptions: Subscriptions, + // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same + // time to always have a thread ready to react to input. + in_flight_libraries: usize, + pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc)>)>, + workspace_loaded: bool, +} + +fn loop_turn( pool: &ThreadPool, + task_sender: &Sender, + libdata_sender: &Sender, connection: &Connection, - task_sender: Sender, - task_receiver: Receiver, - state: &mut WorldState, - pending_requests: &mut PendingRequests, + world_state: &mut WorldState, + loop_state: &mut LoopState, + event: Event, ) -> Result<()> { - let mut subs = Subscriptions::default(); - // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same - // time to always have a thread ready to react to input. - let mut in_flight_libraries = 0; - let mut pending_libraries = Vec::new(); - let mut send_workspace_notification = true; - - let (libdata_sender, libdata_receiver) = unbounded(); - loop { - log::trace!("selecting"); - let event = select! { - recv(&connection.receiver) -> msg => match msg { - Ok(msg) => Event::Msg(msg), - Err(RecvError) => Err("client exited without shutdown")?, - }, - recv(task_receiver) -> task => Event::Task(task.unwrap()), - recv(state.task_receiver) -> task => match task { - Ok(task) => Event::Vfs(task), - Err(RecvError) => Err("vfs died")?, - }, - recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) - }; - let loop_start = Instant::now(); - - // NOTE: don't count blocking select! call as a loop-turn time - let _p = profile("main_loop_inner/loop-turn"); - log::info!("loop turn = {:?}", event); - let queue_count = pool.queued_count(); - if queue_count > 0 { - log::info!("queued count = {}", queue_count); - } + let loop_start = Instant::now(); + + // NOTE: don't count blocking select! call as a loop-turn time + let _p = profile("main_loop_inner/loop-turn"); + log::info!("loop turn = {:?}", event); + let queue_count = pool.queued_count(); + if queue_count > 0 { + log::info!("queued count = {}", queue_count); + } - let mut state_changed = false; - match event { - Event::Task(task) => { - on_task(task, &connection.sender, pending_requests, state); - state.maybe_collect_garbage(); - } - Event::Vfs(task) => { - state.vfs.write().handle_task(task); + let mut state_changed = false; + match event { + Event::Task(task) => { + on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state); + world_state.maybe_collect_garbage(); + } + Event::Vfs(task) => { + world_state.vfs.write().handle_task(task); + state_changed = true; + } + Event::Lib(lib) => { + world_state.add_lib(lib); + world_state.maybe_collect_garbage(); + loop_state.in_flight_libraries -= 1; + } + Event::Msg(msg) => match msg { + Message::Request(req) => on_request( + world_state, + &mut loop_state.pending_requests, + pool, + task_sender, + &connection.sender, + loop_start, + req, + )?, + Message::Notification(not) => { + on_notification( + &connection.sender, + world_state, + &mut loop_state.pending_requests, + &mut loop_state.subscriptions, + not, + )?; state_changed = true; } - Event::Lib(lib) => { - state.add_lib(lib); - state.maybe_collect_garbage(); - in_flight_libraries -= 1; - } - Event::Msg(msg) => match msg { - Message::Request(req) => { - if connection.handle_shutdown(&req)? { - return Ok(()); - }; - on_request( - state, - pending_requests, - pool, - &task_sender, - &connection.sender, - loop_start, - req, - )? - } - Message::Notification(not) => { - on_notification(&connection.sender, state, pending_requests, &mut subs, not)?; - state_changed = true; - } - Message::Response(resp) => log::error!("unexpected response: {:?}", resp), - }, - }; + Message::Response(resp) => log::error!("unexpected response: {:?}", resp), + }, + }; - pending_libraries.extend(state.process_changes()); - while in_flight_libraries < MAX_IN_FLIGHT_LIBS && !pending_libraries.is_empty() { - let (root, files) = pending_libraries.pop().unwrap(); - in_flight_libraries += 1; - let sender = libdata_sender.clone(); - pool.execute(move || { - log::info!("indexing {:?} ... ", root); - let _p = profile(&format!("indexed {:?}", root)); - let data = LibraryData::prepare(root, files); - sender.send(data).unwrap(); - }); - } + loop_state.pending_libraries.extend(world_state.process_changes()); + while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS + && !loop_state.pending_libraries.is_empty() + { + let (root, files) = loop_state.pending_libraries.pop().unwrap(); + loop_state.in_flight_libraries += 1; + let sender = libdata_sender.clone(); + pool.execute(move || { + log::info!("indexing {:?} ... ", root); + let _p = profile(&format!("indexed {:?}", root)); + let data = LibraryData::prepare(root, files); + sender.send(data).unwrap(); + }); + } - if send_workspace_notification - && state.roots_to_scan == 0 - && pending_libraries.is_empty() - && in_flight_libraries == 0 - { - let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum(); - if state.feature_flags().get("notifications.workspace-loaded") { - let msg = format!("workspace loaded, {} rust packages", n_packages); - show_message(req::MessageType::Info, msg, &connection.sender); - } - // Only send the notification first time - send_workspace_notification = false; + if !loop_state.workspace_loaded + && world_state.roots_to_scan == 0 + && loop_state.pending_libraries.is_empty() + && loop_state.in_flight_libraries == 0 + { + loop_state.workspace_loaded = true; + let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum(); + if world_state.feature_flags().get("notifications.workspace-loaded") { + let msg = format!("workspace loaded, {} rust packages", n_packages); + show_message(req::MessageType::Info, msg, &connection.sender); } + } - if state_changed { - update_file_notifications_on_threadpool( - pool, - state.snapshot(), - state.options.publish_decorations, - task_sender.clone(), - subs.subscriptions(), - ) - } + if state_changed { + update_file_notifications_on_threadpool( + pool, + world_state.snapshot(), + world_state.options.publish_decorations, + task_sender.clone(), + loop_state.subscriptions.subscriptions(), + ) } + Ok(()) } fn on_task( diff --git a/crates/ra_lsp_server/src/main_loop/subscriptions.rs b/crates/ra_lsp_server/src/main_loop/subscriptions.rs index 470bc1205..bbeda723c 100644 --- a/crates/ra_lsp_server/src/main_loop/subscriptions.rs +++ b/crates/ra_lsp_server/src/main_loop/subscriptions.rs @@ -1,7 +1,7 @@ use ra_ide_api::FileId; use rustc_hash::FxHashSet; -#[derive(Default)] +#[derive(Default, Debug)] pub(crate) struct Subscriptions { subs: FxHashSet, } diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs deleted file mode 100644 index 6234563f2..000000000 --- a/crates/ra_lsp_server/src/project_model.rs +++ /dev/null @@ -1,21 +0,0 @@ -use std::path::PathBuf; - -use crate::{thread_worker::Worker, Result}; - -pub use ra_project_model::{ - CargoWorkspace, Package, ProjectWorkspace, Sysroot, Target, TargetKind, -}; - -pub fn workspace_loader(with_sysroot: bool) -> Worker> { - Worker::>::spawn( - "workspace loader", - 1, - move |input_receiver, output_sender| { - input_receiver - .into_iter() - .map(|path| ProjectWorkspace::discover_with_sysroot(path.as_path(), with_sysroot)) - .try_for_each(|it| output_sender.send(it)) - .unwrap() - }, - ) -} diff --git a/crates/ra_lsp_server/src/thread_worker.rs b/crates/ra_lsp_server/src/thread_worker.rs deleted file mode 100644 index 68e5c124d..000000000 --- a/crates/ra_lsp_server/src/thread_worker.rs +++ /dev/null @@ -1,49 +0,0 @@ -//! Small utility to correctly spawn crossbeam-channel based worker threads. - -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; - -/// A wrapper around event-processing thread with automatic shutdown semantics. -pub struct Worker { - // XXX: field order is significant here. - // - // In Rust, fields are dropped in the declaration order, and we rely on this - // here. We must close input first, so that the `thread` (who holds the - // opposite side of the channel) noticed shutdown. Then, we must join the - // thread, but we must keep out alive so that the thread does not panic. - // - // Note that a potential problem here is that we might drop some messages - // from receiver on the floor. This is ok for rust-analyzer: we have only a - // single client, so, if we are shutting down, nobody is interested in the - // unfinished work anyway! - sender: Sender, - _thread: jod_thread::JoinHandle<()>, - receiver: Receiver, -} - -impl Worker { - pub fn spawn(name: &'static str, buf: usize, f: F) -> Worker - where - F: FnOnce(Receiver, Sender) + Send + 'static, - I: Send + 'static, - O: Send + 'static, - { - // Set up worker channels in a deadlock-avoiding way. If one sets both input - // and output buffers to a fixed size, a worker might get stuck. - let (sender, input_receiver) = bounded::(buf); - let (output_sender, receiver) = unbounded::(); - let _thread = jod_thread::Builder::new() - .name(name.to_string()) - .spawn(move || f(input_receiver, output_sender)) - .expect("failed to spawn a thread"); - Worker { sender, _thread, receiver } - } -} - -impl Worker { - pub fn sender(&self) -> &Sender { - &self.sender - } - pub fn receiver(&self) -> &Receiver { - &self.receiver - } -} diff --git a/crates/ra_lsp_server/src/world.rs b/crates/ra_lsp_server/src/world.rs index 73d7f8fb9..e1c5c3343 100644 --- a/crates/ra_lsp_server/src/world.rs +++ b/crates/ra_lsp_server/src/world.rs @@ -11,13 +11,13 @@ use ra_ide_api::{ Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData, SourceRootId, }; +use ra_project_model::ProjectWorkspace; use ra_vfs::{LineEndings, RootEntry, Vfs, VfsChange, VfsFile, VfsRoot, VfsTask}; use ra_vfs_glob::{Glob, RustPackageFilterBuilder}; use relative_path::RelativePathBuf; use crate::{ main_loop::pending_requests::{CompletedRequest, LatestRequests}, - project_model::ProjectWorkspace, LspError, Result, }; @@ -35,6 +35,7 @@ pub struct Options { #[derive(Debug)] pub struct WorldState { pub options: Options, + //FIXME: this belongs to `LoopState` rather than to `WorldState` pub roots_to_scan: usize, pub roots: Vec, pub workspaces: Arc>, -- cgit v1.2.3