aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbors[bot] <26634292+bors[bot]@users.noreply.github.com>2020-06-25 17:29:23 +0100
committerGitHub <[email protected]>2020-06-25 17:29:23 +0100
commitb5aa92bab94880a289372c66c99117ae5e6e9e1f (patch)
tree70dc29d375136453569d35467a4ed31041727e30
parentba7286345256eb1140853dc0daa3c276e2ddcbbe (diff)
parentf5ea35a2710c78e77c81f491cc6f8abd40e33981 (diff)
Merge #5064
5064: Add NotificationDispatcher r=matklad a=matklad bors r+ 🤖 Co-authored-by: Aleksey Kladov <[email protected]>
-rw-r--r--crates/flycheck/src/lib.rs17
-rw-r--r--crates/rust-analyzer/src/dispatch.rs176
-rw-r--r--crates/rust-analyzer/src/global_state.rs19
-rw-r--r--crates/rust-analyzer/src/lib.rs1
-rw-r--r--crates/rust-analyzer/src/lsp_utils.rs13
-rw-r--r--crates/rust-analyzer/src/main_loop.rs872
-rw-r--r--crates/vfs-notify/src/lib.rs13
7 files changed, 560 insertions, 551 deletions
diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs
index 9e8205ae7..4dcab7a61 100644
--- a/crates/flycheck/src/lib.rs
+++ b/crates/flycheck/src/lib.rs
@@ -120,7 +120,13 @@ impl FlycheckActor {
120 ) -> FlycheckActor { 120 ) -> FlycheckActor {
121 FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None } 121 FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None }
122 } 122 }
123 123 fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
124 let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
125 select! {
126 recv(inbox) -> msg => msg.ok().map(Event::Restart),
127 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
128 }
129 }
124 fn run(&mut self, inbox: Receiver<Restart>) { 130 fn run(&mut self, inbox: Receiver<Restart>) {
125 // If we rerun the thread, we need to discard the previous check results first 131 // If we rerun the thread, we need to discard the previous check results first
126 self.send(Message::ClearDiagnostics); 132 self.send(Message::ClearDiagnostics);
@@ -167,15 +173,6 @@ impl FlycheckActor {
167 } 173 }
168 } 174 }
169 } 175 }
170
171 fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
172 let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
173 select! {
174 recv(inbox) -> msg => msg.ok().map(Event::Restart),
175 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
176 }
177 }
178
179 fn should_recheck(&mut self) -> bool { 176 fn should_recheck(&mut self) -> bool {
180 if let Some(_last_update_req) = &self.last_update_req { 177 if let Some(_last_update_req) = &self.last_update_req {
181 // We currently only request an update on save, as we need up to 178 // We currently only request an update on save, as we need up to
diff --git a/crates/rust-analyzer/src/dispatch.rs b/crates/rust-analyzer/src/dispatch.rs
new file mode 100644
index 000000000..5fdbed8ef
--- /dev/null
+++ b/crates/rust-analyzer/src/dispatch.rs
@@ -0,0 +1,176 @@
1//! A visitor for downcasting arbitrary request (JSON) into a specific type.
2use std::{panic, time::Instant};
3
4use serde::{de::DeserializeOwned, Serialize};
5
6use crate::{
7 global_state::{GlobalState, GlobalStateSnapshot},
8 lsp_utils::is_canceled,
9 main_loop::Task,
10 LspError, Result,
11};
12
13pub(crate) struct RequestDispatcher<'a> {
14 pub(crate) req: Option<lsp_server::Request>,
15 pub(crate) global_state: &'a mut GlobalState,
16 pub(crate) request_received: Instant,
17}
18
19impl<'a> RequestDispatcher<'a> {
20 /// Dispatches the request onto the current thread
21 pub(crate) fn on_sync<R>(
22 &mut self,
23 f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
24 ) -> Result<&mut Self>
25 where
26 R: lsp_types::request::Request + 'static,
27 R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
28 R::Result: Serialize + 'static,
29 {
30 let (id, params) = match self.parse::<R>() {
31 Some(it) => it,
32 None => {
33 return Ok(self);
34 }
35 };
36 let world = panic::AssertUnwindSafe(&mut *self.global_state);
37 let task = panic::catch_unwind(move || {
38 let result = f(world.0, params);
39 result_to_task::<R>(id, result)
40 })
41 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
42 self.global_state.on_task(task);
43 Ok(self)
44 }
45
46 /// Dispatches the request onto thread pool
47 pub(crate) fn on<R>(
48 &mut self,
49 f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
50 ) -> Result<&mut Self>
51 where
52 R: lsp_types::request::Request + 'static,
53 R::Params: DeserializeOwned + Send + 'static,
54 R::Result: Serialize + 'static,
55 {
56 let (id, params) = match self.parse::<R>() {
57 Some(it) => it,
58 None => {
59 return Ok(self);
60 }
61 };
62
63 self.global_state.task_pool.0.spawn({
64 let world = self.global_state.snapshot();
65 move || {
66 let result = f(world, params);
67 result_to_task::<R>(id, result)
68 }
69 });
70
71 Ok(self)
72 }
73
74 pub(crate) fn finish(&mut self) {
75 match self.req.take() {
76 None => (),
77 Some(req) => {
78 log::error!("unknown request: {:?}", req);
79 let resp = lsp_server::Response::new_err(
80 req.id,
81 lsp_server::ErrorCode::MethodNotFound as i32,
82 "unknown request".to_string(),
83 );
84 self.global_state.send(resp.into());
85 }
86 }
87 }
88
89 fn parse<R>(&mut self) -> Option<(lsp_server::RequestId, R::Params)>
90 where
91 R: lsp_types::request::Request + 'static,
92 R::Params: DeserializeOwned + 'static,
93 {
94 let req = self.req.take()?;
95 let (id, params) = match req.extract::<R::Params>(R::METHOD) {
96 Ok(it) => it,
97 Err(req) => {
98 self.req = Some(req);
99 return None;
100 }
101 };
102 self.global_state
103 .req_queue
104 .incoming
105 .register(id.clone(), (R::METHOD, self.request_received));
106 Some((id, params))
107 }
108}
109
110fn result_to_task<R>(id: lsp_server::RequestId, result: Result<R::Result>) -> Task
111where
112 R: lsp_types::request::Request + 'static,
113 R::Params: DeserializeOwned + 'static,
114 R::Result: Serialize + 'static,
115{
116 let response = match result {
117 Ok(resp) => lsp_server::Response::new_ok(id, &resp),
118 Err(e) => match e.downcast::<LspError>() {
119 Ok(lsp_error) => lsp_server::Response::new_err(id, lsp_error.code, lsp_error.message),
120 Err(e) => {
121 if is_canceled(&*e) {
122 lsp_server::Response::new_err(
123 id,
124 lsp_server::ErrorCode::ContentModified as i32,
125 "content modified".to_string(),
126 )
127 } else {
128 lsp_server::Response::new_err(
129 id,
130 lsp_server::ErrorCode::InternalError as i32,
131 e.to_string(),
132 )
133 }
134 }
135 },
136 };
137 Task::Respond(response)
138}
139
140pub(crate) struct NotificationDispatcher<'a> {
141 pub(crate) not: Option<lsp_server::Notification>,
142 pub(crate) global_state: &'a mut GlobalState,
143}
144
145impl<'a> NotificationDispatcher<'a> {
146 pub(crate) fn on<N>(
147 &mut self,
148 f: fn(&mut GlobalState, N::Params) -> Result<()>,
149 ) -> Result<&mut Self>
150 where
151 N: lsp_types::notification::Notification + 'static,
152 N::Params: DeserializeOwned + Send + 'static,
153 {
154 let not = match self.not.take() {
155 Some(it) => it,
156 None => return Ok(self),
157 };
158 let params = match not.extract::<N::Params>(N::METHOD) {
159 Ok(it) => it,
160 Err(not) => {
161 self.not = Some(not);
162 return Ok(self);
163 }
164 };
165 f(self.global_state, params)?;
166 Ok(self)
167 }
168
169 pub(crate) fn finish(&mut self) {
170 if let Some(not) = &self.not {
171 if !not.method.starts_with("$/") {
172 log::error!("unhandled notification: {:?}", not);
173 }
174 }
175 }
176}
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index de6b95686..56d50c789 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -5,7 +5,7 @@
5 5
6use std::{convert::TryFrom, sync::Arc}; 6use std::{convert::TryFrom, sync::Arc};
7 7
8use crossbeam_channel::{unbounded, Receiver}; 8use crossbeam_channel::{unbounded, Receiver, Sender};
9use flycheck::{FlycheckConfig, FlycheckHandle}; 9use flycheck::{FlycheckConfig, FlycheckHandle};
10use lsp_types::Url; 10use lsp_types::Url;
11use parking_lot::RwLock; 11use parking_lot::RwLock;
@@ -22,6 +22,7 @@ use crate::{
22 line_endings::LineEndings, 22 line_endings::LineEndings,
23 main_loop::{ReqQueue, Task}, 23 main_loop::{ReqQueue, Task},
24 request_metrics::{LatestRequests, RequestMetrics}, 24 request_metrics::{LatestRequests, RequestMetrics},
25 show_message,
25 thread_pool::TaskPool, 26 thread_pool::TaskPool,
26 to_proto::url_from_abs_path, 27 to_proto::url_from_abs_path,
27 Result, 28 Result,
@@ -66,6 +67,7 @@ impl Default for Status {
66/// snapshot of the file systems, and `analysis_host`, which stores our 67/// snapshot of the file systems, and `analysis_host`, which stores our
67/// incremental salsa database. 68/// incremental salsa database.
68pub(crate) struct GlobalState { 69pub(crate) struct GlobalState {
70 sender: Sender<lsp_server::Message>,
69 pub(crate) config: Config, 71 pub(crate) config: Config,
70 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>), 72 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>),
71 pub(crate) analysis_host: AnalysisHost, 73 pub(crate) analysis_host: AnalysisHost,
@@ -95,6 +97,7 @@ pub(crate) struct GlobalStateSnapshot {
95 97
96impl GlobalState { 98impl GlobalState {
97 pub(crate) fn new( 99 pub(crate) fn new(
100 sender: Sender<lsp_server::Message>,
98 workspaces: Vec<ProjectWorkspace>, 101 workspaces: Vec<ProjectWorkspace>,
99 lru_capacity: Option<usize>, 102 lru_capacity: Option<usize>,
100 config: Config, 103 config: Config,
@@ -162,6 +165,7 @@ impl GlobalState {
162 }; 165 };
163 166
164 let mut res = GlobalState { 167 let mut res = GlobalState {
168 sender,
165 config, 169 config,
166 task_pool, 170 task_pool,
167 analysis_host, 171 analysis_host,
@@ -252,6 +256,19 @@ impl GlobalState {
252 pub(crate) fn complete_request(&mut self, request: RequestMetrics) { 256 pub(crate) fn complete_request(&mut self, request: RequestMetrics) {
253 self.latest_requests.write().record(request) 257 self.latest_requests.write().record(request)
254 } 258 }
259
260 pub(crate) fn send(&mut self, message: lsp_server::Message) {
261 self.sender.send(message).unwrap()
262 }
263 pub(crate) fn show_message(&mut self, typ: lsp_types::MessageType, message: String) {
264 show_message(typ, message, &self.sender)
265 }
266}
267
268impl Drop for GlobalState {
269 fn drop(&mut self) {
270 self.analysis_host.request_cancellation()
271 }
255} 272}
256 273
257impl GlobalStateSnapshot { 274impl GlobalStateSnapshot {
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs
index ca788dd3c..d503fe96e 100644
--- a/crates/rust-analyzer/src/lib.rs
+++ b/crates/rust-analyzer/src/lib.rs
@@ -19,6 +19,7 @@ macro_rules! eprintln {
19 19
20mod global_state; 20mod global_state;
21mod main_loop; 21mod main_loop;
22mod dispatch;
22mod handlers; 23mod handlers;
23mod caps; 24mod caps;
24mod cargo_target_spec; 25mod cargo_target_spec;
diff --git a/crates/rust-analyzer/src/lsp_utils.rs b/crates/rust-analyzer/src/lsp_utils.rs
index 14adb8ae7..35917030c 100644
--- a/crates/rust-analyzer/src/lsp_utils.rs
+++ b/crates/rust-analyzer/src/lsp_utils.rs
@@ -1,12 +1,13 @@
1//! Utilities for LSP-related boilerplate code. 1//! Utilities for LSP-related boilerplate code.
2use std::{error::Error, ops::Range}; 2use std::{error::Error, ops::Range};
3 3
4use crate::from_proto;
5use crossbeam_channel::Sender; 4use crossbeam_channel::Sender;
6use lsp_server::{Message, Notification}; 5use lsp_server::{Message, Notification};
7use ra_db::Canceled; 6use ra_db::Canceled;
8use ra_ide::LineIndex; 7use ra_ide::LineIndex;
9use serde::{de::DeserializeOwned, Serialize}; 8use serde::Serialize;
9
10use crate::from_proto;
10 11
11pub fn show_message( 12pub fn show_message(
12 typ: lsp_types::MessageType, 13 typ: lsp_types::MessageType,
@@ -29,14 +30,6 @@ pub(crate) fn notification_is<N: lsp_types::notification::Notification>(
29 notification.method == N::METHOD 30 notification.method == N::METHOD
30} 31}
31 32
32pub(crate) fn notification_cast<N>(notification: Notification) -> Result<N::Params, Notification>
33where
34 N: lsp_types::notification::Notification,
35 N::Params: DeserializeOwned,
36{
37 notification.extract(N::METHOD)
38}
39
40pub(crate) fn notification_new<N>(params: N::Params) -> Notification 33pub(crate) fn notification_new<N>(params: N::Params) -> Notification
41where 34where
42 N: lsp_types::notification::Notification, 35 N: lsp_types::notification::Notification,
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index 1a9c5ee2c..c2f43df1d 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -5,27 +5,26 @@ use std::{
5 time::{Duration, Instant}, 5 time::{Duration, Instant},
6}; 6};
7 7
8use crossbeam_channel::{never, select, RecvError, Sender}; 8use crossbeam_channel::{never, select, Receiver};
9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; 9use lsp_server::{Connection, Notification, Request, Response};
10use lsp_types::{request::Request as _, NumberOrString}; 10use lsp_types::{notification::Notification as _, request::Request as _};
11use ra_db::VfsPath; 11use ra_db::VfsPath;
12use ra_ide::{Canceled, FileId}; 12use ra_ide::{Canceled, FileId};
13use ra_prof::profile; 13use ra_prof::profile;
14use ra_project_model::{PackageRoot, ProjectWorkspace}; 14use ra_project_model::{PackageRoot, ProjectWorkspace};
15use serde::{de::DeserializeOwned, Serialize};
16 15
17use crate::{ 16use crate::{
18 config::{Config, FilesWatcher, LinkedProject}, 17 config::{Config, FilesWatcher, LinkedProject},
19 diagnostics::DiagnosticTask, 18 diagnostics::DiagnosticTask,
19 dispatch::{NotificationDispatcher, RequestDispatcher},
20 from_proto, 20 from_proto,
21 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status}, 21 global_state::{file_id_to_url, GlobalState, Status},
22 handlers, lsp_ext, 22 handlers, lsp_ext,
23 lsp_utils::{ 23 lsp_utils::{
24 apply_document_changes, is_canceled, notification_cast, notification_is, notification_new, 24 apply_document_changes, is_canceled, notification_is, notification_new, show_message,
25 show_message,
26 }, 25 },
27 request_metrics::RequestMetrics, 26 request_metrics::RequestMetrics,
28 LspError, Result, 27 Result,
29}; 28};
30 29
31pub fn main_loop(config: Config, connection: Connection) -> Result<()> { 30pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
@@ -50,7 +49,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
50 SetThreadPriority(thread, thread_priority_above_normal); 49 SetThreadPriority(thread, thread_priority_above_normal);
51 } 50 }
52 51
53 let mut global_state = { 52 let global_state = {
54 let workspaces = { 53 let workspaces = {
55 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found { 54 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
56 show_message( 55 show_message(
@@ -113,40 +112,346 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
113 connection.sender.send(request.into()).unwrap(); 112 connection.sender.send(request.into()).unwrap();
114 } 113 }
115 114
116 GlobalState::new(workspaces, config.lru_capacity, config, req_queue) 115 GlobalState::new(
116 connection.sender.clone(),
117 workspaces,
118 config.lru_capacity,
119 config,
120 req_queue,
121 )
117 }; 122 };
118 123
119 log::info!("server initialized, serving requests"); 124 log::info!("server initialized, serving requests");
120 { 125 global_state.run(connection.receiver)?;
121 loop { 126 Ok(())
122 log::trace!("selecting"); 127}
123 let event = select! { 128
124 recv(&connection.receiver) -> msg => match msg { 129impl GlobalState {
125 Ok(msg) => Event::Lsp(msg), 130 fn next_event(&self, inbox: &Receiver<lsp_server::Message>) -> Option<Event> {
126 Err(RecvError) => return Err("client exited without shutdown".into()), 131 select! {
127 }, 132 recv(inbox) -> msg =>
128 recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()), 133 msg.ok().map(Event::Lsp),
129 recv(global_state.task_receiver) -> task => match task { 134
130 Ok(task) => Event::Vfs(task), 135 recv(self.task_pool.1) -> task =>
131 Err(RecvError) => return Err("vfs died".into()), 136 Some(Event::Task(task.unwrap())),
137
138 recv(self.task_receiver) -> task =>
139 Some(Event::Vfs(task.unwrap())),
140
141 recv(self.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task =>
142 Some(Event::Flycheck(task.unwrap())),
143 }
144 }
145
146 fn run(mut self, inbox: Receiver<lsp_server::Message>) -> Result<()> {
147 while let Some(event) = self.next_event(&inbox) {
148 let loop_start = Instant::now();
149 // NOTE: don't count blocking select! call as a loop-turn time
150 let _p = profile("main_loop_inner/loop-turn");
151
152 log::info!("loop turn = {:?}", event);
153 let queue_count = self.task_pool.0.len();
154 if queue_count > 0 {
155 log::info!("queued count = {}", queue_count);
156 }
157
158 let mut became_ready = false;
159 match event {
160 Event::Lsp(msg) => match msg {
161 lsp_server::Message::Request(req) => self.on_request(loop_start, req)?,
162 lsp_server::Message::Notification(not) => {
163 if not.method == lsp_types::notification::Exit::METHOD {
164 return Ok(());
165 }
166 self.on_notification(not)?;
167 }
168 lsp_server::Message::Response(resp) => {
169 let handler = self.req_queue.outgoing.complete(resp.id.clone());
170 handler(&mut self, resp)
171 }
132 }, 172 },
133 recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task { 173 Event::Task(task) => {
134 Ok(task) => Event::Flycheck(task), 174 self.on_task(task);
135 Err(RecvError) => return Err("check watcher died".into()), 175 self.maybe_collect_garbage();
176 }
177 Event::Vfs(task) => match task {
178 vfs::loader::Message::Loaded { files } => {
179 let vfs = &mut self.vfs.write().0;
180 for (path, contents) in files {
181 let path = VfsPath::from(path);
182 if !self.mem_docs.contains(&path) {
183 vfs.set_file_contents(path, contents)
184 }
185 }
186 }
187 vfs::loader::Message::Progress { n_total, n_done } => {
188 let state = if n_done == 0 {
189 Progress::Begin
190 } else if n_done < n_total {
191 Progress::Report
192 } else {
193 assert_eq!(n_done, n_total);
194 self.status = Status::Ready;
195 became_ready = true;
196 Progress::End
197 };
198 report_progress(
199 &mut self,
200 "roots scanned",
201 state,
202 Some(format!("{}/{}", n_done, n_total)),
203 Some(percentage(n_done, n_total)),
204 )
205 }
136 }, 206 },
137 }; 207 Event::Flycheck(task) => on_check_task(task, &mut self)?,
138 if let Event::Lsp(lsp_server::Message::Request(req)) = &event { 208 }
139 if connection.handle_shutdown(&req)? { 209
140 break; 210 let state_changed = self.process_changes();
211 if became_ready {
212 if let Some(flycheck) = &self.flycheck {
213 flycheck.0.update();
214 }
215 }
216
217 if self.status == Status::Ready && (state_changed || became_ready) {
218 let subscriptions = self
219 .mem_docs
220 .iter()
221 .map(|path| self.vfs.read().0.file_id(&path).unwrap())
222 .collect::<Vec<_>>();
223
224 self.update_file_notifications_on_threadpool(subscriptions);
225 }
226
227 let loop_duration = loop_start.elapsed();
228 if loop_duration > Duration::from_millis(100) {
229 log::error!("overly long loop turn: {:?}", loop_duration);
230 if env::var("RA_PROFILE").is_ok() {
231 self.show_message(
232 lsp_types::MessageType::Error,
233 format!("overly long loop turn: {:?}", loop_duration),
234 )
235 }
236 }
237 }
238 Err("client exited without proper shutdown sequence")?
239 }
240
241 fn on_request(&mut self, request_received: Instant, req: Request) -> Result<()> {
242 RequestDispatcher { req: Some(req), global_state: self, request_received }
243 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
244 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
245 .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
246 .on_sync::<lsp_types::request::Shutdown>(|_, ()| Ok(()))?
247 .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
248 handlers::handle_selection_range(s.snapshot(), p)
249 })?
250 .on_sync::<lsp_ext::MatchingBrace>(|s, p| {
251 handlers::handle_matching_brace(s.snapshot(), p)
252 })?
253 .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
254 .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
255 .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
256 .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
257 .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
258 .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
259 .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
260 .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
261 .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
262 .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
263 .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
264 .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
265 .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
266 .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
267 .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
268 .on::<lsp_types::request::Completion>(handlers::handle_completion)?
269 .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
270 .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
271 .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
272 .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
273 .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
274 .on::<lsp_types::request::Rename>(handlers::handle_rename)?
275 .on::<lsp_types::request::References>(handlers::handle_references)?
276 .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
277 .on::<lsp_types::request::DocumentHighlightRequest>(
278 handlers::handle_document_highlight,
279 )?
280 .on::<lsp_types::request::CallHierarchyPrepare>(
281 handlers::handle_call_hierarchy_prepare,
282 )?
283 .on::<lsp_types::request::CallHierarchyIncomingCalls>(
284 handlers::handle_call_hierarchy_incoming,
285 )?
286 .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
287 handlers::handle_call_hierarchy_outgoing,
288 )?
289 .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
290 .on::<lsp_types::request::SemanticTokensRangeRequest>(
291 handlers::handle_semantic_tokens_range,
292 )?
293 .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
294 .finish();
295 Ok(())
296 }
297 fn on_notification(&mut self, not: Notification) -> Result<()> {
298 NotificationDispatcher { not: Some(not), global_state: self }
299 .on::<lsp_types::notification::Cancel>(|this, params| {
300 let id: lsp_server::RequestId = match params.id {
301 lsp_types::NumberOrString::Number(id) => id.into(),
302 lsp_types::NumberOrString::String(id) => id.into(),
303 };
304 if let Some(response) = this.req_queue.incoming.cancel(id) {
305 this.send(response.into());
306 }
307 Ok(())
308 })?
309 .on::<lsp_types::notification::DidOpenTextDocument>(|this, params| {
310 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
311 if !this.mem_docs.insert(path.clone()) {
312 log::error!("duplicate DidOpenTextDocument: {}", path)
313 }
314 this.vfs
315 .write()
316 .0
317 .set_file_contents(path, Some(params.text_document.text.into_bytes()));
318 }
319 Ok(())
320 })?
321 .on::<lsp_types::notification::DidChangeTextDocument>(|this, params| {
322 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
323 assert!(this.mem_docs.contains(&path));
324 let vfs = &mut this.vfs.write().0;
325 let file_id = vfs.file_id(&path).unwrap();
326 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
327 apply_document_changes(&mut text, params.content_changes);
328 vfs.set_file_contents(path, Some(text.into_bytes()))
329 }
330 Ok(())
331 })?
332 .on::<lsp_types::notification::DidCloseTextDocument>(|this, params| {
333 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
334 if !this.mem_docs.remove(&path) {
335 log::error!("orphan DidCloseTextDocument: {}", path)
336 }
337 if let Some(path) = path.as_path() {
338 this.loader.invalidate(path.to_path_buf());
339 }
340 }
341 let params = lsp_types::PublishDiagnosticsParams {
342 uri: params.text_document.uri,
343 diagnostics: Vec::new(),
344 version: None,
141 }; 345 };
346 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
347 this.send(not.into());
348 Ok(())
349 })?
350 .on::<lsp_types::notification::DidSaveTextDocument>(|this, _params| {
351 if let Some(flycheck) = &this.flycheck {
352 flycheck.0.update();
353 }
354 Ok(())
355 })?
356 .on::<lsp_types::notification::DidChangeConfiguration>(|this, _params| {
357 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
358 // this notification's parameters should be ignored and the actual config queried separately.
359 let request = this.req_queue.outgoing.register(
360 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
361 lsp_types::ConfigurationParams {
362 items: vec![lsp_types::ConfigurationItem {
363 scope_uri: None,
364 section: Some("rust-analyzer".to_string()),
365 }],
366 },
367 |this, resp| {
368 log::debug!("config update response: '{:?}", resp);
369 let Response { error, result, .. } = resp;
370
371 match (error, result) {
372 (Some(err), _) => {
373 log::error!("failed to fetch the server settings: {:?}", err)
374 }
375 (None, Some(configs)) => {
376 if let Some(new_config) = configs.get(0) {
377 let mut config = this.config.clone();
378 config.update(&new_config);
379 this.update_configuration(config);
380 }
381 }
382 (None, None) => log::error!(
383 "received empty server settings response from the client"
384 ),
385 }
386 },
387 );
388 this.send(request.into());
389
390 return Ok(());
391 })?
392 .on::<lsp_types::notification::DidChangeWatchedFiles>(|this, params| {
393 for change in params.changes {
394 if let Ok(path) = from_proto::abs_path(&change.uri) {
395 this.loader.invalidate(path);
396 }
397 }
398 Ok(())
399 })?
400 .finish();
401 Ok(())
402 }
403 pub(crate) fn on_task(&mut self, task: Task) {
404 match task {
405 Task::Respond(response) => {
406 if let Some((method, start)) = self.req_queue.incoming.complete(response.id.clone())
407 {
408 let duration = start.elapsed();
409 log::info!("handled req#{} in {:?}", response.id, duration);
410 self.complete_request(RequestMetrics {
411 id: response.id.clone(),
412 method: method.to_string(),
413 duration,
414 });
415 self.send(response.into());
416 }
142 } 417 }
143 assert!(!global_state.vfs.read().0.has_changes()); 418 Task::Diagnostics(tasks) => {
144 loop_turn(&connection, &mut global_state, event)?; 419 tasks.into_iter().for_each(|task| on_diagnostic_task(task, self))
145 assert!(!global_state.vfs.read().0.has_changes()); 420 }
421 Task::Unit => (),
146 } 422 }
147 } 423 }
148 global_state.analysis_host.request_cancellation(); 424 fn update_file_notifications_on_threadpool(&mut self, subscriptions: Vec<FileId>) {
149 Ok(()) 425 log::trace!("updating notifications for {:?}", subscriptions);
426 if self.config.publish_diagnostics {
427 let snapshot = self.snapshot();
428 let subscriptions = subscriptions.clone();
429 self.task_pool.0.spawn(move || {
430 let diagnostics = subscriptions
431 .into_iter()
432 .filter_map(|file_id| {
433 handlers::publish_diagnostics(&snapshot, file_id)
434 .map_err(|err| {
435 if !is_canceled(&*err) {
436 log::error!("failed to compute diagnostics: {:?}", err);
437 }
438 ()
439 })
440 .ok()
441 })
442 .collect::<Vec<_>>();
443 Task::Diagnostics(diagnostics)
444 })
445 }
446 self.task_pool.0.spawn({
447 let subs = subscriptions;
448 let snap = self.snapshot();
449 move || {
450 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
451 Task::Unit
452 }
453 });
454 }
150} 455}
151 456
152#[derive(Debug)] 457#[derive(Debug)]
@@ -199,333 +504,10 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
199pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; 504pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
200const DO_NOTHING: ReqHandler = |_, _| (); 505const DO_NOTHING: ReqHandler = |_, _| ();
201 506
202fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> { 507fn on_check_task(task: flycheck::Message, global_state: &mut GlobalState) -> Result<()> {
203 let loop_start = Instant::now();
204
205 // NOTE: don't count blocking select! call as a loop-turn time
206 let _p = profile("main_loop_inner/loop-turn");
207 log::info!("loop turn = {:?}", event);
208 let queue_count = global_state.task_pool.0.len();
209 if queue_count > 0 {
210 log::info!("queued count = {}", queue_count);
211 }
212
213 let mut became_ready = false;
214 match event {
215 Event::Task(task) => {
216 on_task(task, &connection.sender, global_state);
217 global_state.maybe_collect_garbage();
218 }
219 Event::Vfs(task) => match task {
220 vfs::loader::Message::Loaded { files } => {
221 let vfs = &mut global_state.vfs.write().0;
222 for (path, contents) in files {
223 let path = VfsPath::from(path);
224 if !global_state.mem_docs.contains(&path) {
225 vfs.set_file_contents(path, contents)
226 }
227 }
228 }
229 vfs::loader::Message::Progress { n_total, n_done } => {
230 let state = if n_done == 0 {
231 Progress::Begin
232 } else if n_done < n_total {
233 Progress::Report
234 } else {
235 assert_eq!(n_done, n_total);
236 global_state.status = Status::Ready;
237 became_ready = true;
238 Progress::End
239 };
240 report_progress(
241 global_state,
242 &connection.sender,
243 "roots scanned",
244 state,
245 Some(format!("{}/{}", n_done, n_total)),
246 Some(percentage(n_done, n_total)),
247 )
248 }
249 },
250 Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?,
251 Event::Lsp(msg) => match msg {
252 lsp_server::Message::Request(req) => {
253 on_request(global_state, &connection.sender, loop_start, req)?
254 }
255 lsp_server::Message::Notification(not) => {
256 on_notification(&connection.sender, global_state, not)?;
257 }
258 lsp_server::Message::Response(resp) => {
259 let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
260 handler(global_state, resp)
261 }
262 },
263 };
264
265 let state_changed = global_state.process_changes();
266
267 if became_ready {
268 if let Some(flycheck) = &global_state.flycheck {
269 flycheck.0.update();
270 }
271 }
272
273 if global_state.status == Status::Ready && (state_changed || became_ready) {
274 let subscriptions = global_state
275 .mem_docs
276 .iter()
277 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
278 .collect::<Vec<_>>();
279
280 update_file_notifications_on_threadpool(global_state, subscriptions.clone());
281 global_state.task_pool.0.spawn({
282 let subs = subscriptions;
283 let snap = global_state.snapshot();
284 move || {
285 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
286 Task::Unit
287 }
288 });
289 }
290
291 let loop_duration = loop_start.elapsed();
292 if loop_duration > Duration::from_millis(100) {
293 log::error!("overly long loop turn: {:?}", loop_duration);
294 if env::var("RA_PROFILE").is_ok() {
295 show_message(
296 lsp_types::MessageType::Error,
297 format!("overly long loop turn: {:?}", loop_duration),
298 &connection.sender,
299 );
300 }
301 }
302
303 Ok(())
304}
305
306fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: &mut GlobalState) {
307 match task {
308 Task::Respond(response) => {
309 if let Some((method, start)) =
310 global_state.req_queue.incoming.complete(response.id.clone())
311 {
312 let duration = start.elapsed();
313 log::info!("handled req#{} in {:?}", response.id, duration);
314 global_state.complete_request(RequestMetrics {
315 id: response.id.clone(),
316 method: method.to_string(),
317 duration,
318 });
319 msg_sender.send(response.into()).unwrap();
320 }
321 }
322 Task::Diagnostics(tasks) => {
323 tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state))
324 }
325 Task::Unit => (),
326 }
327}
328
329fn on_request(
330 global_state: &mut GlobalState,
331 msg_sender: &Sender<lsp_server::Message>,
332 request_received: Instant,
333 req: Request,
334) -> Result<()> {
335 let mut pool_dispatcher =
336 PoolDispatcher { req: Some(req), global_state, msg_sender, request_received };
337 pool_dispatcher
338 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
339 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
340 .on_sync::<lsp_ext::OnEnter>(|s, p| handlers::handle_on_enter(s.snapshot(), p))?
341 .on_sync::<lsp_types::request::SelectionRangeRequest>(|s, p| {
342 handlers::handle_selection_range(s.snapshot(), p)
343 })?
344 .on_sync::<lsp_ext::MatchingBrace>(|s, p| handlers::handle_matching_brace(s.snapshot(), p))?
345 .on::<lsp_ext::AnalyzerStatus>(handlers::handle_analyzer_status)?
346 .on::<lsp_ext::SyntaxTree>(handlers::handle_syntax_tree)?
347 .on::<lsp_ext::ExpandMacro>(handlers::handle_expand_macro)?
348 .on::<lsp_ext::ParentModule>(handlers::handle_parent_module)?
349 .on::<lsp_ext::Runnables>(handlers::handle_runnables)?
350 .on::<lsp_ext::InlayHints>(handlers::handle_inlay_hints)?
351 .on::<lsp_ext::CodeActionRequest>(handlers::handle_code_action)?
352 .on::<lsp_ext::ResolveCodeActionRequest>(handlers::handle_resolve_code_action)?
353 .on::<lsp_ext::HoverRequest>(handlers::handle_hover)?
354 .on::<lsp_types::request::OnTypeFormatting>(handlers::handle_on_type_formatting)?
355 .on::<lsp_types::request::DocumentSymbolRequest>(handlers::handle_document_symbol)?
356 .on::<lsp_types::request::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
357 .on::<lsp_types::request::GotoDefinition>(handlers::handle_goto_definition)?
358 .on::<lsp_types::request::GotoImplementation>(handlers::handle_goto_implementation)?
359 .on::<lsp_types::request::GotoTypeDefinition>(handlers::handle_goto_type_definition)?
360 .on::<lsp_types::request::Completion>(handlers::handle_completion)?
361 .on::<lsp_types::request::CodeLensRequest>(handlers::handle_code_lens)?
362 .on::<lsp_types::request::CodeLensResolve>(handlers::handle_code_lens_resolve)?
363 .on::<lsp_types::request::FoldingRangeRequest>(handlers::handle_folding_range)?
364 .on::<lsp_types::request::SignatureHelpRequest>(handlers::handle_signature_help)?
365 .on::<lsp_types::request::PrepareRenameRequest>(handlers::handle_prepare_rename)?
366 .on::<lsp_types::request::Rename>(handlers::handle_rename)?
367 .on::<lsp_types::request::References>(handlers::handle_references)?
368 .on::<lsp_types::request::Formatting>(handlers::handle_formatting)?
369 .on::<lsp_types::request::DocumentHighlightRequest>(handlers::handle_document_highlight)?
370 .on::<lsp_types::request::CallHierarchyPrepare>(handlers::handle_call_hierarchy_prepare)?
371 .on::<lsp_types::request::CallHierarchyIncomingCalls>(
372 handlers::handle_call_hierarchy_incoming,
373 )?
374 .on::<lsp_types::request::CallHierarchyOutgoingCalls>(
375 handlers::handle_call_hierarchy_outgoing,
376 )?
377 .on::<lsp_types::request::SemanticTokensRequest>(handlers::handle_semantic_tokens)?
378 .on::<lsp_types::request::SemanticTokensRangeRequest>(
379 handlers::handle_semantic_tokens_range,
380 )?
381 .on::<lsp_ext::Ssr>(handlers::handle_ssr)?
382 .finish();
383 Ok(())
384}
385
386fn on_notification(
387 msg_sender: &Sender<lsp_server::Message>,
388 global_state: &mut GlobalState,
389 not: Notification,
390) -> Result<()> {
391 let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
392 Ok(params) => {
393 let id: RequestId = match params.id {
394 NumberOrString::Number(id) => id.into(),
395 NumberOrString::String(id) => id.into(),
396 };
397 if let Some(response) = global_state.req_queue.incoming.cancel(id) {
398 msg_sender.send(response.into()).unwrap()
399 }
400 return Ok(());
401 }
402 Err(not) => not,
403 };
404 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
405 Ok(params) => {
406 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
407 if !global_state.mem_docs.insert(path.clone()) {
408 log::error!("duplicate DidOpenTextDocument: {}", path)
409 }
410 global_state
411 .vfs
412 .write()
413 .0
414 .set_file_contents(path, Some(params.text_document.text.into_bytes()));
415 }
416 return Ok(());
417 }
418 Err(not) => not,
419 };
420 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
421 Ok(params) => {
422 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
423 assert!(global_state.mem_docs.contains(&path));
424 let vfs = &mut global_state.vfs.write().0;
425 let file_id = vfs.file_id(&path).unwrap();
426 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
427 apply_document_changes(&mut text, params.content_changes);
428 vfs.set_file_contents(path, Some(text.into_bytes()))
429 }
430 return Ok(());
431 }
432 Err(not) => not,
433 };
434 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
435 Ok(params) => {
436 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
437 if !global_state.mem_docs.remove(&path) {
438 log::error!("orphan DidCloseTextDocument: {}", path)
439 }
440 if let Some(path) = path.as_path() {
441 global_state.loader.invalidate(path.to_path_buf());
442 }
443 }
444 let params = lsp_types::PublishDiagnosticsParams {
445 uri: params.text_document.uri,
446 diagnostics: Vec::new(),
447 version: None,
448 };
449 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
450 msg_sender.send(not.into()).unwrap();
451 return Ok(());
452 }
453 Err(not) => not,
454 };
455 let not = match notification_cast::<lsp_types::notification::DidSaveTextDocument>(not) {
456 Ok(_params) => {
457 if let Some(flycheck) = &global_state.flycheck {
458 flycheck.0.update();
459 }
460 return Ok(());
461 }
462 Err(not) => not,
463 };
464 let not = match notification_cast::<lsp_types::notification::DidChangeConfiguration>(not) {
465 Ok(_) => {
466 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
467 // this notification's parameters should be ignored and the actual config queried separately.
468 let request = global_state.req_queue.outgoing.register(
469 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
470 lsp_types::ConfigurationParams {
471 items: vec![lsp_types::ConfigurationItem {
472 scope_uri: None,
473 section: Some("rust-analyzer".to_string()),
474 }],
475 },
476 |global_state, resp| {
477 log::debug!("config update response: '{:?}", resp);
478 let Response { error, result, .. } = resp;
479
480 match (error, result) {
481 (Some(err), _) => {
482 log::error!("failed to fetch the server settings: {:?}", err)
483 }
484 (None, Some(configs)) => {
485 if let Some(new_config) = configs.get(0) {
486 let mut config = global_state.config.clone();
487 config.update(&new_config);
488 global_state.update_configuration(config);
489 }
490 }
491 (None, None) => {
492 log::error!("received empty server settings response from the client")
493 }
494 }
495 },
496 );
497 msg_sender.send(request.into())?;
498
499 return Ok(());
500 }
501 Err(not) => not,
502 };
503 let not = match notification_cast::<lsp_types::notification::DidChangeWatchedFiles>(not) {
504 Ok(params) => {
505 for change in params.changes {
506 if let Ok(path) = from_proto::abs_path(&change.uri) {
507 global_state.loader.invalidate(path)
508 }
509 }
510 return Ok(());
511 }
512 Err(not) => not,
513 };
514 if not.method.starts_with("$/") {
515 return Ok(());
516 }
517 log::error!("unhandled notification: {:?}", not);
518 Ok(())
519}
520
521fn on_check_task(
522 task: flycheck::Message,
523 global_state: &mut GlobalState,
524 msg_sender: &Sender<lsp_server::Message>,
525) -> Result<()> {
526 match task { 508 match task {
527 flycheck::Message::ClearDiagnostics => { 509 flycheck::Message::ClearDiagnostics => {
528 on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state) 510 on_diagnostic_task(DiagnosticTask::ClearCheck, global_state)
529 } 511 }
530 512
531 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { 513 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => {
@@ -550,7 +532,6 @@ fn on_check_task(
550 diag.diagnostic, 532 diag.diagnostic,
551 diag.fixes.into_iter().map(|it| it.into()).collect(), 533 diag.fixes.into_iter().map(|it| it.into()).collect(),
552 ), 534 ),
553 msg_sender,
554 global_state, 535 global_state,
555 ) 536 )
556 } 537 }
@@ -563,26 +544,22 @@ fn on_check_task(
563 flycheck::Progress::End => (Progress::End, None), 544 flycheck::Progress::End => (Progress::End, None),
564 }; 545 };
565 546
566 report_progress(global_state, msg_sender, "cargo check", state, message, None); 547 report_progress(global_state, "cargo check", state, message, None);
567 } 548 }
568 }; 549 };
569 550
570 Ok(()) 551 Ok(())
571} 552}
572 553
573fn on_diagnostic_task( 554fn on_diagnostic_task(task: DiagnosticTask, global_state: &mut GlobalState) {
574 task: DiagnosticTask, 555 let subscriptions = global_state.diagnostics.handle_task(task);
575 msg_sender: &Sender<lsp_server::Message>,
576 state: &mut GlobalState,
577) {
578 let subscriptions = state.diagnostics.handle_task(task);
579 556
580 for file_id in subscriptions { 557 for file_id in subscriptions {
581 let url = file_id_to_url(&state.vfs.read().0, file_id); 558 let url = file_id_to_url(&global_state.vfs.read().0, file_id);
582 let diagnostics = state.diagnostics.diagnostics_for(file_id).cloned().collect(); 559 let diagnostics = global_state.diagnostics.diagnostics_for(file_id).cloned().collect();
583 let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None }; 560 let params = lsp_types::PublishDiagnosticsParams { uri: url, diagnostics, version: None };
584 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params); 561 let not = notification_new::<lsp_types::notification::PublishDiagnostics>(params);
585 msg_sender.send(not.into()).unwrap(); 562 global_state.send(not.into());
586 } 563 }
587} 564}
588 565
@@ -599,7 +576,6 @@ fn percentage(done: usize, total: usize) -> f64 {
599 576
600fn report_progress( 577fn report_progress(
601 global_state: &mut GlobalState, 578 global_state: &mut GlobalState,
602 sender: &Sender<lsp_server::Message>,
603 title: &str, 579 title: &str,
604 state: Progress, 580 state: Progress,
605 message: Option<String>, 581 message: Option<String>,
@@ -616,7 +592,7 @@ fn report_progress(
616 lsp_types::WorkDoneProgressCreateParams { token: token.clone() }, 592 lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
617 DO_NOTHING, 593 DO_NOTHING,
618 ); 594 );
619 sender.send(work_done_progress_create.into()).unwrap(); 595 global_state.send(work_done_progress_create.into());
620 596
621 lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin { 597 lsp_types::WorkDoneProgress::Begin(lsp_types::WorkDoneProgressBegin {
622 title: title.into(), 598 title: title.into(),
@@ -641,155 +617,5 @@ fn report_progress(
641 token, 617 token,
642 value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress), 618 value: lsp_types::ProgressParamsValue::WorkDone(work_done_progress),
643 }); 619 });
644 sender.send(notification.into()).unwrap(); 620 global_state.send(notification.into());
645}
646
647struct PoolDispatcher<'a> {
648 req: Option<Request>,
649 global_state: &'a mut GlobalState,
650 msg_sender: &'a Sender<lsp_server::Message>,
651 request_received: Instant,
652}
653
654impl<'a> PoolDispatcher<'a> {
655 /// Dispatches the request onto the current thread
656 fn on_sync<R>(
657 &mut self,
658 f: fn(&mut GlobalState, R::Params) -> Result<R::Result>,
659 ) -> Result<&mut Self>
660 where
661 R: lsp_types::request::Request + 'static,
662 R::Params: DeserializeOwned + panic::UnwindSafe + 'static,
663 R::Result: Serialize + 'static,
664 {
665 let (id, params) = match self.parse::<R>() {
666 Some(it) => it,
667 None => {
668 return Ok(self);
669 }
670 };
671 let world = panic::AssertUnwindSafe(&mut *self.global_state);
672 let task = panic::catch_unwind(move || {
673 let result = f(world.0, params);
674 result_to_task::<R>(id, result)
675 })
676 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
677 on_task(task, self.msg_sender, self.global_state);
678 Ok(self)
679 }
680
681 /// Dispatches the request onto thread pool
682 fn on<R>(
683 &mut self,
684 f: fn(GlobalStateSnapshot, R::Params) -> Result<R::Result>,
685 ) -> Result<&mut Self>
686 where
687 R: lsp_types::request::Request + 'static,
688 R::Params: DeserializeOwned + Send + 'static,
689 R::Result: Serialize + 'static,
690 {
691 let (id, params) = match self.parse::<R>() {
692 Some(it) => it,
693 None => {
694 return Ok(self);
695 }
696 };
697
698 self.global_state.task_pool.0.spawn({
699 let world = self.global_state.snapshot();
700 move || {
701 let result = f(world, params);
702 result_to_task::<R>(id, result)
703 }
704 });
705
706 Ok(self)
707 }
708
709 fn parse<R>(&mut self) -> Option<(RequestId, R::Params)>
710 where
711 R: lsp_types::request::Request + 'static,
712 R::Params: DeserializeOwned + 'static,
713 {
714 let req = self.req.take()?;
715 let (id, params) = match req.extract::<R::Params>(R::METHOD) {
716 Ok(it) => it,
717 Err(req) => {
718 self.req = Some(req);
719 return None;
720 }
721 };
722 self.global_state
723 .req_queue
724 .incoming
725 .register(id.clone(), (R::METHOD, self.request_received));
726 Some((id, params))
727 }
728
729 fn finish(&mut self) {
730 match self.req.take() {
731 None => (),
732 Some(req) => {
733 log::error!("unknown request: {:?}", req);
734 let resp = Response::new_err(
735 req.id,
736 ErrorCode::MethodNotFound as i32,
737 "unknown request".to_string(),
738 );
739 self.msg_sender.send(resp.into()).unwrap();
740 }
741 }
742 }
743}
744
745fn result_to_task<R>(id: RequestId, result: Result<R::Result>) -> Task
746where
747 R: lsp_types::request::Request + 'static,
748 R::Params: DeserializeOwned + 'static,
749 R::Result: Serialize + 'static,
750{
751 let response = match result {
752 Ok(resp) => Response::new_ok(id, &resp),
753 Err(e) => match e.downcast::<LspError>() {
754 Ok(lsp_error) => Response::new_err(id, lsp_error.code, lsp_error.message),
755 Err(e) => {
756 if is_canceled(&*e) {
757 Response::new_err(
758 id,
759 ErrorCode::ContentModified as i32,
760 "content modified".to_string(),
761 )
762 } else {
763 Response::new_err(id, ErrorCode::InternalError as i32, e.to_string())
764 }
765 }
766 },
767 };
768 Task::Respond(response)
769}
770
771fn update_file_notifications_on_threadpool(
772 global_state: &mut GlobalState,
773 subscriptions: Vec<FileId>,
774) {
775 log::trace!("updating notifications for {:?}", subscriptions);
776 if global_state.config.publish_diagnostics {
777 let snapshot = global_state.snapshot();
778 global_state.task_pool.0.spawn(move || {
779 let diagnostics = subscriptions
780 .into_iter()
781 .filter_map(|file_id| {
782 handlers::publish_diagnostics(&snapshot, file_id)
783 .map_err(|err| {
784 if !is_canceled(&*err) {
785 log::error!("failed to compute diagnostics: {:?}", err);
786 }
787 ()
788 })
789 .ok()
790 })
791 .collect::<Vec<_>>();
792 Task::Diagnostics(diagnostics)
793 })
794 }
795} 621}
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index 68fdb8cb0..25ba8d798 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -82,7 +82,12 @@ impl NotifyActor {
82 watcher_receiver, 82 watcher_receiver,
83 } 83 }
84 } 84 }
85 85 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
86 select! {
87 recv(receiver) -> it => it.ok().map(Event::Message),
88 recv(&self.watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
89 }
90 }
86 fn run(mut self, inbox: Receiver<Message>) { 91 fn run(mut self, inbox: Receiver<Message>) {
87 while let Some(event) = self.next_event(&inbox) { 92 while let Some(event) = self.next_event(&inbox) {
88 log::debug!("vfs-notify event: {:?}", event); 93 log::debug!("vfs-notify event: {:?}", event);
@@ -154,12 +159,6 @@ impl NotifyActor {
154 } 159 }
155 } 160 }
156 } 161 }
157 fn next_event(&self, receiver: &Receiver<Message>) -> Option<Event> {
158 select! {
159 recv(receiver) -> it => it.ok().map(Event::Message),
160 recv(&self.watcher_receiver) -> it => Some(Event::NotifyEvent(it.unwrap())),
161 }
162 }
163 fn load_entry( 162 fn load_entry(
164 &mut self, 163 &mut self,
165 entry: loader::Entry, 164 entry: loader::Entry,