From d999f4b56803b613dcf354862e1db4b5d2a8a8d0 Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Fri, 31 Aug 2018 12:04:33 +0300 Subject: cancelation --- crates/server/src/dispatch.rs | 18 +++--- crates/server/src/main.rs | 5 +- crates/server/src/main_loop/handlers.rs | 25 ++++++-- crates/server/src/main_loop/mod.rs | 108 +++++++++++++++++++++++--------- 4 files changed, 106 insertions(+), 50 deletions(-) (limited to 'crates/server') diff --git a/crates/server/src/dispatch.rs b/crates/server/src/dispatch.rs index d8cca48d0..806534944 100644 --- a/crates/server/src/dispatch.rs +++ b/crates/server/src/dispatch.rs @@ -58,20 +58,18 @@ fn parse_request_as(raw: RawRequest) Ok(Ok((params, responder))) } -pub fn handle_request(req: &mut Option, f: F) -> Result<()> +pub fn handle_request(req: RawRequest, f: F) -> Result<::std::result::Result> where R: ClientRequest, F: FnOnce(R::Params, Responder) -> Result<()> { - match req.take() { - None => Ok(()), - Some(r) => match parse_request_as::(r)? { - Ok((params, responder)) => f(params, responder), - Err(r) => { - *req = Some(r); - Ok(()) - } - } + let id = req.id; + match parse_request_as::(req)? { + Ok((params, responder)) => { + let () = f(params, responder)?; + Ok(Ok(id)) + }, + Err(r) => Ok(Err(r)), } } diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index 6af8bf81b..eeb343b80 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs @@ -84,14 +84,13 @@ fn initialize(io: &mut Io) -> Result<()> { bail!("expected initialize request, got {:?}", res), RawMsg::Request(req) => { - let mut req = Some(req); - dispatch::handle_request::(&mut req, |_params, resp| { + let req = dispatch::handle_request::(req, |_params, resp| { let res = req::InitializeResult { capabilities: caps::server_capabilities() }; let resp = resp.into_response(Ok(res))?; io.send(RawMsg::Response(resp)); Ok(()) })?; - if let Some(req) = req { + if let Err(req) = req { bail!("expected initialize request, got {:?}", req) } match io.recv()? { diff --git a/crates/server/src/main_loop/handlers.rs b/crates/server/src/main_loop/handlers.rs index 6b70399b0..ab8b6f799 100644 --- a/crates/server/src/main_loop/handlers.rs +++ b/crates/server/src/main_loop/handlers.rs @@ -7,7 +7,7 @@ use languageserver_types::{ CompletionItem, InsertTextFormat, CompletionItemKind, }; use serde_json::to_value; -use libanalysis::{Query, FileId, RunnableKind}; +use libanalysis::{Query, FileId, RunnableKind, JobToken}; use libsyntax2::{ text_utils::contains_offset_nonstrict, }; @@ -21,6 +21,7 @@ use ::{ pub fn handle_syntax_tree( world: ServerWorld, params: req::SyntaxTreeParams, + _token: JobToken, ) -> Result { let id = params.text_document.try_conv_with(&world)?; let res = world.analysis().syntax_tree(id); @@ -30,6 +31,7 @@ pub fn handle_syntax_tree( pub fn handle_extend_selection( world: ServerWorld, params: req::ExtendSelectionParams, + _token: JobToken, ) -> Result { let file_id = params.text_document.try_conv_with(&world)?; let file = world.analysis().file_syntax(file_id); @@ -45,6 +47,7 @@ pub fn handle_extend_selection( pub fn handle_find_matching_brace( world: ServerWorld, params: req::FindMatchingBraceParams, + _token: JobToken, ) -> Result> { let file_id = params.text_document.try_conv_with(&world)?; let file = world.analysis().file_syntax(file_id); @@ -63,6 +66,7 @@ pub fn handle_find_matching_brace( pub fn handle_join_lines( world: ServerWorld, params: req::JoinLinesParams, + _token: JobToken, ) -> Result { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); @@ -74,6 +78,7 @@ pub fn handle_join_lines( pub fn handle_on_type_formatting( world: ServerWorld, params: req::DocumentOnTypeFormattingParams, + _token: JobToken, ) -> Result>> { if params.ch != "=" { return Ok(None); @@ -93,6 +98,7 @@ pub fn handle_on_type_formatting( pub fn handle_document_symbol( world: ServerWorld, params: req::DocumentSymbolParams, + _token: JobToken, ) -> Result> { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); @@ -131,6 +137,7 @@ pub fn handle_document_symbol( pub fn handle_workspace_symbol( world: ServerWorld, params: req::WorkspaceSymbolParams, + token: JobToken, ) -> Result>> { let all_symbols = params.query.contains("#"); let query = { @@ -144,18 +151,18 @@ pub fn handle_workspace_symbol( q.limit(128); q }; - let mut res = exec_query(&world, query)?; + let mut res = exec_query(&world, query, &token)?; if res.is_empty() && !all_symbols { let mut query = Query::new(params.query); query.limit(128); - res = exec_query(&world, query)?; + res = exec_query(&world, query, &token)?; } return Ok(Some(res)); - fn exec_query(world: &ServerWorld, query: Query) -> Result> { + fn exec_query(world: &ServerWorld, query: Query, token: &JobToken) -> Result> { let mut res = Vec::new(); - for (file_id, symbol) in world.analysis().symbol_search(query) { + for (file_id, symbol) in world.analysis().symbol_search(query, token) { let line_index = world.analysis().file_line_index(file_id); let info = SymbolInformation { name: symbol.name.to_string(), @@ -175,12 +182,13 @@ pub fn handle_workspace_symbol( pub fn handle_goto_definition( world: ServerWorld, params: req::TextDocumentPositionParams, + token: JobToken, ) -> Result> { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); let offset = params.position.conv_with(&line_index); let mut res = Vec::new(); - for (file_id, symbol) in world.analysis().approximately_resolve_symbol(file_id, offset) { + for (file_id, symbol) in world.analysis().approximately_resolve_symbol(file_id, offset, &token) { let line_index = world.analysis().file_line_index(file_id); let location = to_location( file_id, symbol.node_range, @@ -194,6 +202,7 @@ pub fn handle_goto_definition( pub fn handle_parent_module( world: ServerWorld, params: TextDocumentIdentifier, + _token: JobToken, ) -> Result> { let file_id = params.try_conv_with(&world)?; let mut res = Vec::new(); @@ -211,6 +220,7 @@ pub fn handle_parent_module( pub fn handle_runnables( world: ServerWorld, params: req::RunnablesParams, + _token: JobToken, ) -> Result> { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); @@ -260,6 +270,7 @@ pub fn handle_runnables( pub fn handle_decorations( world: ServerWorld, params: TextDocumentIdentifier, + _token: JobToken, ) -> Result> { let file_id = params.try_conv_with(&world)?; Ok(highlight(&world, file_id)) @@ -268,6 +279,7 @@ pub fn handle_decorations( pub fn handle_completion( world: ServerWorld, params: req::CompletionParams, + _token: JobToken, ) -> Result> { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); @@ -297,6 +309,7 @@ pub fn handle_completion( pub fn handle_code_action( world: ServerWorld, params: req::CodeActionParams, + _token: JobToken, ) -> Result>> { let file_id = params.text_document.try_conv_with(&world)?; let line_index = world.analysis().file_line_index(file_id); diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index cd17cab56..db7d5ae34 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -2,12 +2,13 @@ mod handlers; mod subscriptions; use std::{ - collections::{HashSet}, + collections::{HashMap}, }; use threadpool::ThreadPool; use crossbeam_channel::{Sender, Receiver}; -use libanalysis::FileId; +use languageserver_types::{NumberOrString}; +use libanalysis::{FileId, JobHandle, JobToken}; use { req, dispatch, @@ -28,7 +29,7 @@ pub(super) fn main_loop( info!("server initialized, serving requests"); let mut state = ServerWorldState::new(); - let mut pending_requests: HashSet = HashSet::new(); + let mut pending_requests: HashMap = HashMap::new(); let mut fs_events_receiver = Some(&fs_events_receiver); let mut subs = Subscriptions::new(); loop { @@ -61,8 +62,12 @@ pub(super) fn main_loop( } Event::Task(task) => { match task { - Task::Respond(response) => - io.send(RawMsg::Response(response)), + Task::Respond(response) => { + if let Some(handle) = pending_requests.remove(&response.id) { + assert!(handle.has_completed()); + } + io.send(RawMsg::Response(response)) + } Task::Notify(n) => io.send(RawMsg::Notification(n)), Task::Die(error) => @@ -78,18 +83,16 @@ pub(super) fn main_loop( Event::Msg(msg) => { match msg { RawMsg::Request(req) => { - if !on_request(io, &mut state, pool, &task_sender, req)? { + if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { return Ok(()); } } RawMsg::Notification(not) => { - on_notification(io, &mut state, &mut subs, not)?; + on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; state_changed = true; } RawMsg::Response(resp) => { - if !pending_requests.remove(&resp.id) { - error!("unexpected response: {:?}", resp) - } + error!("unexpected response: {:?}", resp) } } } @@ -109,15 +112,17 @@ pub(super) fn main_loop( fn on_request( io: &mut Io, world: &mut ServerWorldState, + pending_requests: &mut HashMap, pool: &ThreadPool, sender: &Sender, req: RawRequest, ) -> Result { let mut pool_dispatcher = PoolDispatcher { req: Some(req), + res: None, pool, world, sender }; - pool_dispatcher + let req = pool_dispatcher .on::(handlers::handle_syntax_tree)? .on::(handlers::handle_extend_selection)? .on::(handlers::handle_find_matching_brace)? @@ -130,23 +135,30 @@ fn on_request( .on::(handlers::handle_runnables)? .on::(handlers::handle_decorations)? .on::(handlers::handle_completion)? - .on::(handlers::handle_code_action)?; - - let mut req = pool_dispatcher.req; - let mut shutdown = false; - dispatch::handle_request::(&mut req, |(), resp| { - let resp = resp.into_response(Ok(()))?; - io.send(RawMsg::Response(resp)); - shutdown = true; - Ok(()) - })?; - if shutdown { - info!("lifecycle: initiating shutdown"); - return Ok(false); - } - if let Some(req) = req { - error!("unknown method: {:?}", req); - io.send(RawMsg::Response(dispatch::unknown_method(req.id)?)); + .on::(handlers::handle_code_action)? + .finish(); + match req { + Ok((id, handle)) => { + let inserted = pending_requests.insert(id, handle).is_none(); + assert!(inserted, "duplicate request: {}", id); + }, + Err(req) => { + let req = dispatch::handle_request::(req, |(), resp| { + let resp = resp.into_response(Ok(()))?; + io.send(RawMsg::Response(resp)); + Ok(()) + })?; + match req { + Ok(_id) => { + info!("lifecycle: initiating shutdown"); + return Ok(false); + } + Err(req) => { + error!("unknown method: {:?}", req); + io.send(RawMsg::Response(dispatch::unknown_method(req.id)?)); + } + } + } } Ok(true) } @@ -154,10 +166,23 @@ fn on_request( fn on_notification( io: &mut Io, state: &mut ServerWorldState, + pending_requests: &mut HashMap, subs: &mut Subscriptions, not: RawNotification, ) -> Result<()> { let mut not = Some(not); + dispatch::handle_notification::(&mut not, |params| { + let id = match params.id { + NumberOrString::Number(id) => id, + NumberOrString::String(id) => { + panic!("string id's not supported: {:?}", id); + } + }; + if let Some(handle) = pending_requests.remove(&id) { + handle.cancel(); + } + Ok(()) + })?; dispatch::handle_notification::(&mut not, |params| { let uri = params.text_document.uri; let path = uri.to_file_path() @@ -196,21 +221,30 @@ fn on_notification( struct PoolDispatcher<'a> { req: Option, + res: Option<(u64, JobHandle)>, pool: &'a ThreadPool, world: &'a ServerWorldState, sender: &'a Sender, } impl<'a> PoolDispatcher<'a> { - fn on<'b, R: req::ClientRequest>(&'b mut self, f: fn(ServerWorld, R::Params) -> Result) -> Result<&'b mut Self> { + fn on<'b, R: req::ClientRequest>( + &'b mut self, + f: fn(ServerWorld, R::Params, JobToken) -> Result + ) -> Result<&'b mut Self> { + let req = match self.req.take() { + None => return Ok(self), + Some(req) => req, + }; let world = self.world; let sender = self.sender; let pool = self.pool; - dispatch::handle_request::(&mut self.req, |params, resp| { + let (handle, token) = JobHandle::new(); + let req = dispatch::handle_request::(req, |params, resp| { let world = world.snapshot(); let sender = sender.clone(); pool.execute(move || { - let res = f(world, params); + let res = f(world, params, token); let task = match resp.into_response(res) { Ok(resp) => Task::Respond(resp), Err(e) => Task::Die(e), @@ -219,8 +253,20 @@ impl<'a> PoolDispatcher<'a> { }); Ok(()) })?; + match req { + Ok(id) => self.res = Some((id, handle)), + Err(req) => self.req = Some(req), + } Ok(self) } + + fn finish(&mut self) -> ::std::result::Result<(u64, JobHandle), RawRequest> { + match (self.res.take(), self.req.take()) { + (Some(res), None) => Ok(res), + (None, Some(req)) => Err(req), + _ => unreachable!(), + } + } } fn update_file_notifications_on_threadpool( -- cgit v1.2.3