From 8f1ce8275347e915b1cc824567e96369875cefd4 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 1 Sep 2018 17:40:45 +0300 Subject: move to gen-server impl --- crates/gen_lsp_server/src/lib.rs | 2 +- crates/gen_lsp_server/src/msg.rs | 10 ++ crates/server/Cargo.toml | 1 + crates/server/src/dispatch.rs | 151 -------------------- crates/server/src/io.rs | 207 --------------------------- crates/server/src/main.rs | 104 +------------- crates/server/src/main_loop/mod.rs | 281 +++++++++++++++++++++---------------- 7 files changed, 181 insertions(+), 575 deletions(-) delete mode 100644 crates/server/src/dispatch.rs delete mode 100644 crates/server/src/io.rs (limited to 'crates') diff --git a/crates/gen_lsp_server/src/lib.rs b/crates/gen_lsp_server/src/lib.rs index a31e90f35..b47db0df4 100644 --- a/crates/gen_lsp_server/src/lib.rs +++ b/crates/gen_lsp_server/src/lib.rs @@ -21,7 +21,7 @@ use languageserver_types::{ pub type Result = ::std::result::Result; pub use { - msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification}, + msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification, ErrorCode}, stdio::{stdio_transport, Threads}, }; diff --git a/crates/gen_lsp_server/src/msg.rs b/crates/gen_lsp_server/src/msg.rs index 9426e98ec..533d7da3e 100644 --- a/crates/gen_lsp_server/src/msg.rs +++ b/crates/gen_lsp_server/src/msg.rs @@ -120,6 +120,16 @@ impl RawResponse { } impl RawNotification { + pub fn new(params: N::Params) -> RawNotification + where + N: Notification, + N::Params: Serialize, + { + RawNotification { + method: N::METHOD.to_string(), + params: to_value(¶ms).unwrap(), + } + } pub fn cast(self) -> ::std::result::Result where N: Notification, diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 35ced91ac..32c1219e1 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml @@ -23,3 +23,4 @@ text_unit = { version = "0.1.2", features = ["serde"] } libsyntax2 = { path = "../libsyntax2" } libeditor = { path = "../libeditor" } libanalysis = { path = "../libanalysis" } +gen_lsp_server = { path = "../gen_lsp_server" } diff --git a/crates/server/src/dispatch.rs b/crates/server/src/dispatch.rs deleted file mode 100644 index 806534944..000000000 --- a/crates/server/src/dispatch.rs +++ /dev/null @@ -1,151 +0,0 @@ -use std::marker::PhantomData; - -use serde::{ - ser::Serialize, - de::DeserializeOwned, -}; -use serde_json; -use drop_bomb::DropBomb; - -use ::{ - Result, - req::{ClientRequest, Notification}, - io::{RawResponse, RawRequest, RawNotification}, -}; - -pub struct Responder { - id: u64, - bomb: DropBomb, - ph: PhantomData, -} - -impl Responder { - pub fn into_response(mut self, result: Result) -> Result { - self.bomb.defuse(); - let res = match result { - Ok(result) => { - RawResponse { - id: self.id, - result: serde_json::to_value(result)?, - error: serde_json::Value::Null, - } - } - Err(e) => { - error_response( - self.id, - ErrorCode::InternalError, - format!("internal error: {}", e), - )? - } - }; - Ok(res) - } -} - -fn parse_request_as(raw: RawRequest) - -> Result<::std::result::Result<(R::Params, Responder), RawRequest>> -{ - if raw.method != R::METHOD { - return Ok(Err(raw)); - } - - let params: R::Params = serde_json::from_value(raw.params)?; - let responder = Responder { - id: raw.id, - bomb: DropBomb::new("dropped request"), - ph: PhantomData, - }; - Ok(Ok((params, responder))) -} - -pub fn handle_request(req: RawRequest, f: F) -> Result<::std::result::Result> - where - R: ClientRequest, - F: FnOnce(R::Params, Responder) -> Result<()> -{ - let id = req.id; - match parse_request_as::(req)? { - Ok((params, responder)) => { - let () = f(params, responder)?; - Ok(Ok(id)) - }, - Err(r) => Ok(Err(r)), - } -} - -fn parse_notification_as(raw: RawNotification) -> Result<::std::result::Result> - where - N: Notification, - N::Params: DeserializeOwned, -{ - if raw.method != N::METHOD { - return Ok(Err(raw)); - } - let params: N::Params = serde_json::from_value(raw.params)?; - Ok(Ok(params)) -} - -pub fn handle_notification(not: &mut Option, f: F) -> Result<()> - where - N: Notification, - N::Params: DeserializeOwned, - F: FnOnce(N::Params) -> Result<()> -{ - match not.take() { - None => Ok(()), - Some(n) => match parse_notification_as::(n)? { - Ok(params) => f(params), - Err(n) => { - *not = Some(n); - Ok(()) - } - } - } -} - -pub fn send_notification(params: N::Params) -> RawNotification - where - N: Notification, - N::Params: Serialize -{ - RawNotification { - method: N::METHOD.to_string(), - params: serde_json::to_value(params) - .unwrap(), - } -} - -pub fn unknown_method(id: u64) -> Result { - error_response(id, ErrorCode::MethodNotFound, "unknown method") -} - -fn error_response(id: u64, code: ErrorCode, message: impl Into) -> Result { - #[derive(Serialize)] - struct Error { - code: i32, - message: String, - } - let resp = RawResponse { - id, - result: serde_json::Value::Null, - error: serde_json::to_value(Error { - code: code as i32, - message: message.into(), - })?, - }; - Ok(resp) -} - -#[allow(unused)] -enum ErrorCode { - ParseError = -32700, - InvalidRequest = -32600, - MethodNotFound = -32601, - InvalidParams = -32602, - InternalError = -32603, - ServerErrorStart = -32099, - ServerErrorEnd = -32000, - ServerNotInitialized = -32002, - UnknownErrorCode = -32001, - RequestCancelled = -32800, -} diff --git a/crates/server/src/io.rs b/crates/server/src/io.rs deleted file mode 100644 index f247327ab..000000000 --- a/crates/server/src/io.rs +++ /dev/null @@ -1,207 +0,0 @@ -use std::{ - thread, - io::{ - stdout, stdin, - BufRead, Write, - }, -}; -use serde_json::{Value, from_str, to_string}; -use crossbeam_channel::{Receiver, Sender, bounded}; - -use Result; - - -#[derive(Debug, Serialize, Deserialize)] -#[serde(untagged)] -pub enum RawMsg { - Request(RawRequest), - Notification(RawNotification), - Response(RawResponse), -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct RawRequest { - pub id: u64, - pub method: String, - pub params: Value, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct RawNotification { - pub method: String, - pub params: Value, -} - -#[derive(Debug, Serialize, Deserialize)] -pub struct RawResponse { - // JSON RPC allows this to be null if it was impossible - // to decode the request's id. Ignore this special case - // and just die horribly. - pub id: u64, - #[serde(default)] - pub result: Value, - #[serde(default)] - pub error: Value, -} - -struct MsgReceiver { - chan: Receiver, - thread: Option>>, -} - -impl MsgReceiver { - fn recv(&mut self) -> Result { - match self.chan.recv() { - Some(msg) => Ok(msg), - None => { - self.cleanup()?; - unreachable!() - } - } - } - - fn cleanup(&mut self) -> Result<()> { - self.thread - .take() - .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? - .join() - .map_err(|_| format_err!("MsgReceiver thread panicked"))??; - bail!("client disconnected") - } - - fn stop(self) -> Result<()> { - // Can't really self.thread.join() here, b/c it might be - // blocking on read - Ok(()) - } -} - -struct MsgSender { - chan: Sender, - thread: thread::JoinHandle>, -} - -impl MsgSender { - fn send(&mut self, msg: RawMsg) { - self.chan.send(msg) - } - - fn stop(self) -> Result<()> { - drop(self.chan); - self.thread.join() - .map_err(|_| format_err!("MsgSender thread panicked"))??; - Ok(()) - } -} - -pub struct Io { - receiver: MsgReceiver, - sender: MsgSender, -} - -impl Io { - pub fn from_stdio() -> Io { - let sender = { - let (tx, rx) = bounded(16); - MsgSender { - chan: tx, - thread: thread::spawn(move || { - let stdout = stdout(); - let mut stdout = stdout.lock(); - for msg in rx { - #[derive(Serialize)] - struct JsonRpc { - jsonrpc: &'static str, - #[serde(flatten)] - msg: RawMsg, - } - let text = to_string(&JsonRpc { - jsonrpc: "2.0", - msg, - })?; - write_msg_text(&mut stdout, &text)?; - } - Ok(()) - }), - } - }; - let receiver = { - let (tx, rx) = bounded(16); - MsgReceiver { - chan: rx, - thread: Some(thread::spawn(move || { - let stdin = stdin(); - let mut stdin = stdin.lock(); - while let Some(text) = read_msg_text(&mut stdin)? { - let msg: RawMsg = from_str(&text)?; - tx.send(msg); - } - Ok(()) - })), - } - }; - Io { receiver, sender } - } - - pub fn send(&mut self, msg: RawMsg) { - self.sender.send(msg) - } - - pub fn recv(&mut self) -> Result { - self.receiver.recv() - } - - pub fn receiver(&mut self) -> &mut Receiver { - &mut self.receiver.chan - } - - pub fn cleanup_receiver(&mut self) -> Result<()> { - self.receiver.cleanup() - } - - pub fn stop(self) -> Result<()> { - self.receiver.stop()?; - self.sender.stop()?; - Ok(()) - } -} - - -fn read_msg_text(inp: &mut impl BufRead) -> Result> { - let mut size = None; - let mut buf = String::new(); - loop { - buf.clear(); - if inp.read_line(&mut buf)? == 0 { - return Ok(None); - } - if !buf.ends_with("\r\n") { - bail!("malformed header: {:?}", buf); - } - let buf = &buf[..buf.len() - 2]; - if buf.is_empty() { - break; - } - let mut parts = buf.splitn(2, ": "); - let header_name = parts.next().unwrap(); - let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?; - if header_name == "Content-Length" { - size = Some(header_value.parse::()?); - } - } - let size = size.ok_or_else(|| format_err!("no Content-Length"))?; - let mut buf = buf.into_bytes(); - buf.resize(size, 0); - inp.read_exact(&mut buf)?; - let buf = String::from_utf8(buf)?; - debug!("< {}", buf); - Ok(Some(buf)) -} - -fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { - debug!("> {}", msg); - write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; - out.write_all(msg.as_bytes())?; - out.flush()?; - Ok(()) -} diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index eeb343b80..3e3bd44a1 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -17,26 +17,20 @@ extern crate walkdir; extern crate libeditor; extern crate libanalysis; extern crate libsyntax2; +extern crate gen_lsp_server; extern crate im; extern crate relative_path; -mod io; mod caps; mod req; -mod dispatch; mod conv; mod main_loop; mod vfs; mod path_map; mod server_world; -use threadpool::ThreadPool; -use crossbeam_channel::bounded; use flexi_logger::{Logger, Duplicate}; - -use ::{ - io::{Io, RawMsg, RawResponse, RawNotification}, -}; +use gen_lsp_server::{run_server, stdio_transport}; pub type Result = ::std::result::Result; @@ -60,96 +54,10 @@ fn main() -> Result<()> { } fn main_inner() -> Result<()> { - let mut io = Io::from_stdio(); - let res = initialize(&mut io); + let (receiver, sender, threads) = stdio_transport(); + run_server(caps::server_capabilities(), main_loop::main_loop, receiver, sender)?; info!("shutting down IO..."); - let io_res = io.stop(); + threads.join()?; info!("... IO is down"); - match (res, io_res) { - (Ok(()), Ok(())) => Ok(()), - (res, Ok(())) => res, - (Ok(()), io_res) => io_res, - (res, Err(io_err)) => { - error!("shutdown error: {:?}", io_err); - res - } - } -} - -fn initialize(io: &mut Io) -> Result<()> { - match io.recv()? { - RawMsg::Notification(n) => - bail!("expected initialize request, got {:?}", n), - RawMsg::Response(res) => - bail!("expected initialize request, got {:?}", res), - - RawMsg::Request(req) => { - let req = dispatch::handle_request::(req, |_params, resp| { - let res = req::InitializeResult { capabilities: caps::server_capabilities() }; - let resp = resp.into_response(Ok(res))?; - io.send(RawMsg::Response(resp)); - Ok(()) - })?; - if let Err(req) = req { - bail!("expected initialize request, got {:?}", req) - } - match io.recv()? { - RawMsg::Notification(n) => { - if n.method != "initialized" { - bail!("expected initialized notification"); - } - } - _ => bail!("expected initialized notification"), - } - } - } - initialized(io) -} - -enum Task { - Respond(RawResponse), - Notify(RawNotification), - Die(::failure::Error), -} - -fn initialized(io: &mut Io) -> Result<()> { - { - let mut pool = ThreadPool::new(4); - let (task_sender, task_receiver) = bounded::(16); - let (fs_events_receiver, watcher) = vfs::watch(vec![ - ::std::env::current_dir()?, - ]); - info!("lifecycle: handshake finished, server ready to serve requests"); - let res = main_loop::main_loop( - io, - &mut pool, - task_sender, - task_receiver.clone(), - fs_events_receiver, - ); - - info!("waiting for background jobs to finish..."); - task_receiver.for_each(drop); - pool.join(); - info!("...background jobs have finished"); - - info!("waiting for file watcher to finish..."); - watcher.stop()?; - info!("...file watcher has finished"); - - res - }?; - - match io.recv()? { - RawMsg::Notification(n) => { - if n.method == "exit" { - info!("lifecycle: shutdown complete"); - return Ok(()); - } - bail!("unexpected notification during shutdown: {:?}", n) - } - m => { - bail!("unexpected message during shutdown: {:?}", m) - } - } + Ok(()) } diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index db7d5ae34..34d077805 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -6,59 +6,97 @@ use std::{ }; use threadpool::ThreadPool; -use crossbeam_channel::{Sender, Receiver}; +use serde::{Serialize, de::DeserializeOwned}; +use crossbeam_channel::{bounded, Sender, Receiver}; use languageserver_types::{NumberOrString}; use libanalysis::{FileId, JobHandle, JobToken}; +use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode}; use { - req, dispatch, - Task, Result, - io::{Io, RawMsg, RawRequest, RawNotification}, - vfs::FileEvent, + req, + Result, + vfs::{self, FileEvent}, server_world::{ServerWorldState, ServerWorld}, main_loop::subscriptions::{Subscriptions}, }; +enum Task { + Respond(RawResponse), + Notify(RawNotification), +} + pub(super) fn main_loop( - io: &mut Io, - pool: &mut ThreadPool, - task_sender: Sender, - task_receiver: Receiver, - fs_events_receiver: Receiver>, + receriver: &mut Receiver, + sender: &mut Sender, ) -> Result<()> { + let pool = ThreadPool::new(4); + let (task_sender, task_receiver) = bounded::(16); + let (fs_events_receiver, watcher) = vfs::watch(vec![ + ::std::env::current_dir()?, + ]); + info!("server initialized, serving requests"); let mut state = ServerWorldState::new(); - let mut pending_requests: HashMap = HashMap::new(); - let mut fs_events_receiver = Some(&fs_events_receiver); + let mut pending_requests = HashMap::new(); let mut subs = Subscriptions::new(); + main_loop_inner( + &pool, + receriver, + sender, + task_receiver.clone(), + task_sender, + fs_events_receiver, + &mut state, + &mut pending_requests, + &mut subs, + )?; + + info!("waiting for background jobs to finish..."); + task_receiver.for_each(drop); + pool.join(); + info!("...background jobs have finished"); + + info!("waiting for file watcher to finish..."); + watcher.stop()?; + info!("...file watcher has finished"); + Ok(()) +} + +fn main_loop_inner( + pool: &ThreadPool, + msg_receiver: &mut Receiver, + msg_sender: &mut Sender, + task_receiver: Receiver, + task_sender: Sender, + fs_receiver: Receiver>, + state: &mut ServerWorldState, + pending_requests: &mut HashMap, + subs: &mut Subscriptions, +) -> Result { + let mut fs_receiver = Some(fs_receiver); loop { enum Event { - Msg(RawMsg), + Msg(RawMessage), Task(Task), Fs(Vec), - ReceiverDead, FsWatcherDead, } let event = select! { - recv(io.receiver(), msg) => match msg { + recv(msg_receiver, msg) => match msg { Some(msg) => Event::Msg(msg), - None => Event::ReceiverDead, + None => bail!("client exited without shutdown"), }, recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(fs_events_receiver, events) => match events { + recv(fs_receiver, events) => match events { Some(events) => Event::Fs(events), None => Event::FsWatcherDead, } }; let mut state_changed = false; match event { - Event::ReceiverDead => { - io.cleanup_receiver()?; - unreachable!(); - } Event::FsWatcherDead => { - fs_events_receiver = None; + fs_receiver = None; } Event::Task(task) => { match task { @@ -66,12 +104,10 @@ pub(super) fn main_loop( if let Some(handle) = pending_requests.remove(&response.id) { assert!(handle.has_completed()); } - io.send(RawMsg::Response(response)) + msg_sender.send(RawMessage::Response(response)) } Task::Notify(n) => - io.send(RawMsg::Notification(n)), - Task::Die(error) => - return Err(error), + msg_sender.send(RawMessage::Notification(n)), } continue; } @@ -82,16 +118,29 @@ pub(super) fn main_loop( } Event::Msg(msg) => { match msg { - RawMsg::Request(req) => { - if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { - return Ok(()); + RawMessage::Request(req) => { + let req = match req.cast::() { + Ok((id, _params)) => return Ok(id), + Err(req) => req, + }; + match on_request(state, pending_requests, pool, &task_sender, req)? { + None => (), + Some(req) => { + error!("unknown request: {:?}", req); + let resp = RawResponse::err( + req.id, + ErrorCode::MethodNotFound as i32, + "unknown request".to_string(), + ); + msg_sender.send(RawMessage::Response(resp)) + } } } - RawMsg::Notification(not) => { - on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; + RawMessage::Notification(not) => { + on_notification(msg_sender, state, pending_requests, subs, not)?; state_changed = true; } - RawMsg::Response(resp) => { + RawMessage::Response(resp) => { error!("unexpected response: {:?}", resp) } } @@ -110,13 +159,12 @@ pub(super) fn main_loop( } fn on_request( - io: &mut Io, world: &mut ServerWorldState, pending_requests: &mut HashMap, pool: &ThreadPool, sender: &Sender, req: RawRequest, -) -> Result { +) -> Result> { let mut pool_dispatcher = PoolDispatcher { req: Some(req), res: None, @@ -141,81 +189,73 @@ fn on_request( Ok((id, handle)) => { let inserted = pending_requests.insert(id, handle).is_none(); assert!(inserted, "duplicate request: {}", id); + Ok(None) }, - Err(req) => { - let req = dispatch::handle_request::(req, |(), resp| { - let resp = resp.into_response(Ok(()))?; - io.send(RawMsg::Response(resp)); - Ok(()) - })?; - match req { - Ok(_id) => { - info!("lifecycle: initiating shutdown"); - return Ok(false); - } - Err(req) => { - error!("unknown method: {:?}", req); - io.send(RawMsg::Response(dispatch::unknown_method(req.id)?)); - } - } - } + Err(req) => Ok(Some(req)), } - Ok(true) } fn on_notification( - io: &mut Io, + msg_sender: &mut Sender, state: &mut ServerWorldState, pending_requests: &mut HashMap, subs: &mut Subscriptions, not: RawNotification, ) -> Result<()> { - let mut not = Some(not); - dispatch::handle_notification::(&mut not, |params| { - let id = match params.id { - NumberOrString::Number(id) => id, - NumberOrString::String(id) => { - panic!("string id's not supported: {:?}", id); + let not = match not.cast::() { + Ok(params) => { + let id = match params.id { + NumberOrString::Number(id) => id, + NumberOrString::String(id) => { + panic!("string id's not supported: {:?}", id); + } + }; + if let Some(handle) = pending_requests.remove(&id) { + handle.cancel(); } - }; - if let Some(handle) = pending_requests.remove(&id) { - handle.cancel(); + return Ok(()) } - Ok(()) - })?; - dispatch::handle_notification::(&mut not, |params| { - let uri = params.text_document.uri; - let path = uri.to_file_path() - .map_err(|()| format_err!("invalid uri: {}", uri))?; - let file_id = state.add_mem_file(path, params.text_document.text); - subs.add_sub(file_id); - Ok(()) - })?; - dispatch::handle_notification::(&mut not, |mut params| { - let uri = params.text_document.uri; - let path = uri.to_file_path() - .map_err(|()| format_err!("invalid uri: {}", uri))?; - let text = params.content_changes.pop() - .ok_or_else(|| format_err!("empty changes"))? - .text; - state.change_mem_file(path.as_path(), text)?; - Ok(()) - })?; - dispatch::handle_notification::(&mut not, |params| { - let uri = params.text_document.uri; - let path = uri.to_file_path() - .map_err(|()| format_err!("invalid uri: {}", uri))?; - let file_id = state.remove_mem_file(path.as_path())?; - subs.remove_sub(file_id); - let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; - let not = dispatch::send_notification::(not); - io.send(RawMsg::Notification(not)); - Ok(()) - })?; - - if let Some(not) = not { - error!("unhandled notification: {:?}", not); - } + Err(not) => not, + }; + let not = match not.cast::() { + Ok(params) => { + let uri = params.text_document.uri; + let path = uri.to_file_path() + .map_err(|()| format_err!("invalid uri: {}", uri))?; + let file_id = state.add_mem_file(path, params.text_document.text); + subs.add_sub(file_id); + return Ok(()) + } + Err(not) => not, + }; + let not = match not.cast::() { + Ok(mut params) => { + let uri = params.text_document.uri; + let path = uri.to_file_path() + .map_err(|()| format_err!("invalid uri: {}", uri))?; + let text = params.content_changes.pop() + .ok_or_else(|| format_err!("empty changes"))? + .text; + state.change_mem_file(path.as_path(), text)?; + return Ok(()) + } + Err(not) => not, + }; + let not = match not.cast::() { + Ok(params) => { + let uri = params.text_document.uri; + let path = uri.to_file_path() + .map_err(|()| format_err!("invalid uri: {}", uri))?; + let file_id = state.remove_mem_file(path.as_path())?; + subs.remove_sub(file_id); + let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; + let not = RawNotification::new::(params); + msg_sender.send(RawMessage::Notification(not)); + return Ok(()) + } + Err(not) => not, + }; + error!("unhandled notification: {:?}", not); Ok(()) } @@ -228,10 +268,14 @@ struct PoolDispatcher<'a> { } impl<'a> PoolDispatcher<'a> { - fn on<'b, R: req::ClientRequest>( + fn on<'b, R>( &'b mut self, f: fn(ServerWorld, R::Params, JobToken) -> Result - ) -> Result<&'b mut Self> { + ) -> Result<&'b mut Self> + where R: req::Request, + R::Params: DeserializeOwned + Send + 'static, + R::Result: Serialize + 'static, + { let req = match self.req.take() { None => return Ok(self), Some(req) => req, @@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> { let world = self.world; let sender = self.sender; let pool = self.pool; - let (handle, token) = JobHandle::new(); - let req = dispatch::handle_request::(req, |params, resp| { - let world = world.snapshot(); - let sender = sender.clone(); - pool.execute(move || { - let res = f(world, params, token); - let task = match resp.into_response(res) { - Ok(resp) => Task::Respond(resp), - Err(e) => Task::Die(e), - }; - sender.send(task); - }); - Ok(()) - })?; - match req { - Ok(id) => self.res = Some((id, handle)), - Err(req) => self.req = Some(req), + match req.cast::() { + Ok((id, params)) => { + let (handle, token) = JobHandle::new(); + let world = world.snapshot(); + let sender = sender.clone(); + pool.execute(move || { + let resp = match f(world, params, token) { + Ok(resp) => RawResponse::ok(id, resp), + Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()), + }; + let task = Task::Respond(resp); + sender.send(task); + }); + self.res = Some((id, handle)); + } + Err(req) => { + self.req = Some(req) + } } Ok(self) } @@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool( error!("failed to compute diagnostics: {:?}", e) } Ok(params) => { - let not = dispatch::send_notification::(params); + let not = RawNotification::new::(params); sender.send(Task::Notify(not)); } } @@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool( error!("failed to compute decorations: {:?}", e) } Ok(params) => { - let not = dispatch::send_notification::(params); + let not = RawNotification::new::(params); sender.send(Task::Notify(not)) } } -- cgit v1.2.3