aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/main_loop
diff options
context:
space:
mode:
Diffstat (limited to 'crates/server/src/main_loop')
-rw-r--r--crates/server/src/main_loop/mod.rs60
1 files changed, 30 insertions, 30 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index 34d077805..75d65dcbf 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -2,6 +2,7 @@ mod handlers;
2mod subscriptions; 2mod subscriptions;
3 3
4use std::{ 4use std::{
5 path::PathBuf,
5 collections::{HashMap}, 6 collections::{HashMap},
6}; 7};
7 8
@@ -26,14 +27,13 @@ enum Task {
26} 27}
27 28
28pub(super) fn main_loop( 29pub(super) fn main_loop(
29 receriver: &mut Receiver<RawMessage>, 30 root: PathBuf,
30 sender: &mut Sender<RawMessage>, 31 msg_receriver: &mut Receiver<RawMessage>,
32 msg_sender: &mut Sender<RawMessage>,
31) -> Result<()> { 33) -> Result<()> {
32 let pool = ThreadPool::new(4); 34 let pool = ThreadPool::new(4);
33 let (task_sender, task_receiver) = bounded::<Task>(16); 35 let (task_sender, task_receiver) = bounded::<Task>(16);
34 let (fs_events_receiver, watcher) = vfs::watch(vec![ 36 let (fs_events_receiver, watcher) = vfs::watch(vec![root]);
35 ::std::env::current_dir()?,
36 ]);
37 37
38 info!("server initialized, serving requests"); 38 info!("server initialized, serving requests");
39 let mut state = ServerWorldState::new(); 39 let mut state = ServerWorldState::new();
@@ -42,8 +42,8 @@ pub(super) fn main_loop(
42 let mut subs = Subscriptions::new(); 42 let mut subs = Subscriptions::new();
43 main_loop_inner( 43 main_loop_inner(
44 &pool, 44 &pool,
45 receriver, 45 msg_receriver,
46 sender, 46 msg_sender,
47 task_receiver.clone(), 47 task_receiver.clone(),
48 task_sender, 48 task_sender,
49 fs_events_receiver, 49 fs_events_receiver,
@@ -53,7 +53,7 @@ pub(super) fn main_loop(
53 )?; 53 )?;
54 54
55 info!("waiting for background jobs to finish..."); 55 info!("waiting for background jobs to finish...");
56 task_receiver.for_each(drop); 56 task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests));
57 pool.join(); 57 pool.join();
58 info!("...background jobs have finished"); 58 info!("...background jobs have finished");
59 59
@@ -95,22 +95,8 @@ fn main_loop_inner(
95 }; 95 };
96 let mut state_changed = false; 96 let mut state_changed = false;
97 match event { 97 match event {
98 Event::FsWatcherDead => { 98 Event::FsWatcherDead => fs_receiver = None,
99 fs_receiver = None; 99 Event::Task(task) => on_task(task, msg_sender, pending_requests),
100 }
101 Event::Task(task) => {
102 match task {
103 Task::Respond(response) => {
104 if let Some(handle) = pending_requests.remove(&response.id) {
105 assert!(handle.has_completed());
106 }
107 msg_sender.send(RawMessage::Response(response))
108 }
109 Task::Notify(n) =>
110 msg_sender.send(RawMessage::Notification(n)),
111 }
112 continue;
113 }
114 Event::Fs(events) => { 100 Event::Fs(events) => {
115 trace!("fs change, {} events", events.len()); 101 trace!("fs change, {} events", events.len());
116 state.apply_fs_changes(events); 102 state.apply_fs_changes(events);
@@ -158,6 +144,23 @@ fn main_loop_inner(
158 } 144 }
159} 145}
160 146
147fn on_task(
148 task: Task,
149 msg_sender: &mut Sender<RawMessage>,
150 pending_requests: &mut HashMap<u64, JobHandle>,
151) {
152 match task {
153 Task::Respond(response) => {
154 if let Some(handle) = pending_requests.remove(&response.id) {
155 assert!(handle.has_completed());
156 }
157 msg_sender.send(RawMessage::Response(response))
158 }
159 Task::Notify(n) =>
160 msg_sender.send(RawMessage::Notification(n)),
161 }
162}
163
161fn on_request( 164fn on_request(
162 world: &mut ServerWorldState, 165 world: &mut ServerWorldState,
163 pending_requests: &mut HashMap<u64, JobHandle>, 166 pending_requests: &mut HashMap<u64, JobHandle>,
@@ -280,15 +283,12 @@ impl<'a> PoolDispatcher<'a> {
280 None => return Ok(self), 283 None => return Ok(self),
281 Some(req) => req, 284 Some(req) => req,
282 }; 285 };
283 let world = self.world;
284 let sender = self.sender;
285 let pool = self.pool;
286 match req.cast::<R>() { 286 match req.cast::<R>() {
287 Ok((id, params)) => { 287 Ok((id, params)) => {
288 let (handle, token) = JobHandle::new(); 288 let (handle, token) = JobHandle::new();
289 let world = world.snapshot(); 289 let world = self.world.snapshot();
290 let sender = sender.clone(); 290 let sender = self.sender.clone();
291 pool.execute(move || { 291 self.pool.execute(move || {
292 let resp = match f(world, params, token) { 292 let resp = match f(world, params, token) {
293 Ok(resp) => RawResponse::ok(id, resp), 293 Ok(resp) => RawResponse::ok(id, resp),
294 Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()), 294 Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()),