From 7cc14a759699dd2503199116521e9ba65e3f1aa8 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Fri, 30 Aug 2019 20:18:57 +0300 Subject: :arrow_up: lsp-server --- crates/ra_lsp_server/Cargo.toml | 3 +- crates/ra_lsp_server/src/main.rs | 74 +++++++++++------------ crates/ra_lsp_server/src/main_loop.rs | 29 ++++----- crates/ra_lsp_server/tests/heavy_tests/support.rs | 57 ++++++++--------- 4 files changed, 81 insertions(+), 82 deletions(-) (limited to 'crates/ra_lsp_server') diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 084a20cd9..4f834519c 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -21,12 +21,13 @@ thread_worker = { path = "../thread_worker" } ra_syntax = { path = "../ra_syntax" } ra_text_edit = { path = "../ra_text_edit" } ra_ide_api = { path = "../ra_ide_api" } -lsp-server = "0.1.0" +lsp-server = "0.2.0" ra_project_model = { path = "../ra_project_model" } ra_prof = { path = "../ra_prof" } ra_vfs_glob = { path = "../ra_vfs_glob" } [dev-dependencies] +jod-thread = "0.1.0" tempfile = "3" test_utils = { path = "../test_utils" } diff --git a/crates/ra_lsp_server/src/main.rs b/crates/ra_lsp_server/src/main.rs index 1debe7660..88504bb89 100644 --- a/crates/ra_lsp_server/src/main.rs +++ b/crates/ra_lsp_server/src/main.rs @@ -1,5 +1,5 @@ use flexi_logger::{Duplicate, Logger}; -use lsp_server::{run_server, stdio_transport, LspServerError}; +use lsp_server::Connection; use ra_lsp_server::{show_message, Result, ServerConfig}; use ra_prof; @@ -29,46 +29,46 @@ fn main() -> Result<()> { } fn main_inner() -> Result<()> { - let (sender, receiver, io_threads) = stdio_transport(); let cwd = std::env::current_dir()?; - let caps = serde_json::to_value(ra_lsp_server::server_capabilities()).unwrap(); - run_server(caps, sender, receiver, |params, s, r| { - let params: lsp_types::InitializeParams = serde_json::from_value(params)?; - let root = params.root_uri.and_then(|it| it.to_file_path().ok()).unwrap_or(cwd); + let (connection, io_threads) = Connection::stdio(); + let server_capabilities = serde_json::to_value(ra_lsp_server::server_capabilities()).unwrap(); - let workspace_roots = params - .workspace_folders - .map(|workspaces| { - workspaces - .into_iter() - .filter_map(|it| it.uri.to_file_path().ok()) - .collect::>() - }) - .filter(|workspaces| !workspaces.is_empty()) - .unwrap_or_else(|| vec![root]); + let initialize_params = connection.initialize(server_capabilities)?; + let initialize_params: lsp_types::InitializeParams = serde_json::from_value(initialize_params)?; - let server_config: ServerConfig = params - .initialization_options - .and_then(|v| { - serde_json::from_value(v) - .map_err(|e| { - log::error!("failed to deserialize config: {}", e); - show_message( - lsp_types::MessageType::Error, - format!("failed to deserialize config: {}", e), - s, - ); - }) - .ok() - }) - .unwrap_or_default(); + let root = initialize_params.root_uri.and_then(|it| it.to_file_path().ok()).unwrap_or(cwd); + + let workspace_roots = initialize_params + .workspace_folders + .map(|workspaces| { + workspaces.into_iter().filter_map(|it| it.uri.to_file_path().ok()).collect::>() + }) + .filter(|workspaces| !workspaces.is_empty()) + .unwrap_or_else(|| vec![root]); + + let server_config: ServerConfig = initialize_params + .initialization_options + .and_then(|v| { + serde_json::from_value(v) + .map_err(|e| { + log::error!("failed to deserialize config: {}", e); + show_message( + lsp_types::MessageType::Error, + format!("failed to deserialize config: {}", e), + &connection.sender, + ); + }) + .ok() + }) + .unwrap_or_default(); + + ra_lsp_server::main_loop( + workspace_roots, + initialize_params.capabilities, + server_config, + &connection, + )?; - ra_lsp_server::main_loop(workspace_roots, params.capabilities, server_config, r, s) - }) - .map_err(|err| match err { - LspServerError::ProtocolError(err) => err.into(), - LspServerError::ServerError(err) => err, - })?; log::info!("shutting down IO..."); io_threads.join()?; log::info!("... IO is down"); diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index fb357b36b..42ebb5cdf 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -5,7 +5,7 @@ pub(crate) mod pending_requests; use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; -use lsp_server::{handle_shutdown, ErrorCode, Message, Notification, Request, RequestId, Response}; +use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; use lsp_types::{ClientCapabilities, NumberOrString}; use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData}; use ra_prof::profile; @@ -51,8 +51,7 @@ pub fn main_loop( ws_roots: Vec, client_caps: ClientCapabilities, config: ServerConfig, - msg_receiver: &Receiver, - msg_sender: &Sender, + connection: &Connection, ) -> Result<()> { log::info!("server_config: {:#?}", config); // FIXME: support dynamic workspace loading. @@ -69,7 +68,7 @@ pub fn main_loop( show_message( req::MessageType::Error, format!("rust-analyzer failed to load workspace: {}", e), - msg_sender, + &connection.sender, ); } } @@ -89,7 +88,7 @@ pub fn main_loop( show_message( req::MessageType::Error, format!("unknown feature flag: {:?}", flag), - msg_sender, + &connection.sender, ); } } @@ -119,8 +118,7 @@ pub fn main_loop( log::info!("server initialized, serving requests"); let main_res = main_loop_inner( &pool, - msg_sender, - msg_receiver, + connection, task_sender, task_receiver.clone(), &mut state, @@ -130,7 +128,7 @@ pub fn main_loop( log::info!("waiting for tasks to finish..."); task_receiver .into_iter() - .for_each(|task| on_task(task, msg_sender, &mut pending_requests, &mut state)); + .for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); @@ -196,8 +194,7 @@ impl fmt::Debug for Event { fn main_loop_inner( pool: &ThreadPool, - msg_sender: &Sender, - msg_receiver: &Receiver, + connection: &Connection, task_sender: Sender, task_receiver: Receiver, state: &mut WorldState, @@ -214,7 +211,7 @@ fn main_loop_inner( loop { log::trace!("selecting"); let event = select! { - recv(msg_receiver) -> msg => match msg { + recv(&connection.receiver) -> msg => match msg { Ok(msg) => Event::Msg(msg), Err(RecvError) => Err("client exited without shutdown")?, }, @@ -238,7 +235,7 @@ fn main_loop_inner( let mut state_changed = false; match event { Event::Task(task) => { - on_task(task, msg_sender, pending_requests, state); + on_task(task, &connection.sender, pending_requests, state); state.maybe_collect_garbage(); } Event::Vfs(task) => { @@ -252,7 +249,7 @@ fn main_loop_inner( } Event::Msg(msg) => match msg { Message::Request(req) => { - if handle_shutdown(&req, msg_sender) { + if connection.handle_shutdown(&req)? { return Ok(()); }; on_request( @@ -260,13 +257,13 @@ fn main_loop_inner( pending_requests, pool, &task_sender, - msg_sender, + &connection.sender, loop_start, req, )? } Message::Notification(not) => { - on_notification(msg_sender, state, pending_requests, &mut subs, not)?; + on_notification(&connection.sender, state, pending_requests, &mut subs, not)?; state_changed = true; } Message::Response(resp) => log::error!("unexpected response: {:?}", resp), @@ -294,7 +291,7 @@ fn main_loop_inner( let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum(); if state.feature_flags().get("notifications.workspace-loaded") { let msg = format!("workspace loaded, {} rust packages", n_packages); - show_message(req::MessageType::Info, msg, msg_sender); + show_message(req::MessageType::Info, msg, &connection.sender); } // Only send the notification first time send_workspace_notification = false; diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index 45b4cacf6..89f65cef4 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -8,16 +8,17 @@ use std::{ use crossbeam_channel::{after, select, Receiver}; use flexi_logger::Logger; -use lsp_server::{Message, Notification, Request}; +use lsp_server::{Connection, Message, Notification, Request}; use lsp_types::{ - request::Shutdown, ClientCapabilities, DidOpenTextDocumentParams, GotoCapability, - TextDocumentClientCapabilities, TextDocumentIdentifier, TextDocumentItem, Url, + notification::{DidOpenTextDocument, Exit}, + request::Shutdown, + ClientCapabilities, DidOpenTextDocumentParams, GotoCapability, TextDocumentClientCapabilities, + TextDocumentIdentifier, TextDocumentItem, Url, }; use serde::Serialize; use serde_json::{to_string_pretty, Value}; use tempfile::TempDir; use test_utils::{find_mismatch, parse_fixture}; -use thread_worker::Worker; use ra_lsp_server::{main_loop, req, ServerConfig}; @@ -83,7 +84,8 @@ pub struct Server { req_id: Cell, messages: RefCell>, dir: TempDir, - worker: Worker, + _thread: jod_thread::JoinHandle<()>, + client: Connection, } impl Server { @@ -96,11 +98,11 @@ impl Server { let path = dir.path().to_path_buf(); let roots = if roots.is_empty() { vec![path] } else { roots }; + let (connection, client) = Connection::memory(); - let worker = Worker::::spawn( - "test server", - 128, - move |msg_receiver, msg_sender| { + let _thread = jod_thread::Builder::new() + .name("test server".to_string()) + .spawn(move || { main_loop( roots, ClientCapabilities { @@ -116,26 +118,24 @@ impl Server { experimental: None, }, ServerConfig { with_sysroot, ..ServerConfig::default() }, - &msg_receiver, - &msg_sender, + &connection, ) .unwrap() - }, - ); - let res = Server { req_id: Cell::new(1), dir, messages: Default::default(), worker }; + }) + .expect("failed to spawn a thread"); + + let res = + Server { req_id: Cell::new(1), dir, messages: Default::default(), client, _thread }; for (path, text) in files { - res.send_notification(Notification::new( - "textDocument/didOpen".to_string(), - &DidOpenTextDocumentParams { - text_document: TextDocumentItem { - uri: Url::from_file_path(path).unwrap(), - language_id: "rust".to_string(), - version: 0, - text, - }, + res.notification::(DidOpenTextDocumentParams { + text_document: TextDocumentItem { + uri: Url::from_file_path(path).unwrap(), + language_id: "rust".to_string(), + version: 0, + text, }, - )) + }) } res } @@ -184,7 +184,7 @@ impl Server { } fn send_request_(&self, r: Request) -> Value { let id = r.id.clone(); - self.worker.sender().send(r.into()).unwrap(); + self.client.sender.send(r.into()).unwrap(); while let Some(msg) = self.recv() { match msg { Message::Request(req) => panic!("unexpected request: {:?}", req), @@ -225,13 +225,13 @@ impl Server { } } fn recv(&self) -> Option { - recv_timeout(&self.worker.receiver()).map(|msg| { + recv_timeout(&self.client.receiver).map(|msg| { self.messages.borrow_mut().push(msg.clone()); msg }) } fn send_notification(&self, not: Notification) { - self.worker.sender().send(Message::Notification(not)).unwrap(); + self.client.sender.send(Message::Notification(not)).unwrap(); } pub fn path(&self) -> &Path { @@ -241,7 +241,8 @@ impl Server { impl Drop for Server { fn drop(&mut self) { - self.send_request::(()); + self.request::((), Value::Null); + self.notification::(()); } } -- cgit v1.2.3 From 983de30a567f2cb4d9e28e12702e509ca713da62 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Fri, 30 Aug 2019 20:23:28 +0300 Subject: inline thread-worker --- crates/ra_lsp_server/Cargo.toml | 4 +-- crates/ra_lsp_server/src/lib.rs | 1 + crates/ra_lsp_server/src/project_model.rs | 4 +-- crates/ra_lsp_server/src/thread_worker.rs | 49 +++++++++++++++++++++++++++++++ 4 files changed, 52 insertions(+), 6 deletions(-) create mode 100644 crates/ra_lsp_server/src/thread_worker.rs (limited to 'crates/ra_lsp_server') diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 4f834519c..eb4812633 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -15,9 +15,8 @@ log = "0.4.3" lsp-types = { version = "0.60.0", features = ["proposed"] } rustc-hash = "1.0" parking_lot = "0.9.0" - +jod-thread = "0.1.0" ra_vfs = "0.3.0" -thread_worker = { path = "../thread_worker" } ra_syntax = { path = "../ra_syntax" } ra_text_edit = { path = "../ra_text_edit" } ra_ide_api = { path = "../ra_ide_api" } @@ -27,7 +26,6 @@ ra_prof = { path = "../ra_prof" } ra_vfs_glob = { path = "../ra_vfs_glob" } [dev-dependencies] -jod-thread = "0.1.0" tempfile = "3" test_utils = { path = "../test_utils" } diff --git a/crates/ra_lsp_server/src/lib.rs b/crates/ra_lsp_server/src/lib.rs index 2c5d7c72d..69a577b3e 100644 --- a/crates/ra_lsp_server/src/lib.rs +++ b/crates/ra_lsp_server/src/lib.rs @@ -8,6 +8,7 @@ mod project_model; pub mod req; pub mod config; mod world; +mod thread_worker; pub type Result = std::result::Result>; pub use crate::{ diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index ad59cde64..6234563f2 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -1,8 +1,6 @@ use std::path::PathBuf; -use thread_worker::Worker; - -use crate::Result; +use crate::{thread_worker::Worker, Result}; pub use ra_project_model::{ CargoWorkspace, Package, ProjectWorkspace, Sysroot, Target, TargetKind, diff --git a/crates/ra_lsp_server/src/thread_worker.rs b/crates/ra_lsp_server/src/thread_worker.rs new file mode 100644 index 000000000..68e5c124d --- /dev/null +++ b/crates/ra_lsp_server/src/thread_worker.rs @@ -0,0 +1,49 @@ +//! Small utility to correctly spawn crossbeam-channel based worker threads. + +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; + +/// A wrapper around event-processing thread with automatic shutdown semantics. +pub struct Worker { + // XXX: field order is significant here. + // + // In Rust, fields are dropped in the declaration order, and we rely on this + // here. We must close input first, so that the `thread` (who holds the + // opposite side of the channel) noticed shutdown. Then, we must join the + // thread, but we must keep out alive so that the thread does not panic. + // + // Note that a potential problem here is that we might drop some messages + // from receiver on the floor. This is ok for rust-analyzer: we have only a + // single client, so, if we are shutting down, nobody is interested in the + // unfinished work anyway! + sender: Sender, + _thread: jod_thread::JoinHandle<()>, + receiver: Receiver, +} + +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> Worker + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + // Set up worker channels in a deadlock-avoiding way. If one sets both input + // and output buffers to a fixed size, a worker might get stuck. + let (sender, input_receiver) = bounded::(buf); + let (output_sender, receiver) = unbounded::(); + let _thread = jod_thread::Builder::new() + .name(name.to_string()) + .spawn(move || f(input_receiver, output_sender)) + .expect("failed to spawn a thread"); + Worker { sender, _thread, receiver } + } +} + +impl Worker { + pub fn sender(&self) -> &Sender { + &self.sender + } + pub fn receiver(&self) -> &Receiver { + &self.receiver + } +} -- cgit v1.2.3