diff options
author | Aleksey Kladov <[email protected]> | 2018-09-01 15:40:45 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-09-01 15:40:45 +0100 |
commit | 8f1ce8275347e915b1cc824567e96369875cefd4 (patch) | |
tree | 1d7b56d7947a5c6a20a6547b5342e3363e6c0e0f /crates/server/src/main_loop | |
parent | 3588d6b2da6e63730cc560c9986ba7fda5de816e (diff) |
move to gen-server impl
Diffstat (limited to 'crates/server/src/main_loop')
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 281 |
1 files changed, 163 insertions, 118 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index db7d5ae34..34d077805 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs | |||
@@ -6,59 +6,97 @@ use std::{ | |||
6 | }; | 6 | }; |
7 | 7 | ||
8 | use threadpool::ThreadPool; | 8 | use threadpool::ThreadPool; |
9 | use crossbeam_channel::{Sender, Receiver}; | 9 | use serde::{Serialize, de::DeserializeOwned}; |
10 | use crossbeam_channel::{bounded, Sender, Receiver}; | ||
10 | use languageserver_types::{NumberOrString}; | 11 | use languageserver_types::{NumberOrString}; |
11 | use libanalysis::{FileId, JobHandle, JobToken}; | 12 | use libanalysis::{FileId, JobHandle, JobToken}; |
13 | use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode}; | ||
12 | 14 | ||
13 | use { | 15 | use { |
14 | req, dispatch, | 16 | req, |
15 | Task, Result, | 17 | Result, |
16 | io::{Io, RawMsg, RawRequest, RawNotification}, | 18 | vfs::{self, FileEvent}, |
17 | vfs::FileEvent, | ||
18 | server_world::{ServerWorldState, ServerWorld}, | 19 | server_world::{ServerWorldState, ServerWorld}, |
19 | main_loop::subscriptions::{Subscriptions}, | 20 | main_loop::subscriptions::{Subscriptions}, |
20 | }; | 21 | }; |
21 | 22 | ||
23 | enum Task { | ||
24 | Respond(RawResponse), | ||
25 | Notify(RawNotification), | ||
26 | } | ||
27 | |||
22 | pub(super) fn main_loop( | 28 | pub(super) fn main_loop( |
23 | io: &mut Io, | 29 | receriver: &mut Receiver<RawMessage>, |
24 | pool: &mut ThreadPool, | 30 | sender: &mut Sender<RawMessage>, |
25 | task_sender: Sender<Task>, | ||
26 | task_receiver: Receiver<Task>, | ||
27 | fs_events_receiver: Receiver<Vec<FileEvent>>, | ||
28 | ) -> Result<()> { | 31 | ) -> Result<()> { |
32 | let pool = ThreadPool::new(4); | ||
33 | let (task_sender, task_receiver) = bounded::<Task>(16); | ||
34 | let (fs_events_receiver, watcher) = vfs::watch(vec![ | ||
35 | ::std::env::current_dir()?, | ||
36 | ]); | ||
37 | |||
29 | info!("server initialized, serving requests"); | 38 | info!("server initialized, serving requests"); |
30 | let mut state = ServerWorldState::new(); | 39 | let mut state = ServerWorldState::new(); |
31 | 40 | ||
32 | let mut pending_requests: HashMap<u64, JobHandle> = HashMap::new(); | 41 | let mut pending_requests = HashMap::new(); |
33 | let mut fs_events_receiver = Some(&fs_events_receiver); | ||
34 | let mut subs = Subscriptions::new(); | 42 | let mut subs = Subscriptions::new(); |
43 | main_loop_inner( | ||
44 | &pool, | ||
45 | receriver, | ||
46 | sender, | ||
47 | task_receiver.clone(), | ||
48 | task_sender, | ||
49 | fs_events_receiver, | ||
50 | &mut state, | ||
51 | &mut pending_requests, | ||
52 | &mut subs, | ||
53 | )?; | ||
54 | |||
55 | info!("waiting for background jobs to finish..."); | ||
56 | task_receiver.for_each(drop); | ||
57 | pool.join(); | ||
58 | info!("...background jobs have finished"); | ||
59 | |||
60 | info!("waiting for file watcher to finish..."); | ||
61 | watcher.stop()?; | ||
62 | info!("...file watcher has finished"); | ||
63 | Ok(()) | ||
64 | } | ||
65 | |||
66 | fn main_loop_inner( | ||
67 | pool: &ThreadPool, | ||
68 | msg_receiver: &mut Receiver<RawMessage>, | ||
69 | msg_sender: &mut Sender<RawMessage>, | ||
70 | task_receiver: Receiver<Task>, | ||
71 | task_sender: Sender<Task>, | ||
72 | fs_receiver: Receiver<Vec<FileEvent>>, | ||
73 | state: &mut ServerWorldState, | ||
74 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
75 | subs: &mut Subscriptions, | ||
76 | ) -> Result<u64> { | ||
77 | let mut fs_receiver = Some(fs_receiver); | ||
35 | loop { | 78 | loop { |
36 | enum Event { | 79 | enum Event { |
37 | Msg(RawMsg), | 80 | Msg(RawMessage), |
38 | Task(Task), | 81 | Task(Task), |
39 | Fs(Vec<FileEvent>), | 82 | Fs(Vec<FileEvent>), |
40 | ReceiverDead, | ||
41 | FsWatcherDead, | 83 | FsWatcherDead, |
42 | } | 84 | } |
43 | let event = select! { | 85 | let event = select! { |
44 | recv(io.receiver(), msg) => match msg { | 86 | recv(msg_receiver, msg) => match msg { |
45 | Some(msg) => Event::Msg(msg), | 87 | Some(msg) => Event::Msg(msg), |
46 | None => Event::ReceiverDead, | 88 | None => bail!("client exited without shutdown"), |
47 | }, | 89 | }, |
48 | recv(task_receiver, task) => Event::Task(task.unwrap()), | 90 | recv(task_receiver, task) => Event::Task(task.unwrap()), |
49 | recv(fs_events_receiver, events) => match events { | 91 | recv(fs_receiver, events) => match events { |
50 | Some(events) => Event::Fs(events), | 92 | Some(events) => Event::Fs(events), |
51 | None => Event::FsWatcherDead, | 93 | None => Event::FsWatcherDead, |
52 | } | 94 | } |
53 | }; | 95 | }; |
54 | let mut state_changed = false; | 96 | let mut state_changed = false; |
55 | match event { | 97 | match event { |
56 | Event::ReceiverDead => { | ||
57 | io.cleanup_receiver()?; | ||
58 | unreachable!(); | ||
59 | } | ||
60 | Event::FsWatcherDead => { | 98 | Event::FsWatcherDead => { |
61 | fs_events_receiver = None; | 99 | fs_receiver = None; |
62 | } | 100 | } |
63 | Event::Task(task) => { | 101 | Event::Task(task) => { |
64 | match task { | 102 | match task { |
@@ -66,12 +104,10 @@ pub(super) fn main_loop( | |||
66 | if let Some(handle) = pending_requests.remove(&response.id) { | 104 | if let Some(handle) = pending_requests.remove(&response.id) { |
67 | assert!(handle.has_completed()); | 105 | assert!(handle.has_completed()); |
68 | } | 106 | } |
69 | io.send(RawMsg::Response(response)) | 107 | msg_sender.send(RawMessage::Response(response)) |
70 | } | 108 | } |
71 | Task::Notify(n) => | 109 | Task::Notify(n) => |
72 | io.send(RawMsg::Notification(n)), | 110 | msg_sender.send(RawMessage::Notification(n)), |
73 | Task::Die(error) => | ||
74 | return Err(error), | ||
75 | } | 111 | } |
76 | continue; | 112 | continue; |
77 | } | 113 | } |
@@ -82,16 +118,29 @@ pub(super) fn main_loop( | |||
82 | } | 118 | } |
83 | Event::Msg(msg) => { | 119 | Event::Msg(msg) => { |
84 | match msg { | 120 | match msg { |
85 | RawMsg::Request(req) => { | 121 | RawMessage::Request(req) => { |
86 | if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { | 122 | let req = match req.cast::<req::Shutdown>() { |
87 | return Ok(()); | 123 | Ok((id, _params)) => return Ok(id), |
124 | Err(req) => req, | ||
125 | }; | ||
126 | match on_request(state, pending_requests, pool, &task_sender, req)? { | ||
127 | None => (), | ||
128 | Some(req) => { | ||
129 | error!("unknown request: {:?}", req); | ||
130 | let resp = RawResponse::err( | ||
131 | req.id, | ||
132 | ErrorCode::MethodNotFound as i32, | ||
133 | "unknown request".to_string(), | ||
134 | ); | ||
135 | msg_sender.send(RawMessage::Response(resp)) | ||
136 | } | ||
88 | } | 137 | } |
89 | } | 138 | } |
90 | RawMsg::Notification(not) => { | 139 | RawMessage::Notification(not) => { |
91 | on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; | 140 | on_notification(msg_sender, state, pending_requests, subs, not)?; |
92 | state_changed = true; | 141 | state_changed = true; |
93 | } | 142 | } |
94 | RawMsg::Response(resp) => { | 143 | RawMessage::Response(resp) => { |
95 | error!("unexpected response: {:?}", resp) | 144 | error!("unexpected response: {:?}", resp) |
96 | } | 145 | } |
97 | } | 146 | } |
@@ -110,13 +159,12 @@ pub(super) fn main_loop( | |||
110 | } | 159 | } |
111 | 160 | ||
112 | fn on_request( | 161 | fn on_request( |
113 | io: &mut Io, | ||
114 | world: &mut ServerWorldState, | 162 | world: &mut ServerWorldState, |
115 | pending_requests: &mut HashMap<u64, JobHandle>, | 163 | pending_requests: &mut HashMap<u64, JobHandle>, |
116 | pool: &ThreadPool, | 164 | pool: &ThreadPool, |
117 | sender: &Sender<Task>, | 165 | sender: &Sender<Task>, |
118 | req: RawRequest, | 166 | req: RawRequest, |
119 | ) -> Result<bool> { | 167 | ) -> Result<Option<RawRequest>> { |
120 | let mut pool_dispatcher = PoolDispatcher { | 168 | let mut pool_dispatcher = PoolDispatcher { |
121 | req: Some(req), | 169 | req: Some(req), |
122 | res: None, | 170 | res: None, |
@@ -141,81 +189,73 @@ fn on_request( | |||
141 | Ok((id, handle)) => { | 189 | Ok((id, handle)) => { |
142 | let inserted = pending_requests.insert(id, handle).is_none(); | 190 | let inserted = pending_requests.insert(id, handle).is_none(); |
143 | assert!(inserted, "duplicate request: {}", id); | 191 | assert!(inserted, "duplicate request: {}", id); |
192 | Ok(None) | ||
144 | }, | 193 | }, |
145 | Err(req) => { | 194 | Err(req) => Ok(Some(req)), |
146 | let req = dispatch::handle_request::<req::Shutdown, _>(req, |(), resp| { | ||
147 | let resp = resp.into_response(Ok(()))?; | ||
148 | io.send(RawMsg::Response(resp)); | ||
149 | Ok(()) | ||
150 | })?; | ||
151 | match req { | ||
152 | Ok(_id) => { | ||
153 | info!("lifecycle: initiating shutdown"); | ||
154 | return Ok(false); | ||
155 | } | ||
156 | Err(req) => { | ||
157 | error!("unknown method: {:?}", req); | ||
158 | io.send(RawMsg::Response(dispatch::unknown_method(req.id)?)); | ||
159 | } | ||
160 | } | ||
161 | } | ||
162 | } | 195 | } |
163 | Ok(true) | ||
164 | } | 196 | } |
165 | 197 | ||
166 | fn on_notification( | 198 | fn on_notification( |
167 | io: &mut Io, | 199 | msg_sender: &mut Sender<RawMessage>, |
168 | state: &mut ServerWorldState, | 200 | state: &mut ServerWorldState, |
169 | pending_requests: &mut HashMap<u64, JobHandle>, | 201 | pending_requests: &mut HashMap<u64, JobHandle>, |
170 | subs: &mut Subscriptions, | 202 | subs: &mut Subscriptions, |
171 | not: RawNotification, | 203 | not: RawNotification, |
172 | ) -> Result<()> { | 204 | ) -> Result<()> { |
173 | let mut not = Some(not); | 205 | let not = match not.cast::<req::Cancel>() { |
174 | dispatch::handle_notification::<req::Cancel, _>(&mut not, |params| { | 206 | Ok(params) => { |
175 | let id = match params.id { | 207 | let id = match params.id { |
176 | NumberOrString::Number(id) => id, | 208 | NumberOrString::Number(id) => id, |
177 | NumberOrString::String(id) => { | 209 | NumberOrString::String(id) => { |
178 | panic!("string id's not supported: {:?}", id); | 210 | panic!("string id's not supported: {:?}", id); |
211 | } | ||
212 | }; | ||
213 | if let Some(handle) = pending_requests.remove(&id) { | ||
214 | handle.cancel(); | ||
179 | } | 215 | } |
180 | }; | 216 | return Ok(()) |
181 | if let Some(handle) = pending_requests.remove(&id) { | ||
182 | handle.cancel(); | ||
183 | } | 217 | } |
184 | Ok(()) | 218 | Err(not) => not, |
185 | })?; | 219 | }; |
186 | dispatch::handle_notification::<req::DidOpenTextDocument, _>(&mut not, |params| { | 220 | let not = match not.cast::<req::DidOpenTextDocument>() { |
187 | let uri = params.text_document.uri; | 221 | Ok(params) => { |
188 | let path = uri.to_file_path() | 222 | let uri = params.text_document.uri; |
189 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 223 | let path = uri.to_file_path() |
190 | let file_id = state.add_mem_file(path, params.text_document.text); | 224 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
191 | subs.add_sub(file_id); | 225 | let file_id = state.add_mem_file(path, params.text_document.text); |
192 | Ok(()) | 226 | subs.add_sub(file_id); |
193 | })?; | 227 | return Ok(()) |
194 | dispatch::handle_notification::<req::DidChangeTextDocument, _>(&mut not, |mut params| { | 228 | } |
195 | let uri = params.text_document.uri; | 229 | Err(not) => not, |
196 | let path = uri.to_file_path() | 230 | }; |
197 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 231 | let not = match not.cast::<req::DidChangeTextDocument>() { |
198 | let text = params.content_changes.pop() | 232 | Ok(mut params) => { |
199 | .ok_or_else(|| format_err!("empty changes"))? | 233 | let uri = params.text_document.uri; |
200 | .text; | 234 | let path = uri.to_file_path() |
201 | state.change_mem_file(path.as_path(), text)?; | 235 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
202 | Ok(()) | 236 | let text = params.content_changes.pop() |
203 | })?; | 237 | .ok_or_else(|| format_err!("empty changes"))? |
204 | dispatch::handle_notification::<req::DidCloseTextDocument, _>(&mut not, |params| { | 238 | .text; |
205 | let uri = params.text_document.uri; | 239 | state.change_mem_file(path.as_path(), text)?; |
206 | let path = uri.to_file_path() | 240 | return Ok(()) |
207 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 241 | } |
208 | let file_id = state.remove_mem_file(path.as_path())?; | 242 | Err(not) => not, |
209 | subs.remove_sub(file_id); | 243 | }; |
210 | let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; | 244 | let not = match not.cast::<req::DidCloseTextDocument>() { |
211 | let not = dispatch::send_notification::<req::PublishDiagnostics>(not); | 245 | Ok(params) => { |
212 | io.send(RawMsg::Notification(not)); | 246 | let uri = params.text_document.uri; |
213 | Ok(()) | 247 | let path = uri.to_file_path() |
214 | })?; | 248 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
215 | 249 | let file_id = state.remove_mem_file(path.as_path())?; | |
216 | if let Some(not) = not { | 250 | subs.remove_sub(file_id); |
217 | error!("unhandled notification: {:?}", not); | 251 | let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; |
218 | } | 252 | let not = RawNotification::new::<req::PublishDiagnostics>(params); |
253 | msg_sender.send(RawMessage::Notification(not)); | ||
254 | return Ok(()) | ||
255 | } | ||
256 | Err(not) => not, | ||
257 | }; | ||
258 | error!("unhandled notification: {:?}", not); | ||
219 | Ok(()) | 259 | Ok(()) |
220 | } | 260 | } |
221 | 261 | ||
@@ -228,10 +268,14 @@ struct PoolDispatcher<'a> { | |||
228 | } | 268 | } |
229 | 269 | ||
230 | impl<'a> PoolDispatcher<'a> { | 270 | impl<'a> PoolDispatcher<'a> { |
231 | fn on<'b, R: req::ClientRequest>( | 271 | fn on<'b, R>( |
232 | &'b mut self, | 272 | &'b mut self, |
233 | f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> | 273 | f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> |
234 | ) -> Result<&'b mut Self> { | 274 | ) -> Result<&'b mut Self> |
275 | where R: req::Request, | ||
276 | R::Params: DeserializeOwned + Send + 'static, | ||
277 | R::Result: Serialize + 'static, | ||
278 | { | ||
235 | let req = match self.req.take() { | 279 | let req = match self.req.take() { |
236 | None => return Ok(self), | 280 | None => return Ok(self), |
237 | Some(req) => req, | 281 | Some(req) => req, |
@@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> { | |||
239 | let world = self.world; | 283 | let world = self.world; |
240 | let sender = self.sender; | 284 | let sender = self.sender; |
241 | let pool = self.pool; | 285 | let pool = self.pool; |
242 | let (handle, token) = JobHandle::new(); | 286 | match req.cast::<R>() { |
243 | let req = dispatch::handle_request::<R, _>(req, |params, resp| { | 287 | Ok((id, params)) => { |
244 | let world = world.snapshot(); | 288 | let (handle, token) = JobHandle::new(); |
245 | let sender = sender.clone(); | 289 | let world = world.snapshot(); |
246 | pool.execute(move || { | 290 | let sender = sender.clone(); |
247 | let res = f(world, params, token); | 291 | pool.execute(move || { |
248 | let task = match resp.into_response(res) { | 292 | let resp = match f(world, params, token) { |
249 | Ok(resp) => Task::Respond(resp), | 293 | Ok(resp) => RawResponse::ok(id, resp), |
250 | Err(e) => Task::Die(e), | 294 | Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()), |
251 | }; | 295 | }; |
252 | sender.send(task); | 296 | let task = Task::Respond(resp); |
253 | }); | 297 | sender.send(task); |
254 | Ok(()) | 298 | }); |
255 | })?; | 299 | self.res = Some((id, handle)); |
256 | match req { | 300 | } |
257 | Ok(id) => self.res = Some((id, handle)), | 301 | Err(req) => { |
258 | Err(req) => self.req = Some(req), | 302 | self.req = Some(req) |
303 | } | ||
259 | } | 304 | } |
260 | Ok(self) | 305 | Ok(self) |
261 | } | 306 | } |
@@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool( | |||
282 | error!("failed to compute diagnostics: {:?}", e) | 327 | error!("failed to compute diagnostics: {:?}", e) |
283 | } | 328 | } |
284 | Ok(params) => { | 329 | Ok(params) => { |
285 | let not = dispatch::send_notification::<req::PublishDiagnostics>(params); | 330 | let not = RawNotification::new::<req::PublishDiagnostics>(params); |
286 | sender.send(Task::Notify(not)); | 331 | sender.send(Task::Notify(not)); |
287 | } | 332 | } |
288 | } | 333 | } |
@@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool( | |||
291 | error!("failed to compute decorations: {:?}", e) | 336 | error!("failed to compute decorations: {:?}", e) |
292 | } | 337 | } |
293 | Ok(params) => { | 338 | Ok(params) => { |
294 | let not = dispatch::send_notification::<req::PublishDecorations>(params); | 339 | let not = RawNotification::new::<req::PublishDecorations>(params); |
295 | sender.send(Task::Notify(not)) | 340 | sender.send(Task::Notify(not)) |
296 | } | 341 | } |
297 | } | 342 | } |