aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2020-06-25 16:22:18 +0100
committerAleksey Kladov <[email protected]>2020-06-25 16:22:18 +0100
commit22098127c4f4b7414f0695c7788f07d0a1c43892 (patch)
tree9e3bba5041c7afa181625ac1ddc5cb4b77cd519f
parent379a096de9ad06c23347b76a54d9cc22aee80f6a (diff)
Move request dispatcher to a separate file
-rw-r--r--crates/rust-analyzer/src/dispatch.rs137
-rw-r--r--crates/rust-analyzer/src/lib.rs1
-rw-r--r--crates/rust-analyzer/src/main_loop.rs137
3 files changed, 146 insertions, 129 deletions
diff --git a/crates/rust-analyzer/src/dispatch.rs b/crates/rust-analyzer/src/dispatch.rs
new file mode 100644
index 000000000..0a9b0428d
--- /dev/null
+++ b/crates/rust-analyzer/src/dispatch.rs
@@ -0,0 +1,137 @@
1use std::{panic, time::Instant};
2
3use serde::{de::DeserializeOwned, Serialize};
4
5use crate::{
6 global_state::{GlobalState, GlobalStateSnapshot},
7 lsp_utils::is_canceled,
8 main_loop::Task,
9 LspError, Result,
10};
11
12pub(crate) struct RequestDispatcher<'a> {
13 pub(crate) req: Option<lsp_server::Request>,
14 pub(crate) global_state: &'a mut GlobalState,
15 pub(crate) request_received: Instant,
16}
17
18impl<'a> RequestDispatcher<'a> {
19 /// Dispatches the request onto the current thread
20 pub(crate) fn on_sync<R>(
21 &mut self,
22 f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
23 ) -> Result<&mut Self>
24 where
25 R: lsp_types::request::Request + 'static,
26 R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
27 R::Result: Serialize + 'static,
28 {
29 let (id, params) = match self.parse::<R>() {
30 Some(it) => it,
31 None => {
32 return Ok(self);
33 }
34 };
35 let world = panic::AssertUnwindSafe(&mut *self.global_state);
36 let task = panic::catch_unwind(move || {
37 let result = f(world.0, params);
38 result_to_task::<R>(id, result)
39 })
40 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
41 self.global_state.on_task(task);
42 Ok(self)
43 }
44
45 /// Dispatches the request onto thread pool
46 pub(crate) fn on<R>(
47 &mut self,
48 f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
49 ) -> Result<&mut Self>
50 where
51 R: lsp_types::request::Request + 'static,
52 R::Params: DeserializeOwned + Send + 'static,
53 R::Result: Serialize + 'static,
54 {
55 let (id, params) = match self.parse::<R>() {
56 Some(it) => it,
57 None => {
58 return Ok(self);
59 }
60 };
61
62 self.global_state.task_pool.0.spawn({
63 let world = self.global_state.snapshot();
64 move || {
65 let result = f(world, params);
66 result_to_task::<R>(id, result)
67 }
68 });
69
70 Ok(self)
71 }
72
73 pub(crate) fn finish(&mut self) {
74 match self.req.take() {
75 None => (),
76 Some(req) => {
77 log::error!("unknown request: {:?}", req);
78 let resp = lsp_server::Response::new_err(
79 req.id,
80 lsp_server::ErrorCode::MethodNotFound as i32,
81 "unknown request".to_string(),
82 );
83 self.global_state.send(resp.into());
84 }
85 }
86 }
87
88 fn parse<R>(&mut self) -> Option<(lsp_server::RequestId, R::Params)>
89 where
90 R: lsp_types::request::Request + 'static,
91 R::Params: DeserializeOwned + 'static,
92 {
93 let req = self.req.take()?;
94 let (id, params) = match req.extract::<R::Params>(R::METHOD) {
95 Ok(it) => it,
96 Err(req) => {
97 self.req = Some(req);
98 return None;
99 }
100 };
101 self.global_state
102 .req_queue
103 .incoming
104 .register(id.clone(), (R::METHOD, self.request_received));
105 Some((id, params))
106 }
107}
108
109fn result_to_task<R>(id: lsp_server::RequestId, result: Result<R::Result>) -> Task
110where
111 R: lsp_types::request::Request + 'static,
112 R::Params: DeserializeOwned + 'static,
113 R::Result: Serialize + 'static,
114{
115 let response = match result {
116 Ok(resp) => lsp_server::Response::new_ok(id, &resp),
117 Err(e) => match e.downcast::<LspError>() {
118 Ok(lsp_error) => lsp_server::Response::new_err(id, lsp_error.code, lsp_error.message),
119 Err(e) => {
120 if is_canceled(&*e) {
121 lsp_server::Response::new_err(
122 id,
123 lsp_server::ErrorCode::ContentModified as i32,
124 "content modified".to_string(),
125 )
126 } else {
127 lsp_server::Response::new_err(
128 id,
129 lsp_server::ErrorCode::InternalError as i32,
130 e.to_string(),
131 )
132 }
133 }
134 },
135 };
136 Task::Respond(response)
137}
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs
index ca788dd3c..d503fe96e 100644
--- a/crates/rust-analyzer/src/lib.rs
+++ b/crates/rust-analyzer/src/lib.rs
@@ -19,6 +19,7 @@ macro_rules! eprintln {
19 19
20mod global_state; 20mod global_state;
21mod main_loop; 21mod main_loop;
22mod dispatch;
22mod handlers; 23mod handlers;
23mod caps; 24mod caps;
24mod cargo_target_spec; 25mod cargo_target_spec;
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index f3c8b5978..ebc232736 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -6,26 +6,26 @@ use std::{
6}; 6};
7 7
8use crossbeam_channel::{never, select, Receiver}; 8use crossbeam_channel::{never, select, Receiver};
9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; 9use lsp_server::{Connection, Notification, Request, RequestId, Response};
10use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString}; 10use lsp_types::{notification::Notification as _, request::Request as _, NumberOrString};
11use ra_db::VfsPath; 11use ra_db::VfsPath;
12use ra_ide::{Canceled, FileId}; 12use ra_ide::{Canceled, FileId};
13use ra_prof::profile; 13use ra_prof::profile;
14use ra_project_model::{PackageRoot, ProjectWorkspace}; 14use ra_project_model::{PackageRoot, ProjectWorkspace};
15use serde::{de::DeserializeOwned, Serialize};
16 15
17use crate::{ 16use crate::{
18 config::{Config, FilesWatcher, LinkedProject}, 17 config::{Config, FilesWatcher, LinkedProject},
19 diagnostics::DiagnosticTask, 18 diagnostics::DiagnosticTask,
19 dispatch::RequestDispatcher,
20 from_proto, 20 from_proto,
21 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status}, 21 global_state::{file_id_to_url, GlobalState, Status},
22 handlers, lsp_ext, 22 handlers, lsp_ext,
23 lsp_utils::{ 23 lsp_utils::{
24 apply_document_changes, is_canceled, notification_cast, notification_is, notification_new, 24 apply_document_changes, is_canceled, notification_cast, notification_is, notification_new,
25 show_message, 25 show_message,
26 }, 26 },
27 request_metrics::RequestMetrics, 27 request_metrics::RequestMetrics,
28 LspError, Result, 28 Result,
29}; 29};
30 30
31pub fn main_loop(config: Config, connection: Connection) -> Result<()> { 31pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
@@ -241,7 +241,7 @@ impl GlobalState {
241 241
242 fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> { 242 fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> {
243 let mut pool_dispatcher = 243 let mut pool_dispatcher =
244 PoolDispatcher { req: Some(req), global_state: self, request_received }; 244 RequestDispatcher { req: Some(req), global_state: self, request_received };
245 pool_dispatcher 245 pool_dispatcher
246 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? 246 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
247 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? 247 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
@@ -426,7 +426,8 @@ impl GlobalState {
426 log::error!("unhandled notification: {:?}", not); 426 log::error!("unhandled notification: {:?}", not);
427 Ok(()) 427 Ok(())
428 } 428 }
429 fn on_task(&mut self, task: Task) { 429 // TODO
430 pub(crate) fn on_task(&mut self, task: Task) {
430 match task { 431 match task {
431 Task::Respond(response) => { 432 Task::Respond(response) => {
432 if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone()) 433 if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone())
@@ -480,6 +481,7 @@ impl GlobalState {
480 } 481 }
481} 482}
482 483
484// TODO
483#[derive(Debug)] 485#[derive(Debug)]
484pub(crate) enum Task { 486pub(crate) enum Task {
485 Respond(Response), 487 Respond(Response),
@@ -645,126 +647,3 @@ fn report_progress(
645 }); 647 });
646 global_state.send(notification.into()); 648 global_state.send(notification.into());
647} 649}
648
649struct PoolDispatcher<'a> {
650 req: Option<Request>,
651 global_state: &'a mut GlobalState,
652 request_received: Instant,
653}
654
655impl<'a> PoolDispatcher<'a> {
656 /// Dispatches the request onto the current thread
657 fn on_sync<R>(
658 &mut self,
659 f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
660 ) -> Result<&mut Self>
661 where
662 R: lsp_types::request::Request + 'static,
663 R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
664 R::Result: Serialize + 'static,
665 {
666 let (id, params) = match self.parse::<R>() {
667 Some(it) => it,
668 None => {
669 return Ok(self);
670 }
671 };
672 let world = panic::AssertUnwindSafe(&mut *self.global_state);
673 let task = panic::catch_unwind(move || {
674 let result = f(world.0, params);
675 result_to_task::<R>(id, result)
676 })
677 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
678 self.global_state.on_task(task);
679 Ok(self)
680 }
681
682 /// Dispatches the request onto thread pool
683 fn on<R>(
684 &mut self,
685 f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
686 ) -> Result<&mut Self>
687 where
688 R: lsp_types::request::Request + 'static,
689 R::Params: DeserializeOwned + Send + 'static,
690 R::Result: Serialize + 'static,
691 {
692 let (id, params) = match self.parse::<R>() {
693 Some(it) => it,
694 None => {
695 return Ok(self);
696 }
697 };
698
699 self.global_state.task_pool.0.spawn({
700 let world = self.global_state.snapshot();
701 move || {
702 let result = f(world, params);
703 result_to_task::<R>(id, result)
704 }
705 });
706
707 Ok(self)
708 }
709
710 fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
711 where
712 R: lsp_types::request::Request + 'static,
713 R::Params: DeserializeOwned + 'static,
714 {
715 let req = self.req.take()?;
716 let (id, params) = match req.extract::<R::Params>(R::METHOD) {
717 Ok(it) => it,
718 Err(req) => {
719 self.req = Some(req);
720 return None;
721 }
722 };
723 self.global_state
724 .req_queue
725 .incoming
726 .register(id.clone(), (R::METHOD, self.request_received));
727 Some((id, params))
728 }
729
730 fn finish(&mut self) {
731 match self.req.take() {
732 None => (),
733 Some(req) => {
734 log::error!("unknown request: {:?}", req);
735 let resp = Response::new_err(
736 req.id,
737 ErrorCode::MethodNotFound as i32,
738 "unknown request".to_string(),
739 );
740 self.global_state.send(resp.into());
741 }
742 }
743 }
744}
745
746fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
747where
748 R: lsp_types::request::Request + 'static,
749 R::Params: DeserializeOwned + 'static,
750 R::Result: Serialize + 'static,
751{
752 let response = match result {
753 Ok(resp) => Response::new_ok(id, &resp),
754 Err(e) => match e.downcast::<LspError>() {
755 Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
756 Err(e) => {
757 if is_canceled(&*e) {
758 Response::new_err(
759 id,
760 ErrorCode::ContentModified as i32,
761 "content modified".to_string(),
762 )
763 } else {
764 Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
765 }
766 }
767 },
768 };
769 Task::Respond(response)
770}