aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/flycheck/src/lib.rs124
-rw-r--r--crates/rust-analyzer/src/global_state.rs11
-rw-r--r--crates/rust-analyzer/src/lib.rs1
-rw-r--r--crates/rust-analyzer/src/main_loop.rs179
-rw-r--r--crates/rust-analyzer/src/thread_pool.rs35
-rw-r--r--crates/vfs-notify/src/lib.rs4
6 files changed, 179 insertions, 175 deletions
diff --git a/crates/flycheck/src/lib.rs b/crates/flycheck/src/lib.rs
index 3e73cf6ff..9e8205ae7 100644
--- a/crates/flycheck/src/lib.rs
+++ b/crates/flycheck/src/lib.rs
@@ -10,7 +10,7 @@ use std::{
10 time::Instant, 10 time::Instant,
11}; 11};
12 12
13use crossbeam_channel::{never, select, unbounded, Receiver, RecvError, Sender}; 13use crossbeam_channel::{never, select, unbounded, Receiver, Sender};
14 14
15pub use cargo_metadata::diagnostic::{ 15pub use cargo_metadata::diagnostic::{
16 Applicability, Diagnostic, DiagnosticLevel, DiagnosticSpan, DiagnosticSpanMacroExpansion, 16 Applicability, Diagnostic, DiagnosticLevel, DiagnosticSpan, DiagnosticSpanMacroExpansion,
@@ -61,7 +61,7 @@ impl FlycheckHandle {
61 ) -> FlycheckHandle { 61 ) -> FlycheckHandle {
62 let (cmd_send, cmd_recv) = unbounded::<Restart>(); 62 let (cmd_send, cmd_recv) = unbounded::<Restart>();
63 let handle = jod_thread::spawn(move || { 63 let handle = jod_thread::spawn(move || {
64 FlycheckActor::new(sender, config, workspace_root).run(&cmd_recv); 64 FlycheckActor::new(sender, config, workspace_root).run(cmd_recv);
65 }); 65 });
66 FlycheckHandle { cmd_send, handle } 66 FlycheckHandle { cmd_send, handle }
67 } 67 }
@@ -98,14 +98,18 @@ struct FlycheckActor {
98 config: FlycheckConfig, 98 config: FlycheckConfig,
99 workspace_root: PathBuf, 99 workspace_root: PathBuf,
100 last_update_req: Option<Instant>, 100 last_update_req: Option<Instant>,
101 // XXX: drop order is significant
102 message_recv: Receiver<CheckEvent>,
103 /// WatchThread exists to wrap around the communication needed to be able to 101 /// WatchThread exists to wrap around the communication needed to be able to
104 /// run `cargo check` without blocking. Currently the Rust standard library 102 /// run `cargo check` without blocking. Currently the Rust standard library
105 /// doesn't provide a way to read sub-process output without blocking, so we 103 /// doesn't provide a way to read sub-process output without blocking, so we
106 /// have to wrap sub-processes output handling in a thread and pass messages 104 /// have to wrap sub-processes output handling in a thread and pass messages
107 /// back over a channel. 105 /// back over a channel.
108 check_process: Option<jod_thread::JoinHandle>, 106 // XXX: drop order is significant
107 check_process: Option<(Receiver<CheckEvent>, jod_thread::JoinHandle)>,
108}
109
110enum Event {
111 Restart(Restart),
112 CheckEvent(Option<CheckEvent>),
109} 113}
110 114
111impl FlycheckActor { 115impl FlycheckActor {
@@ -114,40 +118,48 @@ impl FlycheckActor {
114 config: FlycheckConfig, 118 config: FlycheckConfig,
115 workspace_root: PathBuf, 119 workspace_root: PathBuf,
116 ) -> FlycheckActor { 120 ) -> FlycheckActor {
117 FlycheckActor { 121 FlycheckActor { sender, config, workspace_root, last_update_req: None, check_process: None }
118 sender,
119 config,
120 workspace_root,
121 last_update_req: None,
122 message_recv: never(),
123 check_process: None,
124 }
125 } 122 }
126 123
127 fn run(&mut self, cmd_recv: &Receiver<Restart>) { 124 fn run(&mut self, inbox: Receiver<Restart>) {
128 // If we rerun the thread, we need to discard the previous check results first 125 // If we rerun the thread, we need to discard the previous check results first
129 self.clean_previous_results(); 126 self.send(Message::ClearDiagnostics);
130 127 self.send(Message::Progress(Progress::End));
131 loop { 128
132 select! { 129 while let Some(event) = self.next_event(&inbox) {
133 recv(&cmd_recv) -> cmd => match cmd { 130 match event {
134 Ok(Restart) => self.last_update_req = Some(Instant::now()), 131 Event::Restart(Restart) => self.last_update_req = Some(Instant::now()),
135 Err(RecvError) => { 132 Event::CheckEvent(None) => {
136 // Command channel has closed, so shut down 133 // Watcher finished, replace it with a never channel to
137 break; 134 // avoid busy-waiting.
138 }, 135 self.check_process = None;
139 },
140 recv(self.message_recv) -> msg => match msg {
141 Ok(msg) => self.handle_message(msg),
142 Err(RecvError) => {
143 // Watcher finished, replace it with a never channel to
144 // avoid busy-waiting.
145 self.message_recv = never();
146 self.check_process = None;
147 },
148 } 136 }
149 }; 137 Event::CheckEvent(Some(event)) => match event {
138 CheckEvent::Begin => {
139 self.send(Message::Progress(Progress::Being));
140 }
141
142 CheckEvent::End => {
143 self.send(Message::Progress(Progress::End));
144 }
145
146 CheckEvent::Msg(cargo_metadata::Message::CompilerArtifact(msg)) => {
147 self.send(Message::Progress(Progress::DidCheckCrate(msg.target.name)));
148 }
149
150 CheckEvent::Msg(cargo_metadata::Message::CompilerMessage(msg)) => {
151 self.send(Message::AddDiagnostic {
152 workspace_root: self.workspace_root.clone(),
153 diagnostic: msg.message,
154 });
155 }
150 156
157 CheckEvent::Msg(cargo_metadata::Message::BuildScriptExecuted(_))
158 | CheckEvent::Msg(cargo_metadata::Message::BuildFinished(_))
159 | CheckEvent::Msg(cargo_metadata::Message::TextLine(_))
160 | CheckEvent::Msg(cargo_metadata::Message::Unknown) => {}
161 },
162 }
151 if self.should_recheck() { 163 if self.should_recheck() {
152 self.last_update_req = None; 164 self.last_update_req = None;
153 self.send(Message::ClearDiagnostics); 165 self.send(Message::ClearDiagnostics);
@@ -156,9 +168,12 @@ impl FlycheckActor {
156 } 168 }
157 } 169 }
158 170
159 fn clean_previous_results(&self) { 171 fn next_event(&self, inbox: &Receiver<Restart>) -> Option<Event> {
160 self.send(Message::ClearDiagnostics); 172 let check_chan = self.check_process.as_ref().map(|(chan, _thread)| chan);
161 self.send(Message::Progress(Progress::End)); 173 select! {
174 recv(inbox) -> msg => msg.ok().map(Event::Restart),
175 recv(check_chan.unwrap_or(&never())) -> msg => Some(Event::CheckEvent(msg.ok())),
176 }
162 } 177 }
163 178
164 fn should_recheck(&mut self) -> bool { 179 fn should_recheck(&mut self) -> bool {
@@ -171,37 +186,8 @@ impl FlycheckActor {
171 false 186 false
172 } 187 }
173 188
174 fn handle_message(&self, msg: CheckEvent) {
175 match msg {
176 CheckEvent::Begin => {
177 self.send(Message::Progress(Progress::Being));
178 }
179
180 CheckEvent::End => {
181 self.send(Message::Progress(Progress::End));
182 }
183
184 CheckEvent::Msg(cargo_metadata::Message::CompilerArtifact(msg)) => {
185 self.send(Message::Progress(Progress::DidCheckCrate(msg.target.name)));
186 }
187
188 CheckEvent::Msg(cargo_metadata::Message::CompilerMessage(msg)) => {
189 self.send(Message::AddDiagnostic {
190 workspace_root: self.workspace_root.clone(),
191 diagnostic: msg.message,
192 });
193 }
194
195 CheckEvent::Msg(cargo_metadata::Message::BuildScriptExecuted(_))
196 | CheckEvent::Msg(cargo_metadata::Message::BuildFinished(_))
197 | CheckEvent::Msg(cargo_metadata::Message::TextLine(_))
198 | CheckEvent::Msg(cargo_metadata::Message::Unknown) => {}
199 }
200 }
201
202 fn restart_check_process(&mut self) { 189 fn restart_check_process(&mut self) {
203 // First, clear and cancel the old thread 190 // First, clear and cancel the old thread
204 self.message_recv = never();
205 self.check_process = None; 191 self.check_process = None;
206 192
207 let mut cmd = match &self.config { 193 let mut cmd = match &self.config {
@@ -237,8 +223,7 @@ impl FlycheckActor {
237 cmd.current_dir(&self.workspace_root); 223 cmd.current_dir(&self.workspace_root);
238 224
239 let (message_send, message_recv) = unbounded(); 225 let (message_send, message_recv) = unbounded();
240 self.message_recv = message_recv; 226 let thread = jod_thread::spawn(move || {
241 self.check_process = Some(jod_thread::spawn(move || {
242 // If we trigger an error here, we will do so in the loop instead, 227 // If we trigger an error here, we will do so in the loop instead,
243 // which will break out of the loop, and continue the shutdown 228 // which will break out of the loop, and continue the shutdown
244 let _ = message_send.send(CheckEvent::Begin); 229 let _ = message_send.send(CheckEvent::Begin);
@@ -267,7 +252,8 @@ impl FlycheckActor {
267 // We can ignore any error here, as we are already in the progress 252 // We can ignore any error here, as we are already in the progress
268 // of shutting down. 253 // of shutting down.
269 let _ = message_send.send(CheckEvent::End); 254 let _ = message_send.send(CheckEvent::End);
270 })) 255 });
256 self.check_process = Some((message_recv, thread))
271 } 257 }
272 258
273 fn send(&self, check_task: Message) { 259 fn send(&self, check_task: Message) {
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index 446207e9e..de6b95686 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -20,8 +20,9 @@ use crate::{
20 diagnostics::{CheckFixes, DiagnosticCollection}, 20 diagnostics::{CheckFixes, DiagnosticCollection},
21 from_proto, 21 from_proto,
22 line_endings::LineEndings, 22 line_endings::LineEndings,
23 main_loop::ReqQueue, 23 main_loop::{ReqQueue, Task},
24 request_metrics::{LatestRequests, RequestMetrics}, 24 request_metrics::{LatestRequests, RequestMetrics},
25 thread_pool::TaskPool,
25 to_proto::url_from_abs_path, 26 to_proto::url_from_abs_path,
26 Result, 27 Result,
27}; 28};
@@ -66,6 +67,7 @@ impl Default for Status {
66/// incremental salsa database. 67/// incremental salsa database.
67pub(crate) struct GlobalState { 68pub(crate) struct GlobalState {
68 pub(crate) config: Config, 69 pub(crate) config: Config,
70 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>),
69 pub(crate) analysis_host: AnalysisHost, 71 pub(crate) analysis_host: AnalysisHost,
70 pub(crate) loader: Box<dyn vfs::loader::Handle>, 72 pub(crate) loader: Box<dyn vfs::loader::Handle>,
71 pub(crate) task_receiver: Receiver<vfs::loader::Message>, 73 pub(crate) task_receiver: Receiver<vfs::loader::Message>,
@@ -153,8 +155,15 @@ impl GlobalState {
153 155
154 let mut analysis_host = AnalysisHost::new(lru_capacity); 156 let mut analysis_host = AnalysisHost::new(lru_capacity);
155 analysis_host.apply_change(change); 157 analysis_host.apply_change(change);
158
159 let task_pool = {
160 let (sender, receiver) = unbounded();
161 (TaskPool::new(sender), receiver)
162 };
163
156 let mut res = GlobalState { 164 let mut res = GlobalState {
157 config, 165 config,
166 task_pool,
158 analysis_host, 167 analysis_host,
159 loader, 168 loader,
160 task_receiver, 169 task_receiver,
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs
index 794286672..ca788dd3c 100644
--- a/crates/rust-analyzer/src/lib.rs
+++ b/crates/rust-analyzer/src/lib.rs
@@ -30,6 +30,7 @@ mod diagnostics;
30mod line_endings; 30mod line_endings;
31mod request_metrics; 31mod request_metrics;
32mod lsp_utils; 32mod lsp_utils;
33mod thread_pool;
33pub mod lsp_ext; 34pub mod lsp_ext;
34pub mod config; 35pub mod config;
35 36
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index 3b3b83209..1a9c5ee2c 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -2,19 +2,17 @@
2//! requests/replies and notifications back to the client. 2//! requests/replies and notifications back to the client.
3use std::{ 3use std::{
4 env, fmt, panic, 4 env, fmt, panic,
5 sync::Arc,
6 time::{Duration, Instant}, 5 time::{Duration, Instant},
7}; 6};
8 7
9use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; 8use crossbeam_channel::{never, select, RecvError, Sender};
10use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; 9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response};
11use lsp_types::{request::Request as _, NumberOrString}; 10use lsp_types::{request::Request as _, NumberOrString};
12use ra_db::VfsPath; 11use ra_db::VfsPath;
13use ra_ide::{Canceled, FileId}; 12use ra_ide::{Canceled, FileId};
14use ra_prof::profile; 13use ra_prof::profile;
15use ra_project_model::{PackageRoot, ProjectWorkspace}; 14use ra_project_model::{PackageRoot, ProjectWorkspace};
16use serde::{de::DeserializeOwned, Serialize}; 15use serde::{de::DeserializeOwned, Serialize};
17use threadpool::ThreadPool;
18 16
19use crate::{ 17use crate::{
20 config::{Config, FilesWatcher, LinkedProject}, 18 config::{Config, FilesWatcher, LinkedProject},
@@ -118,65 +116,51 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
118 GlobalState::new(workspaces, config.lru_capacity, config, req_queue) 116 GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
119 }; 117 };
120 118
121 let pool = ThreadPool::default();
122 let (task_sender, task_receiver) = unbounded::<Task>();
123
124 log::info!("server initialized, serving requests"); 119 log::info!("server initialized, serving requests");
125 { 120 {
126 let task_sender = task_sender;
127 loop { 121 loop {
128 log::trace!("selecting"); 122 log::trace!("selecting");
129 let event = select! { 123 let event = select! {
130 recv(&connection.receiver) -> msg => match msg { 124 recv(&connection.receiver) -> msg => match msg {
131 Ok(msg) => Event::Msg(msg), 125 Ok(msg) => Event::Lsp(msg),
132 Err(RecvError) => return Err("client exited without shutdown".into()), 126 Err(RecvError) => return Err("client exited without shutdown".into()),
133 }, 127 },
134 recv(task_receiver) -> task => Event::Task(task.unwrap()), 128 recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()),
135 recv(global_state.task_receiver) -> task => match task { 129 recv(global_state.task_receiver) -> task => match task {
136 Ok(task) => Event::Vfs(task), 130 Ok(task) => Event::Vfs(task),
137 Err(RecvError) => return Err("vfs died".into()), 131 Err(RecvError) => return Err("vfs died".into()),
138 }, 132 },
139 recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task { 133 recv(global_state.flycheck.as_ref().map_or(&never(), |it| &it.1)) -> task => match task {
140 Ok(task) => Event::CheckWatcher(task), 134 Ok(task) => Event::Flycheck(task),
141 Err(RecvError) => return Err("check watcher died".into()), 135 Err(RecvError) => return Err("check watcher died".into()),
142 }, 136 },
143 }; 137 };
144 if let Event::Msg(Message::Request(req)) = &event { 138 if let Event::Lsp(lsp_server::Message::Request(req)) = &event {
145 if connection.handle_shutdown(&req)? { 139 if connection.handle_shutdown(&req)? {
146 break; 140 break;
147 }; 141 };
148 } 142 }
149 assert!(!global_state.vfs.read().0.has_changes()); 143 assert!(!global_state.vfs.read().0.has_changes());
150 loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?; 144 loop_turn(&connection, &mut global_state, event)?;
151 assert!(!global_state.vfs.read().0.has_changes()); 145 assert!(!global_state.vfs.read().0.has_changes());
152 } 146 }
153 } 147 }
154 global_state.analysis_host.request_cancellation(); 148 global_state.analysis_host.request_cancellation();
155 log::info!("waiting for tasks to finish...");
156 task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
157 log::info!("...tasks have finished");
158 log::info!("joining threadpool...");
159 pool.join();
160 drop(pool);
161 log::info!("...threadpool has finished");
162
163 let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead");
164 drop(vfs);
165
166 Ok(()) 149 Ok(())
167} 150}
168 151
169#[derive(Debug)] 152#[derive(Debug)]
170enum Task { 153pub(crate) enum Task {
171 Respond(Response), 154 Respond(Response),
172 Diagnostic(DiagnosticTask), 155 Diagnostics(Vec<DiagnosticTask>),
156 Unit,
173} 157}
174 158
175enum Event { 159enum Event {
176 Msg(Message), 160 Lsp(lsp_server::Message),
177 Task(Task), 161 Task(Task),
178 Vfs(vfs::loader::Message), 162 Vfs(vfs::loader::Message),
179 CheckWatcher(flycheck::Message), 163 Flycheck(flycheck::Message),
180} 164}
181 165
182impl fmt::Debug for Event { 166impl fmt::Debug for Event {
@@ -186,7 +170,7 @@ impl fmt::Debug for Event {
186 }; 170 };
187 171
188 match self { 172 match self {
189 Event::Msg(Message::Notification(not)) => { 173 Event::Lsp(lsp_server::Message::Notification(not)) => {
190 if notification_is::<lsp_types::notification::DidOpenTextDocument>(not) 174 if notification_is::<lsp_types::notification::DidOpenTextDocument>(not)
191 || notification_is::<lsp_types::notification::DidChangeTextDocument>(not) 175 || notification_is::<lsp_types::notification::DidChangeTextDocument>(not)
192 { 176 {
@@ -203,10 +187,10 @@ impl fmt::Debug for Event {
203 _ => (), 187 _ => (),
204 } 188 }
205 match self { 189 match self {
206 Event::Msg(it) => fmt::Debug::fmt(it, f), 190 Event::Lsp(it) => fmt::Debug::fmt(it, f),
207 Event::Task(it) => fmt::Debug::fmt(it, f), 191 Event::Task(it) => fmt::Debug::fmt(it, f),
208 Event::Vfs(it) => fmt::Debug::fmt(it, f), 192 Event::Vfs(it) => fmt::Debug::fmt(it, f),
209 Event::CheckWatcher(it) => fmt::Debug::fmt(it, f), 193 Event::Flycheck(it) => fmt::Debug::fmt(it, f),
210 } 194 }
211 } 195 }
212} 196}
@@ -215,19 +199,13 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
215pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; 199pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
216const DO_NOTHING: ReqHandler = |_, _| (); 200const DO_NOTHING: ReqHandler = |_, _| ();
217 201
218fn loop_turn( 202fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> {
219 pool: &ThreadPool,
220 task_sender: &Sender<Task>,
221 connection: &Connection,
222 global_state: &mut GlobalState,
223 event: Event,
224) -> Result<()> {
225 let loop_start = Instant::now(); 203 let loop_start = Instant::now();
226 204
227 // NOTE: don't count blocking select! call as a loop-turn time 205 // NOTE: don't count blocking select! call as a loop-turn time
228 let _p = profile("main_loop_inner/loop-turn"); 206 let _p = profile("main_loop_inner/loop-turn");
229 log::info!("loop turn = {:?}", event); 207 log::info!("loop turn = {:?}", event);
230 let queue_count = pool.queued_count(); 208 let queue_count = global_state.task_pool.0.len();
231 if queue_count > 0 { 209 if queue_count > 0 {
232 log::info!("queued count = {}", queue_count); 210 log::info!("queued count = {}", queue_count);
233 } 211 }
@@ -269,17 +247,15 @@ fn loop_turn(
269 ) 247 )
270 } 248 }
271 }, 249 },
272 Event::CheckWatcher(task) => { 250 Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?,
273 on_check_task(task, global_state, task_sender, &connection.sender)? 251 Event::Lsp(msg) => match msg {
274 } 252 lsp_server::Message::Request(req) => {
275 Event::Msg(msg) => match msg { 253 on_request(global_state, &connection.sender, loop_start, req)?
276 Message::Request(req) => {
277 on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)?
278 } 254 }
279 Message::Notification(not) => { 255 lsp_server::Message::Notification(not) => {
280 on_notification(&connection.sender, global_state, not)?; 256 on_notification(&connection.sender, global_state, not)?;
281 } 257 }
282 Message::Response(resp) => { 258 lsp_server::Message::Response(resp) => {
283 let handler = global_state.req_queue.outgoing.complete(resp.id.clone()); 259 let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
284 handler(global_state, resp) 260 handler(global_state, resp)
285 } 261 }
@@ -301,16 +277,14 @@ fn loop_turn(
301 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) 277 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
302 .collect::<Vec<_>>(); 278 .collect::<Vec<_>>();
303 279
304 update_file_notifications_on_threadpool( 280 update_file_notifications_on_threadpool(global_state, subscriptions.clone());
305 pool, 281 global_state.task_pool.0.spawn({
306 global_state.snapshot(),
307 task_sender.clone(),
308 subscriptions.clone(),
309 );
310 pool.execute({
311 let subs = subscriptions; 282 let subs = subscriptions;
312 let snap = global_state.snapshot(); 283 let snap = global_state.snapshot();
313 move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()) 284 move || {
285 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
286 Task::Unit
287 }
314 }); 288 });
315 } 289 }
316 290
@@ -329,7 +303,7 @@ fn loop_turn(
329 Ok(()) 303 Ok(())
330} 304}
331 305
332fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) { 306fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: &mut GlobalState) {
333 match task { 307 match task {
334 Task::Respond(response) => { 308 Task::Respond(response) => {
335 if let Some((method, start)) = 309 if let Some((method, start)) =
@@ -345,26 +319,21 @@ fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalSt
345 msg_sender.send(response.into()).unwrap(); 319 msg_sender.send(response.into()).unwrap();
346 } 320 }
347 } 321 }
348 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state), 322 Task::Diagnostics(tasks) => {
323 tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state))
324 }
325 Task::Unit => (),
349 } 326 }
350} 327}
351 328
352fn on_request( 329fn on_request(
353 global_state: &mut GlobalState, 330 global_state: &mut GlobalState,
354 pool: &ThreadPool, 331 msg_sender: &Sender<lsp_server::Message>,
355 task_sender: &Sender<Task>,
356 msg_sender: &Sender<Message>,
357 request_received: Instant, 332 request_received: Instant,
358 req: Request, 333 req: Request,
359) -> Result<()> { 334) -> Result<()> {
360 let mut pool_dispatcher = PoolDispatcher { 335 let mut pool_dispatcher =
361 req: Some(req), 336 PoolDispatcher { req: Some(req), global_state, msg_sender, request_received };
362 pool,
363 global_state,
364 task_sender,
365 msg_sender,
366 request_received,
367 };
368 pool_dispatcher 337 pool_dispatcher
369 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? 338 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
370 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? 339 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
@@ -415,7 +384,7 @@ fn on_request(
415} 384}
416 385
417fn on_notification( 386fn on_notification(
418 msg_sender: &Sender<Message>, 387 msg_sender: &Sender<lsp_server::Message>,
419 global_state: &mut GlobalState, 388 global_state: &mut GlobalState,
420 not: Notification, 389 not: Notification,
421) -> Result<()> { 390) -> Result<()> {
@@ -552,12 +521,11 @@ fn on_notification(
552fn on_check_task( 521fn on_check_task(
553 task: flycheck::Message, 522 task: flycheck::Message,
554 global_state: &mut GlobalState, 523 global_state: &mut GlobalState,
555 task_sender: &Sender<Task>, 524 msg_sender: &Sender<lsp_server::Message>,
556 msg_sender: &Sender<Message>,
557) -> Result<()> { 525) -> Result<()> {
558 match task { 526 match task {
559 flycheck::Message::ClearDiagnostics => { 527 flycheck::Message::ClearDiagnostics => {
560 task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?; 528 on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state)
561 } 529 }
562 530
563 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { 531 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => {
@@ -576,11 +544,15 @@ fn on_check_task(
576 } 544 }
577 }; 545 };
578 546
579 task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck( 547 on_diagnostic_task(
580 file_id, 548 DiagnosticTask::AddCheck(
581 diag.diagnostic, 549 file_id,
582 diag.fixes.into_iter().map(|it| it.into()).collect(), 550 diag.diagnostic,
583 )))?; 551 diag.fixes.into_iter().map(|it| it.into()).collect(),
552 ),
553 msg_sender,
554 global_state,
555 )
584 } 556 }
585 } 557 }
586 558
@@ -598,7 +570,11 @@ fn on_check_task(
598 Ok(()) 570 Ok(())
599} 571}
600 572
601fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state: &mut GlobalState) { 573fn on_diagnostic_task(
574 task: DiagnosticTask,
575 msg_sender: &Sender<lsp_server::Message>,
576 state: &mut GlobalState,
577) {
602 let subscriptions = state.diagnostics.handle_task(task); 578 let subscriptions = state.diagnostics.handle_task(task);
603 579
604 for file_id in subscriptions { 580 for file_id in subscriptions {
@@ -623,7 +599,7 @@ fn percentage(done: usize, total: usize) -> f64 {
623 599
624fn report_progress( 600fn report_progress(
625 global_state: &mut GlobalState, 601 global_state: &mut GlobalState,
626 sender: &Sender<Message>, 602 sender: &Sender<lsp_server::Message>,
627 title: &str, 603 title: &str,
628 state: Progress, 604 state: Progress,
629 message: Option<String>, 605 message: Option<String>,
@@ -670,10 +646,8 @@ fn report_progress(
670 646
671struct PoolDispatcher<'a> { 647struct PoolDispatcher<'a> {
672 req: Option<Request>, 648 req: Option<Request>,
673 pool: &'a ThreadPool,
674 global_state: &'a mut GlobalState, 649 global_state: &'a mut GlobalState,
675 msg_sender: &'a Sender<Message>, 650 msg_sender: &'a Sender<lsp_server::Message>,
676 task_sender: &'a Sender<Task>,
677 request_received: Instant, 651 request_received: Instant,
678} 652}
679 653
@@ -721,13 +695,11 @@ impl<'a> PoolDispatcher<'a> {
721 } 695 }
722 }; 696 };
723 697
724 self.pool.execute({ 698 self.global_state.task_pool.0.spawn({
725 let world = self.global_state.snapshot(); 699 let world = self.global_state.snapshot();
726 let sender = self.task_sender.clone();
727 move || { 700 move || {
728 let result = f(world, params); 701 let result = f(world, params);
729 let task = result_to_task::<R>(id, result); 702 result_to_task::<R>(id, result)
730 sender.send(task).unwrap();
731 } 703 }
732 }); 704 });
733 705
@@ -797,26 +769,27 @@ where
797} 769}
798 770
799fn update_file_notifications_on_threadpool( 771fn update_file_notifications_on_threadpool(
800 pool: &ThreadPool, 772 global_state: &mut GlobalState,
801 world: GlobalStateSnapshot,
802 task_sender: Sender<Task>,
803 subscriptions: Vec<FileId>, 773 subscriptions: Vec<FileId>,
804) { 774) {
805 log::trace!("updating notifications for {:?}", subscriptions); 775 log::trace!("updating notifications for {:?}", subscriptions);
806 if world.config.publish_diagnostics { 776 if global_state.config.publish_diagnostics {
807 pool.execute(move || { 777 let snapshot = global_state.snapshot();
808 for file_id in subscriptions { 778 global_state.task_pool.0.spawn(move || {
809 match handlers::publish_diagnostics(&world, file_id) { 779 let diagnostics = subscriptions
810 Err(e) => { 780 .into_iter()
811 if !is_canceled(&*e) { 781 .filter_map(|file_id| {
812 log::error!("failed to compute diagnostics: {:?}", e); 782 handlers::publish_diagnostics(&snapshot, file_id)
813 } 783 .map_err(|err| {
814 } 784 if !is_canceled(&*err) {
815 Ok(task) => { 785 log::error!("failed to compute diagnostics: {:?}", err);
816 task_sender.send(Task::Diagnostic(task)).unwrap(); 786 }
817 } 787 ()
818 } 788 })
819 } 789 .ok()
790 })
791 .collect::<Vec<_>>();
792 Task::Diagnostics(diagnostics)
820 }) 793 })
821 } 794 }
822} 795}
diff --git a/crates/rust-analyzer/src/thread_pool.rs b/crates/rust-analyzer/src/thread_pool.rs
new file mode 100644
index 000000000..4fa502925
--- /dev/null
+++ b/crates/rust-analyzer/src/thread_pool.rs
@@ -0,0 +1,35 @@
1//! A thin wrapper around `ThreadPool` to make sure that we join all things
2//! properly.
3use crossbeam_channel::Sender;
4
5pub(crate) struct TaskPool<T> {
6 sender: Sender<T>,
7 inner: threadpool::ThreadPool,
8}
9
10impl<T> TaskPool<T> {
11 pub(crate) fn new(sender: Sender<T>) -> TaskPool<T> {
12 TaskPool { sender, inner: threadpool::ThreadPool::default() }
13 }
14
15 pub(crate) fn spawn<F>(&mut self, task: F)
16 where
17 F: FnOnce() -> T + Send + 'static,
18 T: Send + 'static,
19 {
20 self.inner.execute({
21 let sender = self.sender.clone();
22 move || sender.send(task()).unwrap()
23 })
24 }
25
26 pub(crate) fn len(&self) -> usize {
27 self.inner.queued_count()
28 }
29}
30
31impl<T> Drop for TaskPool<T> {
32 fn drop(&mut self) {
33 self.inner.join()
34 }
35}
diff --git a/crates/vfs-notify/src/lib.rs b/crates/vfs-notify/src/lib.rs
index 282cf0358..68fdb8cb0 100644
--- a/crates/vfs-notify/src/lib.rs
+++ b/crates/vfs-notify/src/lib.rs
@@ -83,8 +83,8 @@ impl NotifyActor {
83 } 83 }
84 } 84 }
85 85
86 fn run(mut self, receiver: Receiver<Message>) { 86 fn run(mut self, inbox: Receiver<Message>) {
87 while let Some(event) = self.next_event(&receiver) { 87 while let Some(event) = self.next_event(&inbox) {
88 log::debug!("vfs-notify event: {:?}", event); 88 log::debug!("vfs-notify event: {:?}", event);
89 match event { 89 match event {
90 Event::Message(msg) => match msg { 90 Event::Message(msg) => match msg {