diff options
author | bors[bot] <26634292+bors[bot]@users.noreply.github.com> | 2020-06-25 14:36:22 +0100 |
---|---|---|
committer | GitHub <[email protected]> | 2020-06-25 14:36:22 +0100 |
commit | 96d335d578984b42c2e00c715c924bceca391a6c (patch) | |
tree | f49e366fefec372f037b9e523e0fa144bc08fb6e | |
parent | 9be0094b5cc24d82541d98a7bd00187c18388fd3 (diff) | |
parent | dd20c2ec5bc0c5ac02149479c2b5f3746f3df505 (diff) |
Merge #5061
5061: Move TaskPool into GlobalState r=matklad a=matklad
bors r+
🤖
Co-authored-by: Aleksey Kladov <[email protected]>
-rw-r--r-- | crates/rust-analyzer/src/global_state.rs | 11 | ||||
-rw-r--r-- | crates/rust-analyzer/src/lib.rs | 1 | ||||
-rw-r--r-- | crates/rust-analyzer/src/main_loop.rs | 135 | ||||
-rw-r--r-- | crates/rust-analyzer/src/thread_pool.rs | 35 |
4 files changed, 98 insertions, 84 deletions
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs index 446207e9e..de6b95686 100644 --- a/crates/rust-analyzer/src/global_state.rs +++ b/crates/rust-analyzer/src/global_state.rs | |||
@@ -20,8 +20,9 @@ use crate::{ | |||
20 | diagnostics::{CheckFixes, DiagnosticCollection}, | 20 | diagnostics::{CheckFixes, DiagnosticCollection}, |
21 | from_proto, | 21 | from_proto, |
22 | line_endings::LineEndings, | 22 | line_endings::LineEndings, |
23 | main_loop::ReqQueue, | 23 | main_loop::{ReqQueue, Task}, |
24 | request_metrics::{LatestRequests, RequestMetrics}, | 24 | request_metrics::{LatestRequests, RequestMetrics}, |
25 | thread_pool::TaskPool, | ||
25 | to_proto::url_from_abs_path, | 26 | to_proto::url_from_abs_path, |
26 | Result, | 27 | Result, |
27 | }; | 28 | }; |
@@ -66,6 +67,7 @@ impl Default for Status { | |||
66 | /// incremental salsa database. | 67 | /// incremental salsa database. |
67 | pub(crate) struct GlobalState { | 68 | pub(crate) struct GlobalState { |
68 | pub(crate) config: Config, | 69 | pub(crate) config: Config, |
70 | pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>), | ||
69 | pub(crate) analysis_host: AnalysisHost, | 71 | pub(crate) analysis_host: AnalysisHost, |
70 | pub(crate) loader: Box<dyn vfs::loader::Handle>, | 72 | pub(crate) loader: Box<dyn vfs::loader::Handle>, |
71 | pub(crate) task_receiver: Receiver<vfs::loader::Message>, | 73 | pub(crate) task_receiver: Receiver<vfs::loader::Message>, |
@@ -153,8 +155,15 @@ impl GlobalState { | |||
153 | 155 | ||
154 | let mut analysis_host = AnalysisHost::new(lru_capacity); | 156 | let mut analysis_host = AnalysisHost::new(lru_capacity); |
155 | analysis_host.apply_change(change); | 157 | analysis_host.apply_change(change); |
158 | |||
159 | let task_pool = { | ||
160 | let (sender, receiver) = unbounded(); | ||
161 | (TaskPool::new(sender), receiver) | ||
162 | }; | ||
163 | |||
156 | let mut res = GlobalState { | 164 | let mut res = GlobalState { |
157 | config, | 165 | config, |
166 | task_pool, | ||
158 | analysis_host, | 167 | analysis_host, |
159 | loader, | 168 | loader, |
160 | task_receiver, | 169 | task_receiver, |
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs index 794286672..ca788dd3c 100644 --- a/crates/rust-analyzer/src/lib.rs +++ b/crates/rust-analyzer/src/lib.rs | |||
@@ -30,6 +30,7 @@ mod diagnostics; | |||
30 | mod line_endings; | 30 | mod line_endings; |
31 | mod request_metrics; | 31 | mod request_metrics; |
32 | mod lsp_utils; | 32 | mod lsp_utils; |
33 | mod thread_pool; | ||
33 | pub mod lsp_ext; | 34 | pub mod lsp_ext; |
34 | pub mod config; | 35 | pub mod config; |
35 | 36 | ||
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs index a7a7d2eb7..1a9c5ee2c 100644 --- a/crates/rust-analyzer/src/main_loop.rs +++ b/crates/rust-analyzer/src/main_loop.rs | |||
@@ -2,11 +2,10 @@ | |||
2 | //! requests/replies and notifications back to the client. | 2 | //! requests/replies and notifications back to the client. |
3 | use std::{ | 3 | use std::{ |
4 | env, fmt, panic, | 4 | env, fmt, panic, |
5 | sync::Arc, | ||
6 | time::{Duration, Instant}, | 5 | time::{Duration, Instant}, |
7 | }; | 6 | }; |
8 | 7 | ||
9 | use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; | 8 | use crossbeam_channel::{never, select, RecvError, Sender}; |
10 | use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; | 9 | use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; |
11 | use lsp_types::{request::Request as _, NumberOrString}; | 10 | use lsp_types::{request::Request as _, NumberOrString}; |
12 | use ra_db::VfsPath; | 11 | use ra_db::VfsPath; |
@@ -14,7 +13,6 @@ use ra_ide::{Canceled, FileId}; | |||
14 | use ra_prof::profile; | 13 | use ra_prof::profile; |
15 | use ra_project_model::{PackageRoot, ProjectWorkspace}; | 14 | use ra_project_model::{PackageRoot, ProjectWorkspace}; |
16 | use serde::{de::DeserializeOwned, Serialize}; | 15 | use serde::{de::DeserializeOwned, Serialize}; |
17 | use threadpool::ThreadPool; | ||
18 | 16 | ||
19 | use crate::{ | 17 | use crate::{ |
20 | config::{Config, FilesWatcher, LinkedProject}, | 18 | config::{Config, FilesWatcher, LinkedProject}, |
@@ -118,12 +116,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
118 | GlobalState::new(workspaces, config.lru_capacity, config, req_queue) | 116 | GlobalState::new(workspaces, config.lru_capacity, config, req_queue) |
119 | }; | 117 | }; |
120 | 118 | ||
121 | let pool = ThreadPool::default(); | ||
122 | let (task_sender, task_receiver) = unbounded::<Task>(); | ||
123 | |||
124 | log::info!("server initialized, serving requests"); | 119 | log::info!("server initialized, serving requests"); |
125 | { | 120 | { |
126 | let task_sender = task_sender; | ||
127 | loop { | 121 | loop { |
128 | log::trace!("selecting"); | 122 | log::trace!("selecting"); |
129 | let event = select! { | 123 | let event = select! { |
@@ -131,7 +125,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
131 | Ok(msg) => Event::Lsp(msg), | 125 | Ok(msg) => Event::Lsp(msg), |
132 | Err(RecvError) => return Err("client exited without shutdown".into()), | 126 | Err(RecvError) => return Err("client exited without shutdown".into()), |
133 | }, | 127 | }, |
134 | recv(task_receiver) -> task => Event::Task(task.unwrap()), | 128 | recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()), |
135 | recv(global_state.task_receiver) -> task => match task { | 129 | recv(global_state.task_receiver) -> task => match task { |
136 | Ok(task) => Event::Vfs(task), | 130 | Ok(task) => Event::Vfs(task), |
137 | Err(RecvError) => return Err("vfs died".into()), | 131 | Err(RecvError) => return Err("vfs died".into()), |
@@ -147,29 +141,19 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> { | |||
147 | }; | 141 | }; |
148 | } | 142 | } |
149 | assert!(!global_state.vfs.read().0.has_changes()); | 143 | assert!(!global_state.vfs.read().0.has_changes()); |
150 | loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?; | 144 | loop_turn(&connection, &mut global_state, event)?; |
151 | assert!(!global_state.vfs.read().0.has_changes()); | 145 | assert!(!global_state.vfs.read().0.has_changes()); |
152 | } | 146 | } |
153 | } | 147 | } |
154 | global_state.analysis_host.request_cancellation(); | 148 | global_state.analysis_host.request_cancellation(); |
155 | log::info!("waiting for tasks to finish..."); | ||
156 | task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state)); | ||
157 | log::info!("...tasks have finished"); | ||
158 | log::info!("joining threadpool..."); | ||
159 | pool.join(); | ||
160 | drop(pool); | ||
161 | log::info!("...threadpool has finished"); | ||
162 | |||
163 | let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead"); | ||
164 | drop(vfs); | ||
165 | |||
166 | Ok(()) | 149 | Ok(()) |
167 | } | 150 | } |
168 | 151 | ||
169 | #[derive(Debug)] | 152 | #[derive(Debug)] |
170 | enum Task { | 153 | pub(crate) enum Task { |
171 | Respond(Response), | 154 | Respond(Response), |
172 | Diagnostic(DiagnosticTask), | 155 | Diagnostics(Vec<DiagnosticTask>), |
156 | Unit, | ||
173 | } | 157 | } |
174 | 158 | ||
175 | enum Event { | 159 | enum Event { |
@@ -215,19 +199,13 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response); | |||
215 | pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; | 199 | pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; |
216 | const DO_NOTHING: ReqHandler = |_, _| (); | 200 | const DO_NOTHING: ReqHandler = |_, _| (); |
217 | 201 | ||
218 | fn loop_turn( | 202 | fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> { |
219 | pool: &ThreadPool, | ||
220 | task_sender: &Sender<Task>, | ||
221 | connection: &Connection, | ||
222 | global_state: &mut GlobalState, | ||
223 | event: Event, | ||
224 | ) -> Result<()> { | ||
225 | let loop_start = Instant::now(); | 203 | let loop_start = Instant::now(); |
226 | 204 | ||
227 | // NOTE: don't count blocking select! call as a loop-turn time | 205 | // NOTE: don't count blocking select! call as a loop-turn time |
228 | let _p = profile("main_loop_inner/loop-turn"); | 206 | let _p = profile("main_loop_inner/loop-turn"); |
229 | log::info!("loop turn = {:?}", event); | 207 | log::info!("loop turn = {:?}", event); |
230 | let queue_count = pool.queued_count(); | 208 | let queue_count = global_state.task_pool.0.len(); |
231 | if queue_count > 0 { | 209 | if queue_count > 0 { |
232 | log::info!("queued count = {}", queue_count); | 210 | log::info!("queued count = {}", queue_count); |
233 | } | 211 | } |
@@ -269,12 +247,10 @@ fn loop_turn( | |||
269 | ) | 247 | ) |
270 | } | 248 | } |
271 | }, | 249 | }, |
272 | Event::Flycheck(task) => { | 250 | Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?, |
273 | on_check_task(task, global_state, task_sender, &connection.sender)? | ||
274 | } | ||
275 | Event::Lsp(msg) => match msg { | 251 | Event::Lsp(msg) => match msg { |
276 | lsp_server::Message::Request(req) => { | 252 | lsp_server::Message::Request(req) => { |
277 | on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)? | 253 | on_request(global_state, &connection.sender, loop_start, req)? |
278 | } | 254 | } |
279 | lsp_server::Message::Notification(not) => { | 255 | lsp_server::Message::Notification(not) => { |
280 | on_notification(&connection.sender, global_state, not)?; | 256 | on_notification(&connection.sender, global_state, not)?; |
@@ -301,16 +277,14 @@ fn loop_turn( | |||
301 | .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) | 277 | .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) |
302 | .collect::<Vec<_>>(); | 278 | .collect::<Vec<_>>(); |
303 | 279 | ||
304 | update_file_notifications_on_threadpool( | 280 | update_file_notifications_on_threadpool(global_state, subscriptions.clone()); |
305 | pool, | 281 | global_state.task_pool.0.spawn({ |
306 | global_state.snapshot(), | ||
307 | task_sender.clone(), | ||
308 | subscriptions.clone(), | ||
309 | ); | ||
310 | pool.execute({ | ||
311 | let subs = subscriptions; | 282 | let subs = subscriptions; |
312 | let snap = global_state.snapshot(); | 283 | let snap = global_state.snapshot(); |
313 | move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()) | 284 | move || { |
285 | snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()); | ||
286 | Task::Unit | ||
287 | } | ||
314 | }); | 288 | }); |
315 | } | 289 | } |
316 | 290 | ||
@@ -345,26 +319,21 @@ fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: & | |||
345 | msg_sender.send(response.into()).unwrap(); | 319 | msg_sender.send(response.into()).unwrap(); |
346 | } | 320 | } |
347 | } | 321 | } |
348 | Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state), | 322 | Task::Diagnostics(tasks) => { |
323 | tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state)) | ||
324 | } | ||
325 | Task::Unit => (), | ||
349 | } | 326 | } |
350 | } | 327 | } |
351 | 328 | ||
352 | fn on_request( | 329 | fn on_request( |
353 | global_state: &mut GlobalState, | 330 | global_state: &mut GlobalState, |
354 | pool: &ThreadPool, | ||
355 | task_sender: &Sender<Task>, | ||
356 | msg_sender: &Sender<lsp_server::Message>, | 331 | msg_sender: &Sender<lsp_server::Message>, |
357 | request_received: Instant, | 332 | request_received: Instant, |
358 | req: Request, | 333 | req: Request, |
359 | ) -> Result<()> { | 334 | ) -> Result<()> { |
360 | let mut pool_dispatcher = PoolDispatcher { | 335 | let mut pool_dispatcher = |
361 | req: Some(req), | 336 | PoolDispatcher { req: Some(req), global_state, msg_sender, request_received }; |
362 | pool, | ||
363 | global_state, | ||
364 | task_sender, | ||
365 | msg_sender, | ||
366 | request_received, | ||
367 | }; | ||
368 | pool_dispatcher | 337 | pool_dispatcher |
369 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? | 338 | .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? |
370 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? | 339 | .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? |
@@ -552,12 +521,11 @@ fn on_notification( | |||
552 | fn on_check_task( | 521 | fn on_check_task( |
553 | task: flycheck::Message, | 522 | task: flycheck::Message, |
554 | global_state: &mut GlobalState, | 523 | global_state: &mut GlobalState, |
555 | task_sender: &Sender<Task>, | ||
556 | msg_sender: &Sender<lsp_server::Message>, | 524 | msg_sender: &Sender<lsp_server::Message>, |
557 | ) -> Result<()> { | 525 | ) -> Result<()> { |
558 | match task { | 526 | match task { |
559 | flycheck::Message::ClearDiagnostics => { | 527 | flycheck::Message::ClearDiagnostics => { |
560 | task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?; | 528 | on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state) |
561 | } | 529 | } |
562 | 530 | ||
563 | flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { | 531 | flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { |
@@ -576,11 +544,15 @@ fn on_check_task( | |||
576 | } | 544 | } |
577 | }; | 545 | }; |
578 | 546 | ||
579 | task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck( | 547 | on_diagnostic_task( |
580 | file_id, | 548 | DiagnosticTask::AddCheck( |
581 | diag.diagnostic, | 549 | file_id, |
582 | diag.fixes.into_iter().map(|it| it.into()).collect(), | 550 | diag.diagnostic, |
583 | )))?; | 551 | diag.fixes.into_iter().map(|it| it.into()).collect(), |
552 | ), | ||
553 | msg_sender, | ||
554 | global_state, | ||
555 | ) | ||
584 | } | 556 | } |
585 | } | 557 | } |
586 | 558 | ||
@@ -674,10 +646,8 @@ fn report_progress( | |||
674 | 646 | ||
675 | struct PoolDispatcher<'a> { | 647 | struct PoolDispatcher<'a> { |
676 | req: Option<Request>, | 648 | req: Option<Request>, |
677 | pool: &'a ThreadPool, | ||
678 | global_state: &'a mut GlobalState, | 649 | global_state: &'a mut GlobalState, |
679 | msg_sender: &'a Sender<lsp_server::Message>, | 650 | msg_sender: &'a Sender<lsp_server::Message>, |
680 | task_sender: &'a Sender<Task>, | ||
681 | request_received: Instant, | 651 | request_received: Instant, |
682 | } | 652 | } |
683 | 653 | ||
@@ -725,13 +695,11 @@ impl<'a> PoolDispatcher<'a> { | |||
725 | } | 695 | } |
726 | }; | 696 | }; |
727 | 697 | ||
728 | self.pool.execute({ | 698 | self.global_state.task_pool.0.spawn({ |
729 | let world = self.global_state.snapshot(); | 699 | let world = self.global_state.snapshot(); |
730 | let sender = self.task_sender.clone(); | ||
731 | move || { | 700 | move || { |
732 | let result = f(world, params); | 701 | let result = f(world, params); |
733 | let task = result_to_task::<R>(id, result); | 702 | result_to_task::<R>(id, result) |
734 | sender.send(task).unwrap(); | ||
735 | } | 703 | } |
736 | }); | 704 | }); |
737 | 705 | ||
@@ -801,26 +769,27 @@ where | |||
801 | } | 769 | } |
802 | 770 | ||
803 | fn update_file_notifications_on_threadpool( | 771 | fn update_file_notifications_on_threadpool( |
804 | pool: &ThreadPool, | 772 | global_state: &mut GlobalState, |
805 | world: GlobalStateSnapshot, | ||
806 | task_sender: Sender<Task>, | ||
807 | subscriptions: Vec<FileId>, | 773 | subscriptions: Vec<FileId>, |
808 | ) { | 774 | ) { |
809 | log::trace!("updating notifications for {:?}", subscriptions); | 775 | log::trace!("updating notifications for {:?}", subscriptions); |
810 | if world.config.publish_diagnostics { | 776 | if global_state.config.publish_diagnostics { |
811 | pool.execute(move || { | 777 | let snapshot = global_state.snapshot(); |
812 | for file_id in subscriptions { | 778 | global_state.task_pool.0.spawn(move || { |
813 | match handlers::publish_diagnostics(&world, file_id) { | 779 | let diagnostics = subscriptions |
814 | Err(e) => { | 780 | .into_iter() |
815 | if !is_canceled(&*e) { | 781 | .filter_map(|file_id| { |
816 | log::error!("failed to compute diagnostics: {:?}", e); | 782 | handlers::publish_diagnostics(&snapshot, file_id) |
817 | } | 783 | .map_err(|err| { |
818 | } | 784 | if !is_canceled(&*err) { |
819 | Ok(task) => { | 785 | log::error!("failed to compute diagnostics: {:?}", err); |
820 | task_sender.send(Task::Diagnostic(task)).unwrap(); | 786 | } |
821 | } | 787 | () |
822 | } | 788 | }) |
823 | } | 789 | .ok() |
790 | }) | ||
791 | .collect::<Vec<_>>(); | ||
792 | Task::Diagnostics(diagnostics) | ||
824 | }) | 793 | }) |
825 | } | 794 | } |
826 | } | 795 | } |
diff --git a/crates/rust-analyzer/src/thread_pool.rs b/crates/rust-analyzer/src/thread_pool.rs new file mode 100644 index 000000000..4fa502925 --- /dev/null +++ b/crates/rust-analyzer/src/thread_pool.rs | |||
@@ -0,0 +1,35 @@ | |||
1 | //! A thin wrapper around `ThreadPool` to make sure that we join all things | ||
2 | //! properly. | ||
3 | use crossbeam_channel::Sender; | ||
4 | |||
5 | pub(crate) struct TaskPool<T> { | ||
6 | sender: Sender<T>, | ||
7 | inner: threadpool::ThreadPool, | ||
8 | } | ||
9 | |||
10 | impl<T> TaskPool<T> { | ||
11 | pub(crate) fn new(sender: Sender<T>) -> TaskPool<T> { | ||
12 | TaskPool { sender, inner: threadpool::ThreadPool::default() } | ||
13 | } | ||
14 | |||
15 | pub(crate) fn spawn<F>(&mut self, task: F) | ||
16 | where | ||
17 | F: FnOnce() -> T + Send + 'static, | ||
18 | T: Send + 'static, | ||
19 | { | ||
20 | self.inner.execute({ | ||
21 | let sender = self.sender.clone(); | ||
22 | move || sender.send(task()).unwrap() | ||
23 | }) | ||
24 | } | ||
25 | |||
26 | pub(crate) fn len(&self) -> usize { | ||
27 | self.inner.queued_count() | ||
28 | } | ||
29 | } | ||
30 | |||
31 | impl<T> Drop for TaskPool<T> { | ||
32 | fn drop(&mut self) { | ||
33 | self.inner.join() | ||
34 | } | ||
35 | } | ||