aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/rust-analyzer/src/global_state.rs31
-rw-r--r--crates/rust-analyzer/src/main_loop.rs112
2 files changed, 64 insertions, 79 deletions
diff --git a/crates/rust-analyzer/src/global_state.rs b/crates/rust-analyzer/src/global_state.rs
index 9a75cb2ab..ad5f94e87 100644
--- a/crates/rust-analyzer/src/global_state.rs
+++ b/crates/rust-analyzer/src/global_state.rs
@@ -20,11 +20,12 @@ use crate::{
20 diagnostics::{CheckFixes, DiagnosticCollection}, 20 diagnostics::{CheckFixes, DiagnosticCollection},
21 from_proto, 21 from_proto,
22 line_endings::LineEndings, 22 line_endings::LineEndings,
23 main_loop::ReqQueue,
23 request_metrics::{LatestRequests, RequestMetrics}, 24 request_metrics::{LatestRequests, RequestMetrics},
24 to_proto::url_from_abs_path, 25 to_proto::url_from_abs_path,
25 Result, 26 Result,
26}; 27};
27use rustc_hash::FxHashMap; 28use rustc_hash::{FxHashMap, FxHashSet};
28 29
29fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) -> Option<Flycheck> { 30fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) -> Option<Flycheck> {
30 // FIXME: Figure out the multi-workspace situation 31 // FIXME: Figure out the multi-workspace situation
@@ -40,12 +41,23 @@ fn create_flycheck(workspaces: &[ProjectWorkspace], config: &FlycheckConfig) ->
40 }) 41 })
41} 42}
42 43
44#[derive(Eq, PartialEq)]
45pub(crate) enum Status {
46 Loading,
47 Ready,
48}
49
50impl Default for Status {
51 fn default() -> Self {
52 Status::Loading
53 }
54}
55
43/// `GlobalState` is the primary mutable state of the language server 56/// `GlobalState` is the primary mutable state of the language server
44/// 57///
45/// The most interesting components are `vfs`, which stores a consistent 58/// The most interesting components are `vfs`, which stores a consistent
46/// snapshot of the file systems, and `analysis_host`, which stores our 59/// snapshot of the file systems, and `analysis_host`, which stores our
47/// incremental salsa database. 60/// incremental salsa database.
48#[derive(Debug)]
49pub(crate) struct GlobalState { 61pub(crate) struct GlobalState {
50 pub(crate) config: Config, 62 pub(crate) config: Config,
51 pub(crate) workspaces: Arc<Vec<ProjectWorkspace>>, 63 pub(crate) workspaces: Arc<Vec<ProjectWorkspace>>,
@@ -54,10 +66,13 @@ pub(crate) struct GlobalState {
54 pub(crate) task_receiver: Receiver<vfs::loader::Message>, 66 pub(crate) task_receiver: Receiver<vfs::loader::Message>,
55 pub(crate) flycheck: Option<Flycheck>, 67 pub(crate) flycheck: Option<Flycheck>,
56 pub(crate) diagnostics: DiagnosticCollection, 68 pub(crate) diagnostics: DiagnosticCollection,
57 pub(crate) proc_macro_client: ProcMacroClient, 69 pub(crate) mem_docs: FxHashSet<VfsPath>,
58 pub(crate) vfs: Arc<RwLock<(vfs::Vfs, FxHashMap<FileId, LineEndings>)>>, 70 pub(crate) vfs: Arc<RwLock<(vfs::Vfs, FxHashMap<FileId, LineEndings>)>>,
71 pub(crate) status: Status,
72 pub(crate) req_queue: ReqQueue,
59 pub(crate) latest_requests: Arc<RwLock<LatestRequests>>, 73 pub(crate) latest_requests: Arc<RwLock<LatestRequests>>,
60 source_root_config: SourceRootConfig, 74 source_root_config: SourceRootConfig,
75 _proc_macro_client: ProcMacroClient,
61} 76}
62 77
63/// An immutable snapshot of the world's state at a point in time. 78/// An immutable snapshot of the world's state at a point in time.
@@ -75,6 +90,7 @@ impl GlobalState {
75 workspaces: Vec<ProjectWorkspace>, 90 workspaces: Vec<ProjectWorkspace>,
76 lru_capacity: Option<usize>, 91 lru_capacity: Option<usize>,
77 config: Config, 92 config: Config,
93 req_queue: ReqQueue,
78 ) -> GlobalState { 94 ) -> GlobalState {
79 let mut change = AnalysisChange::new(); 95 let mut change = AnalysisChange::new();
80 96
@@ -136,13 +152,16 @@ impl GlobalState {
136 workspaces: Arc::new(workspaces), 152 workspaces: Arc::new(workspaces),
137 analysis_host, 153 analysis_host,
138 loader, 154 loader,
139 vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))),
140 task_receiver, 155 task_receiver,
141 latest_requests: Default::default(),
142 flycheck, 156 flycheck,
143 diagnostics: Default::default(), 157 diagnostics: Default::default(),
144 proc_macro_client, 158 mem_docs: FxHashSet::default(),
159 vfs: Arc::new(RwLock::new((vfs, FxHashMap::default()))),
160 status: Status::default(),
161 req_queue,
162 latest_requests: Default::default(),
145 source_root_config: project_folders.source_root_config, 163 source_root_config: project_folders.source_root_config,
164 _proc_macro_client: proc_macro_client,
146 }; 165 };
147 res.process_changes(); 166 res.process_changes();
148 res 167 res
diff --git a/crates/rust-analyzer/src/main_loop.rs b/crates/rust-analyzer/src/main_loop.rs
index c8819c3b0..02e188b02 100644
--- a/crates/rust-analyzer/src/main_loop.rs
+++ b/crates/rust-analyzer/src/main_loop.rs
@@ -11,16 +11,13 @@ use std::{
11}; 11};
12 12
13use crossbeam_channel::{never, select, unbounded, RecvError, Sender}; 13use crossbeam_channel::{never, select, unbounded, RecvError, Sender};
14use lsp_server::{ 14use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response};
15 Connection, ErrorCode, Message, Notification, ReqQueue, Request, RequestId, Response,
16};
17use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent}; 15use lsp_types::{request::Request as _, NumberOrString, TextDocumentContentChangeEvent};
18use ra_db::VfsPath; 16use ra_db::VfsPath;
19use ra_flycheck::CheckTask; 17use ra_flycheck::CheckTask;
20use ra_ide::{Canceled, FileId, LineIndex}; 18use ra_ide::{Canceled, FileId, LineIndex};
21use ra_prof::profile; 19use ra_prof::profile;
22use ra_project_model::{PackageRoot, ProjectWorkspace}; 20use ra_project_model::{PackageRoot, ProjectWorkspace};
23use rustc_hash::FxHashSet;
24use serde::{de::DeserializeOwned, Serialize}; 21use serde::{de::DeserializeOwned, Serialize};
25use threadpool::ThreadPool; 22use threadpool::ThreadPool;
26 23
@@ -28,7 +25,7 @@ use crate::{
28 config::{Config, FilesWatcher, LinkedProject}, 25 config::{Config, FilesWatcher, LinkedProject},
29 diagnostics::DiagnosticTask, 26 diagnostics::DiagnosticTask,
30 from_proto, 27 from_proto,
31 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot}, 28 global_state::{file_id_to_url, GlobalState, GlobalStateSnapshot, Status},
32 handlers, lsp_ext, 29 handlers, lsp_ext,
33 request_metrics::RequestMetrics, 30 request_metrics::RequestMetrics,
34 Result, 31 Result,
@@ -78,7 +75,6 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
78 SetThreadPriority(thread, thread_priority_above_normal); 75 SetThreadPriority(thread, thread_priority_above_normal);
79 } 76 }
80 77
81 let mut loop_state = LoopState::default();
82 let mut global_state = { 78 let mut global_state = {
83 let workspaces = { 79 let workspaces = {
84 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found { 80 if config.linked_projects.is_empty() && config.notifications.cargo_toml_not_found {
@@ -116,6 +112,8 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
116 .collect::<Vec<_>>() 112 .collect::<Vec<_>>()
117 }; 113 };
118 114
115 let mut req_queue = ReqQueue::default();
116
119 if let FilesWatcher::Client = config.files.watcher { 117 if let FilesWatcher::Client = config.files.watcher {
120 let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions { 118 let registration_options = lsp_types::DidChangeWatchedFilesRegistrationOptions {
121 watchers: workspaces 119 watchers: workspaces
@@ -132,7 +130,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
132 register_options: Some(serde_json::to_value(registration_options).unwrap()), 130 register_options: Some(serde_json::to_value(registration_options).unwrap()),
133 }; 131 };
134 let params = lsp_types::RegistrationParams { registrations: vec![registration] }; 132 let params = lsp_types::RegistrationParams { registrations: vec![registration] };
135 let request = loop_state.req_queue.outgoing.register( 133 let request = req_queue.outgoing.register(
136 lsp_types::request::RegisterCapability::METHOD.to_string(), 134 lsp_types::request::RegisterCapability::METHOD.to_string(),
137 params, 135 params,
138 DO_NOTHING, 136 DO_NOTHING,
@@ -140,7 +138,7 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
140 connection.sender.send(request.into()).unwrap(); 138 connection.sender.send(request.into()).unwrap();
141 } 139 }
142 140
143 GlobalState::new(workspaces, config.lru_capacity, config) 141 GlobalState::new(workspaces, config.lru_capacity, config, req_queue)
144 }; 142 };
145 143
146 let pool = ThreadPool::default(); 144 let pool = ThreadPool::default();
@@ -172,15 +170,13 @@ pub fn main_loop(config: Config, connection: Connection) -> Result<()> {
172 }; 170 };
173 } 171 }
174 assert!(!global_state.vfs.read().0.has_changes()); 172 assert!(!global_state.vfs.read().0.has_changes());
175 loop_turn(&pool, &task_sender, &connection, &mut global_state, &mut loop_state, event)?; 173 loop_turn(&pool, &task_sender, &connection, &mut global_state, event)?;
176 assert!(!global_state.vfs.read().0.has_changes()); 174 assert!(!global_state.vfs.read().0.has_changes());
177 } 175 }
178 } 176 }
179 global_state.analysis_host.request_cancellation(); 177 global_state.analysis_host.request_cancellation();
180 log::info!("waiting for tasks to finish..."); 178 log::info!("waiting for tasks to finish...");
181 task_receiver.into_iter().for_each(|task| { 179 task_receiver.into_iter().for_each(|task| on_task(task, &connection.sender, &mut global_state));
182 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, &mut global_state)
183 });
184 log::info!("...tasks have finished"); 180 log::info!("...tasks have finished");
185 log::info!("joining threadpool..."); 181 log::info!("joining threadpool...");
186 pool.join(); 182 pool.join();
@@ -244,35 +240,15 @@ impl fmt::Debug for Event {
244 } 240 }
245} 241}
246 242
247type ReqHandler = fn(&mut GlobalState, Response); 243pub(crate) type ReqHandler = fn(&mut GlobalState, Response);
244pub(crate) type ReqQueue = lsp_server::ReqQueue<(&'static str, Instant), ReqHandler>;
248const DO_NOTHING: ReqHandler = |_, _| (); 245const DO_NOTHING: ReqHandler = |_, _| ();
249type Incoming = lsp_server::Incoming<(&'static str, Instant)>;
250
251#[derive(Default)]
252struct LoopState {
253 req_queue: ReqQueue<(&'static str, Instant), ReqHandler>,
254 mem_docs: FxHashSet<VfsPath>,
255 status: Status,
256}
257
258#[derive(Eq, PartialEq)]
259enum Status {
260 Loading,
261 Ready,
262}
263
264impl Default for Status {
265 fn default() -> Self {
266 Status::Loading
267 }
268}
269 246
270fn loop_turn( 247fn loop_turn(
271 pool: &ThreadPool, 248 pool: &ThreadPool,
272 task_sender: &Sender<Task>, 249 task_sender: &Sender<Task>,
273 connection: &Connection, 250 connection: &Connection,
274 global_state: &mut GlobalState, 251 global_state: &mut GlobalState,
275 loop_state: &mut LoopState,
276 event: Event, 252 event: Event,
277) -> Result<()> { 253) -> Result<()> {
278 let loop_start = Instant::now(); 254 let loop_start = Instant::now();
@@ -288,7 +264,7 @@ fn loop_turn(
288 let mut became_ready = false; 264 let mut became_ready = false;
289 match event { 265 match event {
290 Event::Task(task) => { 266 Event::Task(task) => {
291 on_task(task, &connection.sender, &mut loop_state.req_queue.incoming, global_state); 267 on_task(task, &connection.sender, global_state);
292 global_state.maybe_collect_garbage(); 268 global_state.maybe_collect_garbage();
293 } 269 }
294 Event::Vfs(task) => match task { 270 Event::Vfs(task) => match task {
@@ -296,35 +272,29 @@ fn loop_turn(
296 let vfs = &mut global_state.vfs.write().0; 272 let vfs = &mut global_state.vfs.write().0;
297 for (path, contents) in files { 273 for (path, contents) in files {
298 let path = VfsPath::from(path); 274 let path = VfsPath::from(path);
299 if !loop_state.mem_docs.contains(&path) { 275 if !global_state.mem_docs.contains(&path) {
300 vfs.set_file_contents(path, contents) 276 vfs.set_file_contents(path, contents)
301 } 277 }
302 } 278 }
303 } 279 }
304 vfs::loader::Message::Progress { n_total, n_done } => { 280 vfs::loader::Message::Progress { n_total, n_done } => {
305 if n_done == n_total { 281 if n_done == n_total {
306 loop_state.status = Status::Ready; 282 global_state.status = Status::Ready;
307 became_ready = true; 283 became_ready = true;
308 } 284 }
309 report_progress(loop_state, &connection.sender, n_done, n_total, "roots scanned") 285 report_progress(global_state, &connection.sender, n_done, n_total, "roots scanned")
310 } 286 }
311 }, 287 },
312 Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?, 288 Event::CheckWatcher(task) => on_check_task(task, global_state, task_sender)?,
313 Event::Msg(msg) => match msg { 289 Event::Msg(msg) => match msg {
314 Message::Request(req) => on_request( 290 Message::Request(req) => {
315 global_state, 291 on_request(global_state, pool, task_sender, &connection.sender, loop_start, req)?
316 &mut loop_state.req_queue.incoming, 292 }
317 pool,
318 task_sender,
319 &connection.sender,
320 loop_start,
321 req,
322 )?,
323 Message::Notification(not) => { 293 Message::Notification(not) => {
324 on_notification(&connection.sender, global_state, loop_state, not)?; 294 on_notification(&connection.sender, global_state, not)?;
325 } 295 }
326 Message::Response(resp) => { 296 Message::Response(resp) => {
327 let handler = loop_state.req_queue.outgoing.complete(resp.id.clone()); 297 let handler = global_state.req_queue.outgoing.complete(resp.id.clone());
328 handler(global_state, resp) 298 handler(global_state, resp)
329 } 299 }
330 }, 300 },
@@ -338,8 +308,8 @@ fn loop_turn(
338 } 308 }
339 } 309 }
340 310
341 if loop_state.status == Status::Ready && (state_changed || became_ready) { 311 if global_state.status == Status::Ready && (state_changed || became_ready) {
342 let subscriptions = loop_state 312 let subscriptions = global_state
343 .mem_docs 313 .mem_docs
344 .iter() 314 .iter()
345 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap()) 315 .map(|path| global_state.vfs.read().0.file_id(&path).unwrap())
@@ -373,18 +343,15 @@ fn loop_turn(
373 Ok(()) 343 Ok(())
374} 344}
375 345
376fn on_task( 346fn on_task(task: Task, msg_sender: &Sender<Message>, global_state: &mut GlobalState) {
377 task: Task,
378 msg_sender: &Sender<Message>,
379 incoming_requests: &mut Incoming,
380 state: &mut GlobalState,
381) {
382 match task { 347 match task {
383 Task::Respond(response) => { 348 Task::Respond(response) => {
384 if let Some((method, start)) = incoming_requests.complete(response.id.clone()) { 349 if let Some((method, start)) =
350 global_state.req_queue.incoming.complete(response.id.clone())
351 {
385 let duration = start.elapsed(); 352 let duration = start.elapsed();
386 log::info!("handled req#{} in {:?}", response.id, duration); 353 log::info!("handled req#{} in {:?}", response.id, duration);
387 state.complete_request(RequestMetrics { 354 global_state.complete_request(RequestMetrics {
388 id: response.id.clone(), 355 id: response.id.clone(),
389 method: method.to_string(), 356 method: method.to_string(),
390 duration, 357 duration,
@@ -395,13 +362,12 @@ fn on_task(
395 Task::Notify(n) => { 362 Task::Notify(n) => {
396 msg_sender.send(n.into()).unwrap(); 363 msg_sender.send(n.into()).unwrap();
397 } 364 }
398 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, state), 365 Task::Diagnostic(task) => on_diagnostic_task(task, msg_sender, global_state),
399 } 366 }
400} 367}
401 368
402fn on_request( 369fn on_request(
403 global_state: &mut GlobalState, 370 global_state: &mut GlobalState,
404 incoming_requests: &mut Incoming,
405 pool: &ThreadPool, 371 pool: &ThreadPool,
406 task_sender: &Sender<Task>, 372 task_sender: &Sender<Task>,
407 msg_sender: &Sender<Message>, 373 msg_sender: &Sender<Message>,
@@ -414,7 +380,6 @@ fn on_request(
414 global_state, 380 global_state,
415 task_sender, 381 task_sender,
416 msg_sender, 382 msg_sender,
417 incoming_requests,
418 request_received, 383 request_received,
419 }; 384 };
420 pool_dispatcher 385 pool_dispatcher
@@ -469,7 +434,6 @@ fn on_request(
469fn on_notification( 434fn on_notification(
470 msg_sender: &Sender<Message>, 435 msg_sender: &Sender<Message>,
471 global_state: &mut GlobalState, 436 global_state: &mut GlobalState,
472 loop_state: &mut LoopState,
473 not: Notification, 437 not: Notification,
474) -> Result<()> { 438) -> Result<()> {
475 let not = match notification_cast::<lsp_types::notification::Cancel>(not) { 439 let not = match notification_cast::<lsp_types::notification::Cancel>(not) {
@@ -478,7 +442,7 @@ fn on_notification(
478 NumberOrString::Number(id) => id.into(), 442 NumberOrString::Number(id) => id.into(),
479 NumberOrString::String(id) => id.into(), 443 NumberOrString::String(id) => id.into(),
480 }; 444 };
481 if let Some(response) = loop_state.req_queue.incoming.cancel(id) { 445 if let Some(response) = global_state.req_queue.incoming.cancel(id) {
482 msg_sender.send(response.into()).unwrap() 446 msg_sender.send(response.into()).unwrap()
483 } 447 }
484 return Ok(()); 448 return Ok(());
@@ -488,7 +452,7 @@ fn on_notification(
488 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) { 452 let not = match notification_cast::<lsp_types::notification::DidOpenTextDocument>(not) {
489 Ok(params) => { 453 Ok(params) => {
490 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 454 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
491 if !loop_state.mem_docs.insert(path.clone()) { 455 if !global_state.mem_docs.insert(path.clone()) {
492 log::error!("duplicate DidOpenTextDocument: {}", path) 456 log::error!("duplicate DidOpenTextDocument: {}", path)
493 } 457 }
494 global_state 458 global_state
@@ -504,7 +468,7 @@ fn on_notification(
504 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) { 468 let not = match notification_cast::<lsp_types::notification::DidChangeTextDocument>(not) {
505 Ok(params) => { 469 Ok(params) => {
506 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 470 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
507 assert!(loop_state.mem_docs.contains(&path)); 471 assert!(global_state.mem_docs.contains(&path));
508 let vfs = &mut global_state.vfs.write().0; 472 let vfs = &mut global_state.vfs.write().0;
509 let file_id = vfs.file_id(&path).unwrap(); 473 let file_id = vfs.file_id(&path).unwrap();
510 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap(); 474 let mut text = String::from_utf8(vfs.file_contents(file_id).to_vec()).unwrap();
@@ -518,7 +482,7 @@ fn on_notification(
518 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) { 482 let not = match notification_cast::<lsp_types::notification::DidCloseTextDocument>(not) {
519 Ok(params) => { 483 Ok(params) => {
520 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) { 484 if let Ok(path) = from_proto::vfs_path(&params.text_document.uri) {
521 if !loop_state.mem_docs.remove(&path) { 485 if !global_state.mem_docs.remove(&path) {
522 log::error!("orphan DidCloseTextDocument: {}", path) 486 log::error!("orphan DidCloseTextDocument: {}", path)
523 } 487 }
524 if let Some(path) = path.as_path() { 488 if let Some(path) = path.as_path() {
@@ -549,7 +513,7 @@ fn on_notification(
549 Ok(_) => { 513 Ok(_) => {
550 // As stated in https://github.com/microsoft/language-server-protocol/issues/676, 514 // As stated in https://github.com/microsoft/language-server-protocol/issues/676,
551 // this notification's parameters should be ignored and the actual config queried separately. 515 // this notification's parameters should be ignored and the actual config queried separately.
552 let request = loop_state.req_queue.outgoing.register( 516 let request = global_state.req_queue.outgoing.register(
553 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(), 517 lsp_types::request::WorkspaceConfiguration::METHOD.to_string(),
554 lsp_types::ConfigurationParams { 518 lsp_types::ConfigurationParams {
555 items: vec![lsp_types::ConfigurationItem { 519 items: vec![lsp_types::ConfigurationItem {
@@ -732,7 +696,7 @@ fn on_diagnostic_task(task: DiagnosticTask, msg_sender: &Sender<Message>, state:
732} 696}
733 697
734fn report_progress( 698fn report_progress(
735 loop_state: &mut LoopState, 699 global_state: &mut GlobalState,
736 sender: &Sender<Message>, 700 sender: &Sender<Message>,
737 done: usize, 701 done: usize,
738 total: usize, 702 total: usize,
@@ -742,7 +706,7 @@ fn report_progress(
742 let message = Some(format!("{}/{} {}", done, total, message)); 706 let message = Some(format!("{}/{} {}", done, total, message));
743 let percentage = Some(100.0 * done as f64 / total.max(1) as f64); 707 let percentage = Some(100.0 * done as f64 / total.max(1) as f64);
744 let work_done_progress = if done == 0 { 708 let work_done_progress = if done == 0 {
745 let work_done_progress_create = loop_state.req_queue.outgoing.register( 709 let work_done_progress_create = global_state.req_queue.outgoing.register(
746 lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(), 710 lsp_types::request::WorkDoneProgressCreate::METHOD.to_string(),
747 lsp_types::WorkDoneProgressCreateParams { token: token.clone() }, 711 lsp_types::WorkDoneProgressCreateParams { token: token.clone() },
748 DO_NOTHING, 712 DO_NOTHING,
@@ -777,7 +741,6 @@ struct PoolDispatcher<'a> {
777 req: Option<Request>, 741 req: Option<Request>,
778 pool: &'a ThreadPool, 742 pool: &'a ThreadPool,
779 global_state: &'a mut GlobalState, 743 global_state: &'a mut GlobalState,
780 incoming_requests: &'a mut Incoming,
781 msg_sender: &'a Sender<Message>, 744 msg_sender: &'a Sender<Message>,
782 task_sender: &'a Sender<Task>, 745 task_sender: &'a Sender<Task>,
783 request_received: Instant, 746 request_received: Instant,
@@ -806,7 +769,7 @@ impl<'a> PoolDispatcher<'a> {
806 result_to_task::<R>(id, result) 769 result_to_task::<R>(id, result)
807 }) 770 })
808 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?; 771 .map_err(|_| format!("sync task {:?} panicked", R::METHOD))?;
809 on_task(task, self.msg_sender, self.incoming_requests, self.global_state); 772 on_task(task, self.msg_sender, self.global_state);
810 Ok(self) 773 Ok(self)
811 } 774 }
812 775
@@ -853,7 +816,10 @@ impl<'a> PoolDispatcher<'a> {
853 return None; 816 return None;
854 } 817 }
855 }; 818 };
856 self.incoming_requests.register(id.clone(), (R::METHOD, self.request_received)); 819 self.global_state
820 .req_queue
821 .incoming
822 .register(id.clone(), (R::METHOD, self.request_received));
857 Some((id, params)) 823 Some((id, params))
858 } 824 }
859 825