aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/main_loop
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-09-01 15:40:45 +0100
committerAleksey Kladov <[email protected]>2018-09-01 15:40:45 +0100
commit8f1ce8275347e915b1cc824567e96369875cefd4 (patch)
tree1d7b56d7947a5c6a20a6547b5342e3363e6c0e0f /crates/server/src/main_loop
parent3588d6b2da6e63730cc560c9986ba7fda5de816e (diff)
move to gen-server impl
Diffstat (limited to 'crates/server/src/main_loop')
-rw-r--r--crates/server/src/main_loop/mod.rs281
1 files changed, 163 insertions, 118 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index db7d5ae34..34d077805 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -6,59 +6,97 @@ use std::{
6}; 6};
7 7
8use threadpool::ThreadPool; 8use threadpool::ThreadPool;
9use crossbeam_channel::{Sender, Receiver}; 9use serde::{Serialize, de::DeserializeOwned};
10use crossbeam_channel::{bounded, Sender, Receiver};
10use languageserver_types::{NumberOrString}; 11use languageserver_types::{NumberOrString};
11use libanalysis::{FileId, JobHandle, JobToken}; 12use libanalysis::{FileId, JobHandle, JobToken};
13use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode};
12 14
13use { 15use {
14 req, dispatch, 16 req,
15 Task, Result, 17 Result,
16 io::{Io, RawMsg, RawRequest, RawNotification}, 18 vfs::{self, FileEvent},
17 vfs::FileEvent,
18 server_world::{ServerWorldState, ServerWorld}, 19 server_world::{ServerWorldState, ServerWorld},
19 main_loop::subscriptions::{Subscriptions}, 20 main_loop::subscriptions::{Subscriptions},
20}; 21};
21 22
23enum Task {
24 Respond(RawResponse),
25 Notify(RawNotification),
26}
27
22pub(super) fn main_loop( 28pub(super) fn main_loop(
23 io: &mut Io, 29 receriver: &mut Receiver<RawMessage>,
24 pool: &mut ThreadPool, 30 sender: &mut Sender<RawMessage>,
25 task_sender: Sender<Task>,
26 task_receiver: Receiver<Task>,
27 fs_events_receiver: Receiver<Vec<FileEvent>>,
28) -> Result<()> { 31) -> Result<()> {
32 let pool = ThreadPool::new(4);
33 let (task_sender, task_receiver) = bounded::<Task>(16);
34 let (fs_events_receiver, watcher) = vfs::watch(vec![
35 ::std::env::current_dir()?,
36 ]);
37
29 info!("server initialized, serving requests"); 38 info!("server initialized, serving requests");
30 let mut state = ServerWorldState::new(); 39 let mut state = ServerWorldState::new();
31 40
32 let mut pending_requests: HashMap<u64, JobHandle> = HashMap::new(); 41 let mut pending_requests = HashMap::new();
33 let mut fs_events_receiver = Some(&fs_events_receiver);
34 let mut subs = Subscriptions::new(); 42 let mut subs = Subscriptions::new();
43 main_loop_inner(
44 &pool,
45 receriver,
46 sender,
47 task_receiver.clone(),
48 task_sender,
49 fs_events_receiver,
50 &mut state,
51 &mut pending_requests,
52 &mut subs,
53 )?;
54
55 info!("waiting for background jobs to finish...");
56 task_receiver.for_each(drop);
57 pool.join();
58 info!("...background jobs have finished");
59
60 info!("waiting for file watcher to finish...");
61 watcher.stop()?;
62 info!("...file watcher has finished");
63 Ok(())
64}
65
66fn main_loop_inner(
67 pool: &ThreadPool,
68 msg_receiver: &mut Receiver<RawMessage>,
69 msg_sender: &mut Sender<RawMessage>,
70 task_receiver: Receiver<Task>,
71 task_sender: Sender<Task>,
72 fs_receiver: Receiver<Vec<FileEvent>>,
73 state: &mut ServerWorldState,
74 pending_requests: &mut HashMap<u64, JobHandle>,
75 subs: &mut Subscriptions,
76) -> Result<u64> {
77 let mut fs_receiver = Some(fs_receiver);
35 loop { 78 loop {
36 enum Event { 79 enum Event {
37 Msg(RawMsg), 80 Msg(RawMessage),
38 Task(Task), 81 Task(Task),
39 Fs(Vec<FileEvent>), 82 Fs(Vec<FileEvent>),
40 ReceiverDead,
41 FsWatcherDead, 83 FsWatcherDead,
42 } 84 }
43 let event = select! { 85 let event = select! {
44 recv(io.receiver(), msg) => match msg { 86 recv(msg_receiver, msg) => match msg {
45 Some(msg) => Event::Msg(msg), 87 Some(msg) => Event::Msg(msg),
46 None => Event::ReceiverDead, 88 None => bail!("client exited without shutdown"),
47 }, 89 },
48 recv(task_receiver, task) => Event::Task(task.unwrap()), 90 recv(task_receiver, task) => Event::Task(task.unwrap()),
49 recv(fs_events_receiver, events) => match events { 91 recv(fs_receiver, events) => match events {
50 Some(events) => Event::Fs(events), 92 Some(events) => Event::Fs(events),
51 None => Event::FsWatcherDead, 93 None => Event::FsWatcherDead,
52 } 94 }
53 }; 95 };
54 let mut state_changed = false; 96 let mut state_changed = false;
55 match event { 97 match event {
56 Event::ReceiverDead => {
57 io.cleanup_receiver()?;
58 unreachable!();
59 }
60 Event::FsWatcherDead => { 98 Event::FsWatcherDead => {
61 fs_events_receiver = None; 99 fs_receiver = None;
62 } 100 }
63 Event::Task(task) => { 101 Event::Task(task) => {
64 match task { 102 match task {
@@ -66,12 +104,10 @@ pub(super) fn main_loop(
66 if let Some(handle) = pending_requests.remove(&response.id) { 104 if let Some(handle) = pending_requests.remove(&response.id) {
67 assert!(handle.has_completed()); 105 assert!(handle.has_completed());
68 } 106 }
69 io.send(RawMsg::Response(response)) 107 msg_sender.send(RawMessage::Response(response))
70 } 108 }
71 Task::Notify(n) => 109 Task::Notify(n) =>
72 io.send(RawMsg::Notification(n)), 110 msg_sender.send(RawMessage::Notification(n)),
73 Task::Die(error) =>
74 return Err(error),
75 } 111 }
76 continue; 112 continue;
77 } 113 }
@@ -82,16 +118,29 @@ pub(super) fn main_loop(
82 } 118 }
83 Event::Msg(msg) => { 119 Event::Msg(msg) => {
84 match msg { 120 match msg {
85 RawMsg::Request(req) => { 121 RawMessage::Request(req) => {
86 if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { 122 let req = match req.cast::<req::Shutdown>() {
87 return Ok(()); 123 Ok((id, _params)) => return Ok(id),
124 Err(req) => req,
125 };
126 match on_request(state, pending_requests, pool, &task_sender, req)? {
127 None => (),
128 Some(req) => {
129 error!("unknown request: {:?}", req);
130 let resp = RawResponse::err(
131 req.id,
132 ErrorCode::MethodNotFound as i32,
133 "unknown request".to_string(),
134 );
135 msg_sender.send(RawMessage::Response(resp))
136 }
88 } 137 }
89 } 138 }
90 RawMsg::Notification(not) => { 139 RawMessage::Notification(not) => {
91 on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; 140 on_notification(msg_sender, state, pending_requests, subs, not)?;
92 state_changed = true; 141 state_changed = true;
93 } 142 }
94 RawMsg::Response(resp) => { 143 RawMessage::Response(resp) => {
95 error!("unexpected response: {:?}", resp) 144 error!("unexpected response: {:?}", resp)
96 } 145 }
97 } 146 }
@@ -110,13 +159,12 @@ pub(super) fn main_loop(
110} 159}
111 160
112fn on_request( 161fn on_request(
113 io: &mut Io,
114 world: &mut ServerWorldState, 162 world: &mut ServerWorldState,
115 pending_requests: &mut HashMap<u64, JobHandle>, 163 pending_requests: &mut HashMap<u64, JobHandle>,
116 pool: &ThreadPool, 164 pool: &ThreadPool,
117 sender: &Sender<Task>, 165 sender: &Sender<Task>,
118 req: RawRequest, 166 req: RawRequest,
119) -> Result<bool> { 167) -> Result<Option<RawRequest>> {
120 let mut pool_dispatcher = PoolDispatcher { 168 let mut pool_dispatcher = PoolDispatcher {
121 req: Some(req), 169 req: Some(req),
122 res: None, 170 res: None,
@@ -141,81 +189,73 @@ fn on_request(
141 Ok((id, handle)) => { 189 Ok((id, handle)) => {
142 let inserted = pending_requests.insert(id, handle).is_none(); 190 let inserted = pending_requests.insert(id, handle).is_none();
143 assert!(inserted, "duplicate request: {}", id); 191 assert!(inserted, "duplicate request: {}", id);
192 Ok(None)
144 }, 193 },
145 Err(req) => { 194 Err(req) => Ok(Some(req)),
146 let req = dispatch::handle_request::<req::Shutdown, _>(req, |(), resp| {
147 let resp = resp.into_response(Ok(()))?;
148 io.send(RawMsg::Response(resp));
149 Ok(())
150 })?;
151 match req {
152 Ok(_id) => {
153 info!("lifecycle: initiating shutdown");
154 return Ok(false);
155 }
156 Err(req) => {
157 error!("unknown method: {:?}", req);
158 io.send(RawMsg::Response(dispatch::unknown_method(req.id)?));
159 }
160 }
161 }
162 } 195 }
163 Ok(true)
164} 196}
165 197
166fn on_notification( 198fn on_notification(
167 io: &mut Io, 199 msg_sender: &mut Sender<RawMessage>,
168 state: &mut ServerWorldState, 200 state: &mut ServerWorldState,
169 pending_requests: &mut HashMap<u64, JobHandle>, 201 pending_requests: &mut HashMap<u64, JobHandle>,
170 subs: &mut Subscriptions, 202 subs: &mut Subscriptions,
171 not: RawNotification, 203 not: RawNotification,
172) -> Result<()> { 204) -> Result<()> {
173 let mut not = Some(not); 205 let not = match not.cast::<req::Cancel>() {
174 dispatch::handle_notification::<req::Cancel, _>(&mut not, |params| { 206 Ok(params) => {
175 let id = match params.id { 207 let id = match params.id {
176 NumberOrString::Number(id) => id, 208 NumberOrString::Number(id) => id,
177 NumberOrString::String(id) => { 209 NumberOrString::String(id) => {
178 panic!("string id's not supported: {:?}", id); 210 panic!("string id's not supported: {:?}", id);
211 }
212 };
213 if let Some(handle) = pending_requests.remove(&id) {
214 handle.cancel();
179 } 215 }
180 }; 216 return Ok(())
181 if let Some(handle) = pending_requests.remove(&id) {
182 handle.cancel();
183 } 217 }
184 Ok(()) 218 Err(not) => not,
185 })?; 219 };
186 dispatch::handle_notification::<req::DidOpenTextDocument, _>(&mut not, |params| { 220 let not = match not.cast::<req::DidOpenTextDocument>() {
187 let uri = params.text_document.uri; 221 Ok(params) => {
188 let path = uri.to_file_path() 222 let uri = params.text_document.uri;
189 .map_err(|()| format_err!("invalid uri: {}", uri))?; 223 let path = uri.to_file_path()
190 let file_id = state.add_mem_file(path, params.text_document.text); 224 .map_err(|()| format_err!("invalid uri: {}", uri))?;
191 subs.add_sub(file_id); 225 let file_id = state.add_mem_file(path, params.text_document.text);
192 Ok(()) 226 subs.add_sub(file_id);
193 })?; 227 return Ok(())
194 dispatch::handle_notification::<req::DidChangeTextDocument, _>(&mut not, |mut params| { 228 }
195 let uri = params.text_document.uri; 229 Err(not) => not,
196 let path = uri.to_file_path() 230 };
197 .map_err(|()| format_err!("invalid uri: {}", uri))?; 231 let not = match not.cast::<req::DidChangeTextDocument>() {
198 let text = params.content_changes.pop() 232 Ok(mut params) => {
199 .ok_or_else(|| format_err!("empty changes"))? 233 let uri = params.text_document.uri;
200 .text; 234 let path = uri.to_file_path()
201 state.change_mem_file(path.as_path(), text)?; 235 .map_err(|()| format_err!("invalid uri: {}", uri))?;
202 Ok(()) 236 let text = params.content_changes.pop()
203 })?; 237 .ok_or_else(|| format_err!("empty changes"))?
204 dispatch::handle_notification::<req::DidCloseTextDocument, _>(&mut not, |params| { 238 .text;
205 let uri = params.text_document.uri; 239 state.change_mem_file(path.as_path(), text)?;
206 let path = uri.to_file_path() 240 return Ok(())
207 .map_err(|()| format_err!("invalid uri: {}", uri))?; 241 }
208 let file_id = state.remove_mem_file(path.as_path())?; 242 Err(not) => not,
209 subs.remove_sub(file_id); 243 };
210 let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; 244 let not = match not.cast::<req::DidCloseTextDocument>() {
211 let not = dispatch::send_notification::<req::PublishDiagnostics>(not); 245 Ok(params) => {
212 io.send(RawMsg::Notification(not)); 246 let uri = params.text_document.uri;
213 Ok(()) 247 let path = uri.to_file_path()
214 })?; 248 .map_err(|()| format_err!("invalid uri: {}", uri))?;
215 249 let file_id = state.remove_mem_file(path.as_path())?;
216 if let Some(not) = not { 250 subs.remove_sub(file_id);
217 error!("unhandled notification: {:?}", not); 251 let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() };
218 } 252 let not = RawNotification::new::<req::PublishDiagnostics>(params);
253 msg_sender.send(RawMessage::Notification(not));
254 return Ok(())
255 }
256 Err(not) => not,
257 };
258 error!("unhandled notification: {:?}", not);
219 Ok(()) 259 Ok(())
220} 260}
221 261
@@ -228,10 +268,14 @@ struct PoolDispatcher<'a> {
228} 268}
229 269
230impl<'a> PoolDispatcher<'a> { 270impl<'a> PoolDispatcher<'a> {
231 fn on<'b, R: req::ClientRequest>( 271 fn on<'b, R>(
232 &'b mut self, 272 &'b mut self,
233 f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> 273 f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result>
234 ) -> Result<&'b mut Self> { 274 ) -> Result<&'b mut Self>
275 where R: req::Request,
276 R::Params: DeserializeOwned + Send + 'static,
277 R::Result: Serialize + 'static,
278 {
235 let req = match self.req.take() { 279 let req = match self.req.take() {
236 None => return Ok(self), 280 None => return Ok(self),
237 Some(req) => req, 281 Some(req) => req,
@@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> {
239 let world = self.world; 283 let world = self.world;
240 let sender = self.sender; 284 let sender = self.sender;
241 let pool = self.pool; 285 let pool = self.pool;
242 let (handle, token) = JobHandle::new(); 286 match req.cast::<R>() {
243 let req = dispatch::handle_request::<R, _>(req, |params, resp| { 287 Ok((id, params)) => {
244 let world = world.snapshot(); 288 let (handle, token) = JobHandle::new();
245 let sender = sender.clone(); 289 let world = world.snapshot();
246 pool.execute(move || { 290 let sender = sender.clone();
247 let res = f(world, params, token); 291 pool.execute(move || {
248 let task = match resp.into_response(res) { 292 let resp = match f(world, params, token) {
249 Ok(resp) => Task::Respond(resp), 293 Ok(resp) => RawResponse::ok(id, resp),
250 Err(e) => Task::Die(e), 294 Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()),
251 }; 295 };
252 sender.send(task); 296 let task = Task::Respond(resp);
253 }); 297 sender.send(task);
254 Ok(()) 298 });
255 })?; 299 self.res = Some((id, handle));
256 match req { 300 }
257 Ok(id) => self.res = Some((id, handle)), 301 Err(req) => {
258 Err(req) => self.req = Some(req), 302 self.req = Some(req)
303 }
259 } 304 }
260 Ok(self) 305 Ok(self)
261 } 306 }
@@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool(
282 error!("failed to compute diagnostics: {:?}", e) 327 error!("failed to compute diagnostics: {:?}", e)
283 } 328 }
284 Ok(params) => { 329 Ok(params) => {
285 let not = dispatch::send_notification::<req::PublishDiagnostics>(params); 330 let not = RawNotification::new::<req::PublishDiagnostics>(params);
286 sender.send(Task::Notify(not)); 331 sender.send(Task::Notify(not));
287 } 332 }
288 } 333 }
@@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool(
291 error!("failed to compute decorations: {:?}", e) 336 error!("failed to compute decorations: {:?}", e)
292 } 337 }
293 Ok(params) => { 338 Ok(params) => {
294 let not = dispatch::send_notification::<req::PublishDecorations>(params); 339 let not = RawNotification::new::<req::PublishDecorations>(params);
295 sender.send(Task::Notify(not)) 340 sender.send(Task::Notify(not))
296 } 341 }
297 } 342 }