diff options
author | Aleksey Kladov <[email protected]> | 2020-06-25 16:22:18 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2020-06-25 16:22:18 +0100 |
commit | 22098127c4f4b7414f0695c7788f07d0a1c43892 (patch) | |
tree | 9e3bba5041c7afa181625ac1ddc5cb4b77cd519f | |
parent | 379a096de9ad06c23347b76a54d9cc22aee80f6a (diff) |
Move request dispatcher to a separate file
-rw-r--r-- | crates/rust-analyzer/src/dispatch.rs | 137 | ||||
-rw-r--r-- | crates/rust-analyzer/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/rust-analyzer/src/main_loop.rs | 137 |
3 files changed, 146 insertions, 129 deletions
diff --git a/crates/rust-analyzer/src/dispatch.rs b/crates/rust-analyzer/src/dispatch.rs new file mode 100644 index 000000000..0a9b0428d --- /dev/null +++ b/crates/rust-analyzer/src/dispatch.rs | |||
@@ -0,0 +1,137 @@ | |||
1 | use std::{panic, time::Instant}; | ||
2 | |||
3 | use serde::{de::DeserializeOwned, Serialize}; | ||
4 | |||
5 | use crate::{ | ||
6 | global_state::{GlobalState, GlobalStateSnapshot}, | ||
7 | lsp_utils::is_canceled, | ||
8 | main_loop::Task, | ||
9 | LspError, Result, | ||
10 | }; | ||
11 | |||
12 | pub(crate) struct RequestDispatcher<'a> { | ||
13 | pub(crate) req: Option<lsp_server::Request>, | ||
14 | pub(crate) global_state: &'a mut GlobalState, | ||
15 | pub(crate) request_received: Instant, | ||
16 | } | ||
17 | |||
18 | impl<'a> RequestDispatcher<'a> { | ||
19 | /// Dispatches the request onto the current thread | ||
20 | pub(crate) fn on_sync<R>( | ||
21 | &mut self, | ||
22 | f: fn(&mut GlobalState, R::Params) -> Result<R::Result>, | ||
23 | ) -> Result<&mut Self> | ||
24 | where | ||
25 | R: lsp_types::request::Request + 'static, | ||
26 | R::Params: DeserializeOwned + panic::UnwindSafe + 'static, | ||
27 | R::Result: Serialize + 'static, | ||
28 | { | ||
29 | let (id, params) = match self.parse::<R>() { | ||
30 | Some(it) => it, | ||
31 | None => { | ||
32 | return Ok(self); | ||
33 | } | ||
34 | }; | ||
35 | let world = panic::AssertUnwindSafe(&mut *self.global_state); | ||
36 | let task = panic::catch_unwind(move || { | ||
37 | let result = f(world.0, params); | ||
38 | result_to_task::<R>(id, result) | ||
39 | }) | ||
40 | .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; | ||
41 | self.global_state.on_task(task); | ||
42 | Ok(self) | ||
43 | } | ||
44 | |||
45 | /// Dispatches the request onto thread pool | ||
46 | pub(crate) fn on<R>( | ||
47 | &mut self, | ||
48 | f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>, | ||
49 | ) -> Result<&mut Self> | ||
50 | where | ||
51 | R: lsp_types::request::Request + 'static, | ||
52 | R::Params: DeserializeOwned + Send + 'static, | ||
53 | R::Result: Serialize + 'static, | ||
54 | { | ||
55 | let (id, params) = match self.parse::<R>() { | ||
56 | Some(it) => it, | ||
57 | None => { | ||
58 | return Ok(self); | ||
59 | } | ||
60 | }; | ||
61 | |||
62 | self.global_state.task_pool.0.spawn({ | ||
63 | let world = self.global_state.snapshot(); | ||
64 | move || { | ||
65 | let result = f(world, params); | ||
66 | result_to_task::<R>(id, result) | ||
67 | } | ||
68 | }); | ||
69 | |||
70 | Ok(self) | ||
71 | } | ||
72 | |||
73 | pub(crate) fn finish(&mut self) { | ||
74 | match self.req.take() { | ||
75 | None => (), | ||
76 | Some(req) => { | ||
77 | log::error!("unknown request: {:?}", req); | ||
78 | let resp = lsp_server::Response::new_err( | ||
79 | req.id, | ||
80 | lsp_server::ErrorCode::MethodNotFound as i32, | ||
81 | "unknown request".to_string(), | ||
82 | ); | ||
83 | self.global_state.send(resp.into()); | ||
84 | } | ||
85 | } | ||
86 | } | ||
87 | |||
88 | fn parse<R>(&mut self) -> Option<(lsp_server::RequestId, R::Params)> | ||
89 | where | ||
90 | R: lsp_types::request::Request + 'static, | ||
91 | R::Params: DeserializeOwned + 'static, | ||
92 | { | ||
93 | let req = self.req.take()?; | ||
94 | let (id, params) = match req.extract::<R::Params>(R::METHOD) { | ||
95 | Ok(it) => it, | ||
96 | Err(req) => { | ||
97 | self.req = Some(req); | ||
98 | return None; | ||
99 | } | ||
100 | }; | ||
101 | self.global_state | ||
102 | .req_queue | ||
103 | .incoming | ||
104 | .register(id.clone(), (R::METHOD, self.request_received)); | ||
105 | Some((id, params)) | ||
106 | } | ||
107 | } | ||
108 | |||
109 | fn result_to_task<R>(id: lsp_server::RequestId, result: Result<R::Result>) -> Task | ||
110 | where | ||
111 | R: lsp_types::request::Request + 'static, | ||
112 | R::Params: DeserializeOwned + 'static, | ||
113 | R::Result: Serialize + 'static, | ||
114 | { | ||
115 | let response = match result { | ||
116 | Ok(resp) => lsp_server::Response::new_ok(id, &resp), | ||
117 | Err(e) => match e.downcast::<LspError>() { | ||
118 | Ok(lsp_error) => lsp_server::Response::new_err(id, lsp_error.code, lsp_error.message), | ||
119 | Err(e) => { | ||
120 | if is_canceled(&*e) { | ||
121 | lsp_server::Response::new_err( | ||
122 | id, | ||
123 | lsp_server::ErrorCode::ContentModified as i32, | ||
124 | "content modified".to_string(), | ||
125 | ) | ||
126 | } else { | ||
127 | lsp_server::Response::new_err( | ||
128 | id, | ||
129 | lsp_server::ErrorCode::InternalError as i32, | ||
130 | e.to_string(), | ||
131 | ) | ||
132 | } | ||
133 | } | ||
134 | }, | ||
135 | }; | ||
136 | Task::Respond(response) | ||
137 | } | ||
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs index ca788dd3c..d503fe96e 100644 --- a/crates/rust-analyzer/src/lib.rs +++ b/crates/rust-analyzer/src/lib.rs | |||
@@ -19,6 +19,7 @@ macro_rules! eprintln { | |||
19 | 19 | ||
20 | mod global_state; | 20 | mod global_state; |
21 | mod main_loop; | 21 | mod main_loop; |
22 | mod dispatch; | ||
22 | mod handlers; | 23 | mod handlers; |
23 | mod caps; | 24 | mod caps; |
24 | mod cargo_target_spec; | 25 | mod cargo_target_spec; |
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index f3c8b5978..ebc232736 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs | |||
@@ -6,26 +6,26 @@ use std::{ | |||
6 | }; | 6 | }; |
7 | 7 | ||
8 | use crossbeam_channel::{never, select, Receiver}; | 8 | use crossbeam_channel::{never, select, Receiver}; |
9 | use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; | 9 | use lsp_server::{Connection, Notification, Request, RequestId, Response}; |
10 | use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString}; | 10 | use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString}; |
11 | use ra_db::VfsPath; | 11 | use ra_db::VfsPath; |
12 | use ra_ide::{Canceled, FileId}; | 12 | use ra_ide::{Canceled, FileId}; |
13 | use ra_prof::profile; | 13 | use ra_prof::profile; |
14 | use ra_project_model::{PackageRoot, ProjectWorkspace}; | 14 | use ra_project_model::{PackageRoot, ProjectWorkspace}; |
15 | use serde::{de::DeserializeOwned, Serialize}; | ||
16 | 15 | ||
17 | use crate::{ | 16 | use crate::{ |
18 | config::{Config, FilesWatcher, LinkedProject}, | 17 | config::{Config, FilesWatcher, LinkedProject}, |
19 | diagnostics::DiagnosticTask, | 18 | diagnostics::DiagnosticTask, |
19 | dispatch::RequestDispatcher, | ||
20 | from_proto, | 20 | from_proto, |
21 | global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status}, | 21 | global_state::{file_id_to_url, GlobalState, Status}, |
22 | handlers, lsp_ext, | 22 | handlers, lsp_ext, |
23 | lsp_utils::{ | 23 | lsp_utils::{ |
24 | apply_document_changes, is_canceled, notification_cast, notification_is, notification_new, | 24 | apply_document_changes, is_canceled, notification_cast, notification_is, notification_new, |
25 | show_message, | 25 | show_message, |
26 | }, | 26 | }, |
27 | request_metrics::RequestMetrics, | 27 | request_metrics::RequestMetrics, |
28 | LspError, Result, | 28 | Result, |
29 | }; | 29 | }; |
30 | 30 | ||
31 | pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | 31 | pub fn main_loop(config: Config, connection: Connection) -> Result<()> { |
@@ -241,7 +241,7 @@ impl GlobalState { | |||
241 | 241 | ||
242 | fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> { | 242 | fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> { |
243 | let mut pool_dispatcher = | 243 | let mut pool_dispatcher = |
244 | PoolDispatcher { req: Some(req), global_state: self, request_received }; | 244 | RequestDispatcher { req: Some(req), global_state: self, request_received }; |
245 | pool_dispatcher | 245 | pool_dispatcher |
246 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? | 246 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? |
247 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? | 247 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? |
@@ -426,7 +426,8 @@ impl GlobalState { | |||
426 | log::error!("unhandled notification: {:?}", not); | 426 | log::error!("unhandled notification: {:?}", not); |
427 | Ok(()) | 427 | Ok(()) |
428 | } | 428 | } |
429 | fn on_task(&mut self, task: Task) { | 429 | // TODO |
430 | pub(crate) fn on_task(&mut self, task: Task) { | ||
430 | match task { | 431 | match task { |
431 | Task::Respond(response) => { | 432 | Task::Respond(response) => { |
432 | if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone()) | 433 | if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone()) |
@@ -480,6 +481,7 @@ impl GlobalState { | |||
480 | } | 481 | } |
481 | } | 482 | } |
482 | 483 | ||
484 | // TODO | ||
483 | #[derive(Debug)] | 485 | #[derive(Debug)] |
484 | pub(crate) enum Task { | 486 | pub(crate) enum Task { |
485 | Respond(Response), | 487 | Respond(Response), |
@@ -645,126 +647,3 @@ fn report_progress( | |||
645 | }); | 647 | }); |
646 | global_state.send(notification.into()); | 648 | global_state.send(notification.into()); |
647 | } | 649 | } |
648 | |||
649 | struct PoolDispatcher<'a> { | ||
650 | req: Option<Request>, | ||
651 | global_state: &'a mut GlobalState, | ||
652 | request_received: Instant, | ||
653 | } | ||
654 | |||
655 | impl<'a> PoolDispatcher<'a> { | ||
656 | /// Dispatches the request onto the current thread | ||
657 | fn on_sync<R>( | ||
658 | &mut self, | ||
659 | f: fn(&mut GlobalState, R::Params) -> Result<R::Result>, | ||
660 | ) -> Result<&mut Self> | ||
661 | where | ||
662 | R: lsp_types::request::Request + 'static, | ||
663 | R::Params: DeserializeOwned + panic::UnwindSafe + 'static, | ||
664 | R::Result: Serialize + 'static, | ||
665 | { | ||
666 | let (id, params) = match self.parse::<R>() { | ||
667 | Some(it) => it, | ||
668 | None => { | ||
669 | return Ok(self); | ||
670 | } | ||
671 | }; | ||
672 | let world = panic::AssertUnwindSafe(&mut *self.global_state); | ||
673 | let task = panic::catch_unwind(move || { | ||
674 | let result = f(world.0, params); | ||
675 | result_to_task::<R>(id, result) | ||
676 | }) | ||
677 | .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; | ||
678 | self.global_state.on_task(task); | ||
679 | Ok(self) | ||
680 | } | ||
681 | |||
682 | /// Dispatches the request onto thread pool | ||
683 | fn on<R>( | ||
684 | &mut self, | ||
685 | f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>, | ||
686 | ) -> Result<&mut Self> | ||
687 | where | ||
688 | R: lsp_types::request::Request + 'static, | ||
689 | R::Params: DeserializeOwned + Send + 'static, | ||
690 | R::Result: Serialize + 'static, | ||
691 | { | ||
692 | let (id, params) = match self.parse::<R>() { | ||
693 | Some(it) => it, | ||
694 | None => { | ||
695 | return Ok(self); | ||
696 | } | ||
697 | }; | ||
698 | |||
699 | self.global_state.task_pool.0.spawn({ | ||
700 | let world = self.global_state.snapshot(); | ||
701 | move || { | ||
702 | let result = f(world, params); | ||
703 | result_to_task::<R>(id, result) | ||
704 | } | ||
705 | }); | ||
706 | |||
707 | Ok(self) | ||
708 | } | ||
709 | |||
710 | fn parse<R>(&mut self) -> Option<(RequestId, R::Params)> | ||
711 | where | ||
712 | R: lsp_types::request::Request + 'static, | ||
713 | R::Params: DeserializeOwned + 'static, | ||
714 | { | ||
715 | let req = self.req.take()?; | ||
716 | let (id, params) = match req.extract::<R::Params>(R::METHOD) { | ||
717 | Ok(it) => it, | ||
718 | Err(req) => { | ||
719 | self.req = Some(req); | ||
720 | return None; | ||
721 | } | ||
722 | }; | ||
723 | self.global_state | ||
724 | .req_queue | ||
725 | .incoming | ||
726 | .register(id.clone(), (R::METHOD, self.request_received)); | ||
727 | Some((id, params)) | ||
728 | } | ||
729 | |||
730 | fn finish(&mut self) { | ||
731 | match self.req.take() { | ||
732 | None => (), | ||
733 | Some(req) => { | ||
734 | log::error!("unknown request: {:?}", req); | ||
735 | let resp = Response::new_err( | ||
736 | req.id, | ||
737 | ErrorCode::MethodNotFound as i32, | ||
738 | "unknown request".to_string(), | ||
739 | ); | ||
740 | self.global_state.send(resp.into()); | ||
741 | } | ||
742 | } | ||
743 | } | ||
744 | } | ||
745 | |||
746 | fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task | ||
747 | where | ||
748 | R: lsp_types::request::Request + 'static, | ||
749 | R::Params: DeserializeOwned + 'static, | ||
750 | R::Result: Serialize + 'static, | ||
751 | { | ||
752 | let response = match result { | ||
753 | Ok(resp) => Response::new_ok(id, &resp), | ||
754 | Err(e) => match e.downcast::<LspError>() { | ||
755 | Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message), | ||
756 | Err(e) => { | ||
757 | if is_canceled(&*e) { | ||
758 | Response::new_err( | ||
759 | id, | ||
760 | ErrorCode::ContentModified as i32, | ||
761 | "content modified".to_string(), | ||
762 | ) | ||
763 | } else { | ||
764 | Response::new_err(id, ErrorCode::InternalError as i32, e.to_string()) | ||
765 | } | ||
766 | } | ||
767 | }, | ||
768 | }; | ||
769 | Task::Respond(response) | ||
770 | } | ||