From 22098127c4f4b7414f0695c7788f07d0a1c43892 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Thu, 25 Jun 2020 17:22:18 +0200 Subject: Move request dispatcher to a separate file --- crates/rust-analyzer/src/dispatch.rs | 137 ++++++++++++++++++++++++++++++++++ crates/rust-analyzer/src/lib.rs | 1 + crates/rust-analyzer/src/main_loop.rs | 137 ++-------------------------------- 3 files changed, 146 insertions(+), 129 deletions(-) create mode 100644 crates/rust-analyzer/src/dispatch.rs (limited to 'crates') diff --git a/crates/rust-analyzer/src/dispatch.rs b/crates/rust-analyzer/src/dispatch.rs new file mode 100644 index 000000000..0a9b0428d --- /dev/null +++ b/crates/rust-analyzer/src/dispatch.rs @@ -0,0 +1,137 @@ +use std::{panic, time::Instant}; + +use serde::{de::DeserializeOwned, Serialize}; + +use crate::{ + global_state::{GlobalState, GlobalStateSnapshot}, + lsp_utils::is_canceled, + main_loop::Task, + LspError, Result, +}; + +pub(crate) struct RequestDispatcher<'a> { + pub(crate) req: Option, + pub(crate) global_state: &'a mut GlobalState, + pub(crate) request_received: Instant, +} + +impl<'a> RequestDispatcher<'a> { + /// Dispatches the request onto the current thread + pub(crate) fn on_sync( + &mut self, + f: fn(&mut GlobalState, R::Params) -> Result, + ) -> Result<&mut Self> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + panic::UnwindSafe + 'static, + R::Result: Serialize + 'static, + { + let (id, params) = match self.parse::() { + Some(it) => it, + None => { + return Ok(self); + } + }; + let world = panic::AssertUnwindSafe(&mut *self.global_state); + let task = panic::catch_unwind(move || { + let result = f(world.0, params); + result_to_task::(id, result) + }) + .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; + self.global_state.on_task(task); + Ok(self) + } + + /// Dispatches the request onto thread pool + pub(crate) fn on( + &mut self, + f: fn(GlobalStateSnapshot, R::Params) -> Result, + ) -> Result<&mut Self> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + Send + 'static, + R::Result: Serialize + 'static, + { + let (id, params) = match self.parse::() { + Some(it) => it, + None => { + return Ok(self); + } + }; + + self.global_state.task_pool.0.spawn({ + let world = self.global_state.snapshot(); + move || { + let result = f(world, params); + result_to_task::(id, result) + } + }); + + Ok(self) + } + + pub(crate) fn finish(&mut self) { + match self.req.take() { + None => (), + Some(req) => { + log::error!("unknown request: {:?}", req); + let resp = lsp_server::Response::new_err( + req.id, + lsp_server::ErrorCode::MethodNotFound as i32, + "unknown request".to_string(), + ); + self.global_state.send(resp.into()); + } + } + } + + fn parse(&mut self) -> Option<(lsp_server::RequestId, R::Params)> + where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + { + let req = self.req.take()?; + let (id, params) = match req.extract::(R::METHOD) { + Ok(it) => it, + Err(req) => { + self.req = Some(req); + return None; + } + }; + self.global_state + .req_queue + .incoming + .register(id.clone(), (R::METHOD, self.request_received)); + Some((id, params)) + } +} + +fn result_to_task(id: lsp_server::RequestId, result: Result) -> Task +where + R: lsp_types::request::Request + 'static, + R::Params: DeserializeOwned + 'static, + R::Result: Serialize + 'static, +{ + let response = match result { + Ok(resp) => lsp_server::Response::new_ok(id, &resp), + Err(e) => match e.downcast::() { + Ok(lsp_error) => lsp_server::Response::new_err(id, lsp_error.code, lsp_error.message), + Err(e) => { + if is_canceled(&*e) { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::ContentModified as i32, + "content modified".to_string(), + ) + } else { + lsp_server::Response::new_err( + id, + lsp_server::ErrorCode::InternalError as i32, + e.to_string(), + ) + } + } + }, + }; + Task::Respond(response) +} diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs index ca788dd3c..d503fe96e 100644 --- a/crates/rust-analyzer/src/lib.rs +++ b/crates/rust-analyzer/src/lib.rs @@ -19,6 +19,7 @@ macro_rules! eprintln { mod global_state; mod main_loop; +mod dispatch; mod handlers; mod caps; mod cargo_target_spec; diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index f3c8b5978..ebc232736 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs @@ -6,26 +6,26 @@ use std::{ }; use crossbeam_channel::{never, select, Receiver}; -use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; +use lsp_server::{Connection, Notification, Request, RequestId, Response}; use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString}; use ra_db::VfsPath; use ra_ide::{Canceled, FileId}; use ra_prof::profile; use ra_project_model::{PackageRoot, ProjectWorkspace}; -use serde::{de::DeserializeOwned, Serialize}; use crate::{ config::{Config, FilesWatcher, LinkedProject}, diagnostics::DiagnosticTask, + dispatch::RequestDispatcher, from_proto, - global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status}, + global_state::{file_id_to_url, GlobalState, Status}, handlers, lsp_ext, lsp_utils::{ apply_document_changes, is_canceled, notification_cast, notification_is, notification_new, show_message, }, request_metrics::RequestMetrics, - LspError, Result, + Result, }; pub fn main_loop(config: Config, connection: Connection) -> Result<()> { @@ -241,7 +241,7 @@ impl GlobalState { fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> { let mut pool_dispatcher = - PoolDispatcher { req: Some(req), global_state: self, request_received }; + RequestDispatcher { req: Some(req), global_state: self, request_received }; pool_dispatcher .on_sync::(|s, ()| Ok(s.collect_garbage()))? .on_sync::(|s, p| handlers::handle_join_lines(s.snapshot(), p))? @@ -426,7 +426,8 @@ impl GlobalState { log::error!("unhandled notification: {:?}", not); Ok(()) } - fn on_task(&mut self, task: Task) { + // TODO + pub(crate) fn on_task(&mut self, task: Task) { match task { Task::Respond(response) => { if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone()) @@ -480,6 +481,7 @@ impl GlobalState { } } +// TODO #[derive(Debug)] pub(crate) enum Task { Respond(Response), @@ -645,126 +647,3 @@ fn report_progress( }); global_state.send(notification.into()); } - -struct PoolDispatcher<'a> { - req: Option, - global_state: &'a mut GlobalState, - request_received: Instant, -} - -impl<'a> PoolDispatcher<'a> { - /// Dispatches the request onto the current thread - fn on_sync( - &mut self, - f: fn(&mut GlobalState, R::Params) -> Result, - ) -> Result<&mut Self> - where - R: lsp_types::request::Request + 'static, - R::Params: DeserializeOwned + panic::UnwindSafe + 'static, - R::Result: Serialize + 'static, - { - let (id, params) = match self.parse::() { - Some(it) => it, - None => { - return Ok(self); - } - }; - let world = panic::AssertUnwindSafe(&mut *self.global_state); - let task = panic::catch_unwind(move || { - let result = f(world.0, params); - result_to_task::(id, result) - }) - .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; - self.global_state.on_task(task); - Ok(self) - } - - /// Dispatches the request onto thread pool - fn on( - &mut self, - f: fn(GlobalStateSnapshot, R::Params) -> Result, - ) -> Result<&mut Self> - where - R: lsp_types::request::Request + 'static, - R::Params: DeserializeOwned + Send + 'static, - R::Result: Serialize + 'static, - { - let (id, params) = match self.parse::() { - Some(it) => it, - None => { - return Ok(self); - } - }; - - self.global_state.task_pool.0.spawn({ - let world = self.global_state.snapshot(); - move || { - let result = f(world, params); - result_to_task::(id, result) - } - }); - - Ok(self) - } - - fn parse(&mut self) -> Option<(RequestId, R::Params)> - where - R: lsp_types::request::Request + 'static, - R::Params: DeserializeOwned + 'static, - { - let req = self.req.take()?; - let (id, params) = match req.extract::(R::METHOD) { - Ok(it) => it, - Err(req) => { - self.req = Some(req); - return None; - } - }; - self.global_state - .req_queue - .incoming - .register(id.clone(), (R::METHOD, self.request_received)); - Some((id, params)) - } - - fn finish(&mut self) { - match self.req.take() { - None => (), - Some(req) => { - log::error!("unknown request: {:?}", req); - let resp = Response::new_err( - req.id, - ErrorCode::MethodNotFound as i32, - "unknown request".to_string(), - ); - self.global_state.send(resp.into()); - } - } - } -} - -fn result_to_task(id: RequestId, result: Result) -> Task -where - R: lsp_types::request::Request + 'static, - R::Params: DeserializeOwned + 'static, - R::Result: Serialize + 'static, -{ - let response = match result { - Ok(resp) => Response::new_ok(id, &resp), - Err(e) => match e.downcast::() { - Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message), - Err(e) => { - if is_canceled(&*e) { - Response::new_err( - id, - ErrorCode::ContentModified as i32, - "content modified".to_string(), - ) - } else { - Response::new_err(id, ErrorCode::InternalError as i32, e.to_string()) - } - } - }, - }; - Task::Respond(response) -} -- cgit v1.2.3