diff options
Diffstat (limited to 'crates/server/src/main_loop/mod.rs')
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 419 |
1 files changed, 0 insertions, 419 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs deleted file mode 100644 index f3b2744bf..000000000 --- a/crates/server/src/main_loop/mod.rs +++ /dev/null | |||
@@ -1,419 +0,0 @@ | |||
1 | mod handlers; | ||
2 | mod subscriptions; | ||
3 | |||
4 | use std::{ | ||
5 | path::PathBuf, | ||
6 | collections::{HashMap}, | ||
7 | }; | ||
8 | |||
9 | use serde::{Serialize, de::DeserializeOwned}; | ||
10 | use crossbeam_channel::{unbounded, Sender, Receiver}; | ||
11 | use rayon::{self, ThreadPool}; | ||
12 | use languageserver_types::{NumberOrString}; | ||
13 | use libanalysis::{FileId, JobHandle, JobToken, LibraryData}; | ||
14 | use gen_lsp_server::{ | ||
15 | RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode, | ||
16 | handle_shutdown, | ||
17 | }; | ||
18 | |||
19 | use { | ||
20 | req, | ||
21 | Result, | ||
22 | vfs::{self, FileEvent}, | ||
23 | server_world::{ServerWorldState, ServerWorld}, | ||
24 | main_loop::subscriptions::{Subscriptions}, | ||
25 | project_model::{CargoWorkspace, workspace_loader}, | ||
26 | thread_watcher::Worker, | ||
27 | }; | ||
28 | |||
29 | #[derive(Debug)] | ||
30 | enum Task { | ||
31 | Respond(RawResponse), | ||
32 | Notify(RawNotification), | ||
33 | } | ||
34 | |||
35 | pub fn main_loop( | ||
36 | internal_mode: bool, | ||
37 | root: PathBuf, | ||
38 | msg_receriver: &mut Receiver<RawMessage>, | ||
39 | msg_sender: &mut Sender<RawMessage>, | ||
40 | ) -> Result<()> { | ||
41 | let pool = rayon::ThreadPoolBuilder::new() | ||
42 | .num_threads(4) | ||
43 | .panic_handler(|_| error!("thread panicked :(")) | ||
44 | .build() | ||
45 | .unwrap(); | ||
46 | let (task_sender, task_receiver) = unbounded::<Task>(); | ||
47 | let (fs_worker, fs_watcher) = vfs::roots_loader(); | ||
48 | let (ws_worker, ws_watcher) = workspace_loader(); | ||
49 | |||
50 | info!("server initialized, serving requests"); | ||
51 | let mut state = ServerWorldState::new(); | ||
52 | |||
53 | let mut pending_requests = HashMap::new(); | ||
54 | let mut subs = Subscriptions::new(); | ||
55 | let main_res = main_loop_inner( | ||
56 | internal_mode, | ||
57 | root, | ||
58 | &pool, | ||
59 | msg_sender, | ||
60 | msg_receriver, | ||
61 | task_sender, | ||
62 | task_receiver.clone(), | ||
63 | fs_worker, | ||
64 | ws_worker, | ||
65 | &mut state, | ||
66 | &mut pending_requests, | ||
67 | &mut subs, | ||
68 | ); | ||
69 | |||
70 | info!("waiting for tasks to finish..."); | ||
71 | task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); | ||
72 | info!("...tasks have finished"); | ||
73 | info!("joining threadpool..."); | ||
74 | drop(pool); | ||
75 | info!("...threadpool has finished"); | ||
76 | |||
77 | let fs_res = fs_watcher.stop(); | ||
78 | let ws_res = ws_watcher.stop(); | ||
79 | |||
80 | main_res?; | ||
81 | fs_res?; | ||
82 | ws_res?; | ||
83 | |||
84 | Ok(()) | ||
85 | } | ||
86 | |||
87 | fn main_loop_inner( | ||
88 | internal_mode: bool, | ||
89 | ws_root: PathBuf, | ||
90 | pool: &ThreadPool, | ||
91 | msg_sender: &mut Sender<RawMessage>, | ||
92 | msg_receiver: &mut Receiver<RawMessage>, | ||
93 | task_sender: Sender<Task>, | ||
94 | task_receiver: Receiver<Task>, | ||
95 | fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, | ||
96 | ws_worker: Worker<PathBuf, Result<CargoWorkspace>>, | ||
97 | state: &mut ServerWorldState, | ||
98 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
99 | subs: &mut Subscriptions, | ||
100 | ) -> Result<()> { | ||
101 | let (libdata_sender, libdata_receiver) = unbounded(); | ||
102 | ws_worker.send(ws_root.clone()); | ||
103 | fs_worker.send(ws_root.clone()); | ||
104 | loop { | ||
105 | #[derive(Debug)] | ||
106 | enum Event { | ||
107 | Msg(RawMessage), | ||
108 | Task(Task), | ||
109 | Fs(PathBuf, Vec<FileEvent>), | ||
110 | Ws(Result<CargoWorkspace>), | ||
111 | Lib(LibraryData), | ||
112 | } | ||
113 | trace!("selecting"); | ||
114 | let event = select! { | ||
115 | recv(msg_receiver, msg) => match msg { | ||
116 | Some(msg) => Event::Msg(msg), | ||
117 | None => bail!("client exited without shutdown"), | ||
118 | }, | ||
119 | recv(task_receiver, task) => Event::Task(task.unwrap()), | ||
120 | recv(fs_worker.out, events) => match events { | ||
121 | None => bail!("roots watcher died"), | ||
122 | Some((pb, events)) => Event::Fs(pb, events), | ||
123 | } | ||
124 | recv(ws_worker.out, ws) => match ws { | ||
125 | None => bail!("workspace watcher died"), | ||
126 | Some(ws) => Event::Ws(ws), | ||
127 | } | ||
128 | recv(libdata_receiver, data) => Event::Lib(data.unwrap()) | ||
129 | }; | ||
130 | let mut state_changed = false; | ||
131 | match event { | ||
132 | Event::Task(task) => on_task(task, msg_sender, pending_requests), | ||
133 | Event::Fs(root, events) => { | ||
134 | info!("fs change, {}, {} events", root.display(), events.len()); | ||
135 | if root == ws_root { | ||
136 | state.apply_fs_changes(events); | ||
137 | } else { | ||
138 | let (files, resolver) = state.events_to_files(events); | ||
139 | let sender = libdata_sender.clone(); | ||
140 | pool.spawn(move || { | ||
141 | let start = ::std::time::Instant::now(); | ||
142 | info!("indexing {} ... ", root.display()); | ||
143 | let data = LibraryData::prepare(files, resolver); | ||
144 | info!("indexed {:?} {}", start.elapsed(), root.display()); | ||
145 | sender.send(data); | ||
146 | }); | ||
147 | } | ||
148 | state_changed = true; | ||
149 | } | ||
150 | Event::Ws(ws) => { | ||
151 | match ws { | ||
152 | Ok(ws) => { | ||
153 | let workspaces = vec![ws]; | ||
154 | feedback(internal_mode, "workspace loaded", msg_sender); | ||
155 | for ws in workspaces.iter() { | ||
156 | for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { | ||
157 | debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); | ||
158 | fs_worker.send(pkg.root(ws).to_path_buf()); | ||
159 | } | ||
160 | } | ||
161 | state.set_workspaces(workspaces); | ||
162 | state_changed = true; | ||
163 | } | ||
164 | Err(e) => warn!("loading workspace failed: {}", e), | ||
165 | } | ||
166 | } | ||
167 | Event::Lib(lib) => { | ||
168 | feedback(internal_mode, "library loaded", msg_sender); | ||
169 | state.add_lib(lib); | ||
170 | } | ||
171 | Event::Msg(msg) => { | ||
172 | match msg { | ||
173 | RawMessage::Request(req) => { | ||
174 | let req = match handle_shutdown(req, msg_sender) { | ||
175 | Some(req) => req, | ||
176 | None => return Ok(()), | ||
177 | }; | ||
178 | match on_request(state, pending_requests, pool, &task_sender, req)? { | ||
179 | None => (), | ||
180 | Some(req) => { | ||
181 | error!("unknown request: {:?}", req); | ||
182 | let resp = RawResponse::err( | ||
183 | req.id, | ||
184 | ErrorCode::MethodNotFound as i32, | ||
185 | "unknown request".to_string(), | ||
186 | ); | ||
187 | msg_sender.send(RawMessage::Response(resp)) | ||
188 | } | ||
189 | } | ||
190 | } | ||
191 | RawMessage::Notification(not) => { | ||
192 | on_notification(msg_sender, state, pending_requests, subs, not)?; | ||
193 | state_changed = true; | ||
194 | } | ||
195 | RawMessage::Response(resp) => { | ||
196 | error!("unexpected response: {:?}", resp) | ||
197 | } | ||
198 | } | ||
199 | } | ||
200 | }; | ||
201 | |||
202 | if state_changed { | ||
203 | update_file_notifications_on_threadpool( | ||
204 | pool, | ||
205 | state.snapshot(), | ||
206 | task_sender.clone(), | ||
207 | subs.subscriptions(), | ||
208 | ) | ||
209 | } | ||
210 | } | ||
211 | } | ||
212 | |||
213 | fn on_task( | ||
214 | task: Task, | ||
215 | msg_sender: &mut Sender<RawMessage>, | ||
216 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
217 | ) { | ||
218 | match task { | ||
219 | Task::Respond(response) => { | ||
220 | if let Some(handle) = pending_requests.remove(&response.id) { | ||
221 | assert!(handle.has_completed()); | ||
222 | } | ||
223 | msg_sender.send(RawMessage::Response(response)) | ||
224 | } | ||
225 | Task::Notify(n) => | ||
226 | msg_sender.send(RawMessage::Notification(n)), | ||
227 | } | ||
228 | } | ||
229 | |||
230 | fn on_request( | ||
231 | world: &mut ServerWorldState, | ||
232 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
233 | pool: &ThreadPool, | ||
234 | sender: &Sender<Task>, | ||
235 | req: RawRequest, | ||
236 | ) -> Result<Option<RawRequest>> { | ||
237 | let mut pool_dispatcher = PoolDispatcher { | ||
238 | req: Some(req), | ||
239 | res: None, | ||
240 | pool, world, sender | ||
241 | }; | ||
242 | let req = pool_dispatcher | ||
243 | .on::<req::SyntaxTree>(handlers::handle_syntax_tree)? | ||
244 | .on::<req::ExtendSelection>(handlers::handle_extend_selection)? | ||
245 | .on::<req::FindMatchingBrace>(handlers::handle_find_matching_brace)? | ||
246 | .on::<req::JoinLines>(handlers::handle_join_lines)? | ||
247 | .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)? | ||
248 | .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)? | ||
249 | .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)? | ||
250 | .on::<req::GotoDefinition>(handlers::handle_goto_definition)? | ||
251 | .on::<req::ParentModule>(handlers::handle_parent_module)? | ||
252 | .on::<req::Runnables>(handlers::handle_runnables)? | ||
253 | .on::<req::DecorationsRequest>(handlers::handle_decorations)? | ||
254 | .on::<req::Completion>(handlers::handle_completion)? | ||
255 | .on::<req::CodeActionRequest>(handlers::handle_code_action)? | ||
256 | .finish(); | ||
257 | match req { | ||
258 | Ok((id, handle)) => { | ||
259 | let inserted = pending_requests.insert(id, handle).is_none(); | ||
260 | assert!(inserted, "duplicate request: {}", id); | ||
261 | Ok(None) | ||
262 | }, | ||
263 | Err(req) => Ok(Some(req)), | ||
264 | } | ||
265 | } | ||
266 | |||
267 | fn on_notification( | ||
268 | msg_sender: &mut Sender<RawMessage>, | ||
269 | state: &mut ServerWorldState, | ||
270 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
271 | subs: &mut Subscriptions, | ||
272 | not: RawNotification, | ||
273 | ) -> Result<()> { | ||
274 | let not = match not.cast::<req::Cancel>() { | ||
275 | Ok(params) => { | ||
276 | let id = match params.id { | ||
277 | NumberOrString::Number(id) => id, | ||
278 | NumberOrString::String(id) => { | ||
279 | panic!("string id's not supported: {:?}", id); | ||
280 | } | ||
281 | }; | ||
282 | if let Some(handle) = pending_requests.remove(&id) { | ||
283 | handle.cancel(); | ||
284 | } | ||
285 | return Ok(()) | ||
286 | } | ||
287 | Err(not) => not, | ||
288 | }; | ||
289 | let not = match not.cast::<req::DidOpenTextDocument>() { | ||
290 | Ok(params) => { | ||
291 | let uri = params.text_document.uri; | ||
292 | let path = uri.to_file_path() | ||
293 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | ||
294 | let file_id = state.add_mem_file(path, params.text_document.text); | ||
295 | subs.add_sub(file_id); | ||
296 | return Ok(()) | ||
297 | } | ||
298 | Err(not) => not, | ||
299 | }; | ||
300 | let not = match not.cast::<req::DidChangeTextDocument>() { | ||
301 | Ok(mut params) => { | ||
302 | let uri = params.text_document.uri; | ||
303 | let path = uri.to_file_path() | ||
304 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | ||
305 | let text = params.content_changes.pop() | ||
306 | .ok_or_else(|| format_err!("empty changes"))? | ||
307 | .text; | ||
308 | state.change_mem_file(path.as_path(), text)?; | ||
309 | return Ok(()) | ||
310 | } | ||
311 | Err(not) => not, | ||
312 | }; | ||
313 | let not = match not.cast::<req::DidCloseTextDocument>() { | ||
314 | Ok(params) => { | ||
315 | let uri = params.text_document.uri; | ||
316 | let path = uri.to_file_path() | ||
317 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | ||
318 | let file_id = state.remove_mem_file(path.as_path())?; | ||
319 | subs.remove_sub(file_id); | ||
320 | let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; | ||
321 | let not = RawNotification::new::<req::PublishDiagnostics>(¶ms); | ||
322 | msg_sender.send(RawMessage::Notification(not)); | ||
323 | return Ok(()) | ||
324 | } | ||
325 | Err(not) => not, | ||
326 | }; | ||
327 | error!("unhandled notification: {:?}", not); | ||
328 | Ok(()) | ||
329 | } | ||
330 | |||
331 | struct PoolDispatcher<'a> { | ||
332 | req: Option<RawRequest>, | ||
333 | res: Option<(u64, JobHandle)>, | ||
334 | pool: &'a ThreadPool, | ||
335 | world: &'a ServerWorldState, | ||
336 | sender: &'a Sender<Task>, | ||
337 | } | ||
338 | |||
339 | impl<'a> PoolDispatcher<'a> { | ||
340 | fn on<'b, R>( | ||
341 | &'b mut self, | ||
342 | f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> | ||
343 | ) -> Result<&'b mut Self> | ||
344 | where R: req::Request, | ||
345 | R::Params: DeserializeOwned + Send + 'static, | ||
346 | R::Result: Serialize + 'static, | ||
347 | { | ||
348 | let req = match self.req.take() { | ||
349 | None => return Ok(self), | ||
350 | Some(req) => req, | ||
351 | }; | ||
352 | match req.cast::<R>() { | ||
353 | Ok((id, params)) => { | ||
354 | let (handle, token) = JobHandle::new(); | ||
355 | let world = self.world.snapshot(); | ||
356 | let sender = self.sender.clone(); | ||
357 | self.pool.spawn(move || { | ||
358 | let resp = match f(world, params, token) { | ||
359 | Ok(resp) => RawResponse::ok::<R>(id, &resp), | ||
360 | Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()), | ||
361 | }; | ||
362 | let task = Task::Respond(resp); | ||
363 | sender.send(task); | ||
364 | }); | ||
365 | self.res = Some((id, handle)); | ||
366 | } | ||
367 | Err(req) => { | ||
368 | self.req = Some(req) | ||
369 | } | ||
370 | } | ||
371 | Ok(self) | ||
372 | } | ||
373 | |||
374 | fn finish(&mut self) -> ::std::result::Result<(u64, JobHandle), RawRequest> { | ||
375 | match (self.res.take(), self.req.take()) { | ||
376 | (Some(res), None) => Ok(res), | ||
377 | (None, Some(req)) => Err(req), | ||
378 | _ => unreachable!(), | ||
379 | } | ||
380 | } | ||
381 | } | ||
382 | |||
383 | fn update_file_notifications_on_threadpool( | ||
384 | pool: &ThreadPool, | ||
385 | world: ServerWorld, | ||
386 | sender: Sender<Task>, | ||
387 | subscriptions: Vec<FileId>, | ||
388 | ) { | ||
389 | pool.spawn(move || { | ||
390 | for file_id in subscriptions { | ||
391 | match handlers::publish_diagnostics(world.clone(), file_id) { | ||
392 | Err(e) => { | ||
393 | error!("failed to compute diagnostics: {:?}", e) | ||
394 | } | ||
395 | Ok(params) => { | ||
396 | let not = RawNotification::new::<req::PublishDiagnostics>(¶ms); | ||
397 | sender.send(Task::Notify(not)); | ||
398 | } | ||
399 | } | ||
400 | match handlers::publish_decorations(world.clone(), file_id) { | ||
401 | Err(e) => { | ||
402 | error!("failed to compute decorations: {:?}", e) | ||
403 | } | ||
404 | Ok(params) => { | ||
405 | let not = RawNotification::new::<req::PublishDecorations>(¶ms); | ||
406 | sender.send(Task::Notify(not)) | ||
407 | } | ||
408 | } | ||
409 | } | ||
410 | }); | ||
411 | } | ||
412 | |||
413 | fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) { | ||
414 | if !intrnal_mode { | ||
415 | return; | ||
416 | } | ||
417 | let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string()); | ||
418 | sender.send(RawMessage::Notification(not)); | ||
419 | } | ||