aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2020-06-25 14:35:42 +0100
committerAleksey Kladov <[email protected]>2020-06-25 14:35:42 +0100
commitdd20c2ec5bc0c5ac02149479c2b5f3746f3df505 (patch)
treef49e366fefec372f037b9e523e0fa144bc08fb6e
parent9be0094b5cc24d82541d98a7bd00187c18388fd3 (diff)
Move TaskPool into GlobalState
-rw-r--r--crates/rust-analyzer/src/global_state.rs11
-rw-r--r--crates/rust-analyzer/src/lib.rs1
-rw-r--r--crates/rust-analyzer/src/main_loop.rs135
-rw-r--r--crates/rust-analyzer/src/thread_pool.rs35
4 files changed, 98 insertions, 84 deletions
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index 446207e9e..de6b95686 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -20,8 +20,9 @@ use crate::{
20 diagnostics::{CheckFixes, DiagnosticCollection}, 20 diagnostics::{CheckFixes, DiagnosticCollection},
21 from_proto, 21 from_proto,
22 line_endings::LineEndings, 22 line_endings::LineEndings,
23 main_loop::ReqQueue, 23 main_loop::{ReqQueue, Task},
24 request_metrics::{LatestRequests, RequestMetrics}, 24 request_metrics::{LatestRequests, RequestMetrics},
25 thread_pool::TaskPool,
25 to_proto::url_from_abs_path, 26 to_proto::url_from_abs_path,
26 Result, 27 Result,
27}; 28};
@@ -66,6 +67,7 @@ impl Default for Status {
66/// incremental salsa database. 67/// incremental salsa database.
67pub(crate) struct GlobalState { 68pub(crate) struct GlobalState {
68 pub(crate) config: Config, 69 pub(crate) config: Config,
70 pub(crate) task_pool: (TaskPool<Task>, Receiver<Task>),
69 pub(crate) analysis_host: AnalysisHost, 71 pub(crate) analysis_host: AnalysisHost,
70 pub(crate) loader: Box<dyn vfs::loader::Handle>, 72 pub(crate) loader: Box<dyn vfs::loader::Handle>,
71 pub(crate) task_receiver: Receiver<vfs::loader::Message>, 73 pub(crate) task_receiver: Receiver<vfs::loader::Message>,
@@ -153,8 +155,15 @@ impl GlobalState {
153 155
154 let mut analysis_host = AnalysisHost::new(lru_capacity); 156 let mut analysis_host = AnalysisHost::new(lru_capacity);
155 analysis_host.apply_change(change); 157 analysis_host.apply_change(change);
158
159 let task_pool = {
160 let (sender, receiver) = unbounded();
161 (TaskPool::new(sender), receiver)
162 };
163
156 let mut res = GlobalState { 164 let mut res = GlobalState {
157 config, 165 config,
166 task_pool,
158 analysis_host, 167 analysis_host,
159 loader, 168 loader,
160 task_receiver, 169 task_receiver,
diff --git a/crates/rust-analyzer/src/lib.rs b/crates/rust-analyzer/src/lib.rs
index 794286672..ca788dd3c 100644
--- a/crates/rust-analyzer/src/lib.rs
+++ b/crates/rust-analyzer/src/lib.rs
@@ -30,6 +30,7 @@ mod diagnostics;
30mod line_endings; 30mod line_endings;
31mod request_metrics; 31mod request_metrics;
32mod lsp_utils; 32mod lsp_utils;
33mod thread_pool;
33pub mod lsp_ext; 34pub mod lsp_ext;
34pub mod config; 35pub mod config;
35 36
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index a7a7d2eb7..1a9c5ee2c 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -2,11 +2,10 @@
2//! requests/replies and notifications back to the client. 2//! requests/replies and notifications back to the client.
3use std::{ 3use std::{
4 env, fmt, panic, 4 env, fmt, panic,
5 sync::Arc,
6 time::{Duration, Instant}, 5 time::{Duration, Instant},
7}; 6};
8 7
9use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; 8use crossbeam_channel::{never, select, RecvError, Sender};
10use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response}; 9use lsp_server::{Connection, ErrorCode, Notification, Request, RequestId, Response};
11use lsp_types::{request::Request as _, NumberOrString}; 10use lsp_types::{request::Request as _, NumberOrString};
12use ra_db::VfsPath; 11use ra_db::VfsPath;
@@ -14,7 +13,6 @@ use ra_ide::{Canceled, FileId};
14use ra_prof::profile; 13use ra_prof::profile;
15use ra_project_model::{PackageRoot, ProjectWorkspace}; 14use ra_project_model::{PackageRoot, ProjectWorkspace};
16use serde::{de::DeserializeOwned, Serialize}; 15use serde::{de::DeserializeOwned, Serialize};
17use threadpool::ThreadPool;
18 16
19use crate::{ 17use crate::{
20 config::{Config, FilesWatcher, LinkedProject}, 18 config::{Config, FilesWatcher, LinkedProject},
@@ -118,12 +116,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
118 GlobalState::new(workspaces, config.lru_capacity, config, req_queue) 116 GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
119 }; 117 };
120 118
121 let pool = ThreadPool::default();
122 let (task_sender, task_receiver) = unbounded::<Task>();
123
124 log::info!("server initialized, serving requests"); 119 log::info!("server initialized, serving requests");
125 { 120 {
126 let task_sender = task_sender;
127 loop { 121 loop {
128 log::trace!("selecting"); 122 log::trace!("selecting");
129 let event = select! { 123 let event = select! {
@@ -131,7 +125,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
131 Ok(msg) => Event::Lsp(msg), 125 Ok(msg) => Event::Lsp(msg),
132 Err(RecvError) => return Err("client exited without shutdown".into()), 126 Err(RecvError) => return Err("client exited without shutdown".into()),
133 }, 127 },
134 recv(task_receiver) -> task => Event::Task(task.unwrap()), 128 recv(&global_state.task_pool.1) -> task => Event::Task(task.unwrap()),
135 recv(global_state.task_receiver) -> task => match task { 129 recv(global_state.task_receiver) -> task => match task {
136 Ok(task) => Event::Vfs(task), 130 Ok(task) => Event::Vfs(task),
137 Err(RecvError) => return Err("vfs died".into()), 131 Err(RecvError) => return Err("vfs died".into()),
@@ -147,29 +141,19 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
147 }; 141 };
148 } 142 }
149 assert!(!global_state.vfs.read().0.has_changes()); 143 assert!(!global_state.vfs.read().0.has_changes());
150 loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?; 144 loop_turn(&connection, &mut global_state, event)?;
151 assert!(!global_state.vfs.read().0.has_changes()); 145 assert!(!global_state.vfs.read().0.has_changes());
152 } 146 }
153 } 147 }
154 global_state.analysis_host.request_cancellation(); 148 global_state.analysis_host.request_cancellation();
155 log::info!("waiting for tasks to finish...");
156 task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
157 log::info!("...tasks have finished");
158 log::info!("joining threadpool...");
159 pool.join();
160 drop(pool);
161 log::info!("...threadpool has finished");
162
163 let vfs = Arc::try_unwrap(global_state.vfs).expect("all snapshots should be dead");
164 drop(vfs);
165
166 Ok(()) 149 Ok(())
167} 150}
168 151
169#[derive(Debug)] 152#[derive(Debug)]
170enum Task { 153pub(crate) enum Task {
171 Respond(Response), 154 Respond(Response),
172 Diagnostic(DiagnosticTask), 155 Diagnostics(Vec<DiagnosticTask>),
156 Unit,
173} 157}
174 158
175enum Event { 159enum Event {
@@ -215,19 +199,13 @@ pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
215pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>; 199pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
216const DO_NOTHING: ReqHandler = |_, _| (); 200const DO_NOTHING: ReqHandler = |_, _| ();
217 201
218fn loop_turn( 202fn loop_turn(connection: &Connection, global_state: &mut GlobalState, event: Event) -> Result<()> {
219 pool: &ThreadPool,
220 task_sender: &Sender<Task>,
221 connection: &Connection,
222 global_state: &mut GlobalState,
223 event: Event,
224) -> Result<()> {
225 let loop_start = Instant::now(); 203 let loop_start = Instant::now();
226 204
227 // NOTE: don't count blocking select! call as a loop-turn time 205 // NOTE: don't count blocking select! call as a loop-turn time
228 let _p = profile("main_loop_inner/loop-turn"); 206 let _p = profile("main_loop_inner/loop-turn");
229 log::info!("loop turn = {:?}", event); 207 log::info!("loop turn = {:?}", event);
230 let queue_count = pool.queued_count(); 208 let queue_count = global_state.task_pool.0.len();
231 if queue_count > 0 { 209 if queue_count > 0 {
232 log::info!("queued count = {}", queue_count); 210 log::info!("queued count = {}", queue_count);
233 } 211 }
@@ -269,12 +247,10 @@ fn loop_turn(
269 ) 247 )
270 } 248 }
271 }, 249 },
272 Event::Flycheck(task) => { 250 Event::Flycheck(task) => on_check_task(task, global_state, &connection.sender)?,
273 on_check_task(task, global_state, task_sender, &connection.sender)?
274 }
275 Event::Lsp(msg) => match msg { 251 Event::Lsp(msg) => match msg {
276 lsp_server::Message::Request(req) => { 252 lsp_server::Message::Request(req) => {
277 on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)? 253 on_request(global_state, &connection.sender, loop_start, req)?
278 } 254 }
279 lsp_server::Message::Notification(not) => { 255 lsp_server::Message::Notification(not) => {
280 on_notification(&connection.sender, global_state, not)?; 256 on_notification(&connection.sender, global_state, not)?;
@@ -301,16 +277,14 @@ fn loop_turn(
301 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) 277 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
302 .collect::<Vec<_>>(); 278 .collect::<Vec<_>>();
303 279
304 update_file_notifications_on_threadpool( 280 update_file_notifications_on_threadpool(global_state, subscriptions.clone());
305 pool, 281 global_state.task_pool.0.spawn({
306 global_state.snapshot(),
307 task_sender.clone(),
308 subscriptions.clone(),
309 );
310 pool.execute({
311 let subs = subscriptions; 282 let subs = subscriptions;
312 let snap = global_state.snapshot(); 283 let snap = global_state.snapshot();
313 move || snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ()) 284 move || {
285 snap.analysis.prime_caches(subs).unwrap_or_else(|_: Canceled| ());
286 Task::Unit
287 }
314 }); 288 });
315 } 289 }
316 290
@@ -345,26 +319,21 @@ fn on_task(task: Task, msg_sender: &Sender<lsp_server::Message>, global_state: &
345 msg_sender.send(response.into()).unwrap(); 319 msg_sender.send(response.into()).unwrap();
346 } 320 }
347 } 321 }
348 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state), 322 Task::Diagnostics(tasks) => {
323 tasks.into_iter().for_each(|task| on_diagnostic_task(task, msg_sender, global_state))
324 }
325 Task::Unit => (),
349 } 326 }
350} 327}
351 328
352fn on_request( 329fn on_request(
353 global_state: &mut GlobalState, 330 global_state: &mut GlobalState,
354 pool: &ThreadPool,
355 task_sender: &Sender<Task>,
356 msg_sender: &Sender<lsp_server::Message>, 331 msg_sender: &Sender<lsp_server::Message>,
357 request_received: Instant, 332 request_received: Instant,
358 req: Request, 333 req: Request,
359) -> Result<()> { 334) -> Result<()> {
360 let mut pool_dispatcher = PoolDispatcher { 335 let mut pool_dispatcher =
361 req: Some(req), 336 PoolDispatcher { req: Some(req), global_state, msg_sender, request_received };
362 pool,
363 global_state,
364 task_sender,
365 msg_sender,
366 request_received,
367 };
368 pool_dispatcher 337 pool_dispatcher
369 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))? 338 .on_sync::<lsp_ext::CollectGarbage>(|s, ()| Ok(s.collect_garbage()))?
370 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))? 339 .on_sync::<lsp_ext::JoinLines>(|s, p| handlers::handle_join_lines(s.snapshot(), p))?
@@ -552,12 +521,11 @@ fn on_notification(
552fn on_check_task( 521fn on_check_task(
553 task: flycheck::Message, 522 task: flycheck::Message,
554 global_state: &mut GlobalState, 523 global_state: &mut GlobalState,
555 task_sender: &Sender<Task>,
556 msg_sender: &Sender<lsp_server::Message>, 524 msg_sender: &Sender<lsp_server::Message>,
557) -> Result<()> { 525) -> Result<()> {
558 match task { 526 match task {
559 flycheck::Message::ClearDiagnostics => { 527 flycheck::Message::ClearDiagnostics => {
560 task_sender.send(Task::Diagnostic(DiagnosticTask::ClearCheck))?; 528 on_diagnostic_task(DiagnosticTask::ClearCheck, msg_sender, global_state)
561 } 529 }
562 530
563 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => { 531 flycheck::Message::AddDiagnostic { workspace_root, diagnostic } => {
@@ -576,11 +544,15 @@ fn on_check_task(
576 } 544 }
577 }; 545 };
578 546
579 task_sender.send(Task::Diagnostic(DiagnosticTask::AddCheck( 547 on_diagnostic_task(
580 file_id, 548 DiagnosticTask::AddCheck(
581 diag.diagnostic, 549 file_id,
582 diag.fixes.into_iter().map(|it| it.into()).collect(), 550 diag.diagnostic,
583 )))?; 551 diag.fixes.into_iter().map(|it| it.into()).collect(),
552 ),
553 msg_sender,
554 global_state,
555 )
584 } 556 }
585 } 557 }
586 558
@@ -674,10 +646,8 @@ fn report_progress(
674 646
675struct PoolDispatcher<'a> { 647struct PoolDispatcher<'a> {
676 req: Option<Request>, 648 req: Option<Request>,
677 pool: &'a ThreadPool,
678 global_state: &'a mut GlobalState, 649 global_state: &'a mut GlobalState,
679 msg_sender: &'a Sender<lsp_server::Message>, 650 msg_sender: &'a Sender<lsp_server::Message>,
680 task_sender: &'a Sender<Task>,
681 request_received: Instant, 651 request_received: Instant,
682} 652}
683 653
@@ -725,13 +695,11 @@ impl<'a> PoolDispatcher<'a> {
725 } 695 }
726 }; 696 };
727 697
728 self.pool.execute({ 698 self.global_state.task_pool.0.spawn({
729 let world = self.global_state.snapshot(); 699 let world = self.global_state.snapshot();
730 let sender = self.task_sender.clone();
731 move || { 700 move || {
732 let result = f(world, params); 701 let result = f(world, params);
733 let task = result_to_task::<R>(id, result); 702 result_to_task::<R>(id, result)
734 sender.send(task).unwrap();
735 } 703 }
736 }); 704 });
737 705
@@ -801,26 +769,27 @@ where
801} 769}
802 770
803fn update_file_notifications_on_threadpool( 771fn update_file_notifications_on_threadpool(
804 pool: &ThreadPool, 772 global_state: &mut GlobalState,
805 world: GlobalStateSnapshot,
806 task_sender: Sender<Task>,
807 subscriptions: Vec<FileId>, 773 subscriptions: Vec<FileId>,
808) { 774) {
809 log::trace!("updating notifications for {:?}", subscriptions); 775 log::trace!("updating notifications for {:?}", subscriptions);
810 if world.config.publish_diagnostics { 776 if global_state.config.publish_diagnostics {
811 pool.execute(move || { 777 let snapshot = global_state.snapshot();
812 for file_id in subscriptions { 778 global_state.task_pool.0.spawn(move || {
813 match handlers::publish_diagnostics(&world, file_id) { 779 let diagnostics = subscriptions
814 Err(e) => { 780 .into_iter()
815 if !is_canceled(&*e) { 781 .filter_map(|file_id| {
816 log::error!("failed to compute diagnostics: {:?}", e); 782 handlers::publish_diagnostics(&snapshot, file_id)
817 } 783 .map_err(|err| {
818 } 784 if !is_canceled(&*err) {
819 Ok(task) => { 785 log::error!("failed to compute diagnostics: {:?}", err);
820 task_sender.send(Task::Diagnostic(task)).unwrap(); 786 }
821 } 787 ()
822 } 788 })
823 } 789 .ok()
790 })
791 .collect::<Vec<_>>();
792 Task::Diagnostics(diagnostics)
824 }) 793 })
825 } 794 }
826} 795}
diff --git a/crates/rust-analyzer/src/thread_pool.rs b/crates/rust-analyzer/src/thread_pool.rs
new file mode 100644
index 000000000..4fa502925
--- /dev/null
+++ b/crates/rust-analyzer/src/thread_pool.rs
@@ -0,0 +1,35 @@
1//! A thin wrapper around `ThreadPool` to make sure that we join all things
2//! properly.
3use crossbeam_channel::Sender;
4
5pub(crate) struct TaskPool<T> {
6 sender: Sender<T>,
7 inner: threadpool::ThreadPool,
8}
9
10impl<T> TaskPool<T> {
11 pub(crate) fn new(sender: Sender<T>) -> TaskPool<T> {
12 TaskPool { sender, inner: threadpool::ThreadPool::default() }
13 }
14
15 pub(crate) fn spawn<F>(&mut self, task: F)
16 where
17 F: FnOnce() -> T + Send + 'static,
18 T: Send + 'static,
19 {
20 self.inner.execute({
21 let sender = self.sender.clone();
22 move || sender.send(task()).unwrap()
23 })
24 }
25
26 pub(crate) fn len(&self) -> usize {
27 self.inner.queued_count()
28 }
29}
30
31impl<T> Drop for TaskPool<T> {
32 fn drop(&mut self) {
33 self.inner.join()
34 }
35}