aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/main_loop/mod.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/server/src/main_loop/mod.rs')
-rw-r--r--crates/server/src/main_loop/mod.rs419
1 files changed, 0 insertions, 419 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
deleted file mode 100644
index f3b2744bf..000000000
--- a/crates/server/src/main_loop/mod.rs
+++ /dev/null
@@ -1,419 +0,0 @@
1mod handlers;
2mod subscriptions;
3
4use std::{
5 path::PathBuf,
6 collections::{HashMap},
7};
8
9use serde::{Serialize, de::DeserializeOwned};
10use crossbeam_channel::{unbounded, Sender, Receiver};
11use rayon::{self, ThreadPool};
12use languageserver_types::{NumberOrString};
13use libanalysis::{FileId, JobHandle, JobToken, LibraryData};
14use gen_lsp_server::{
15 RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode,
16 handle_shutdown,
17};
18
19use {
20 req,
21 Result,
22 vfs::{self, FileEvent},
23 server_world::{ServerWorldState, ServerWorld},
24 main_loop::subscriptions::{Subscriptions},
25 project_model::{CargoWorkspace, workspace_loader},
26 thread_watcher::Worker,
27};
28
29#[derive(Debug)]
30enum Task {
31 Respond(RawResponse),
32 Notify(RawNotification),
33}
34
35pub fn main_loop(
36 internal_mode: bool,
37 root: PathBuf,
38 msg_receriver: &mut Receiver<RawMessage>,
39 msg_sender: &mut Sender<RawMessage>,
40) -> Result<()> {
41 let pool = rayon::ThreadPoolBuilder::new()
42 .num_threads(4)
43 .panic_handler(|_| error!("thread panicked :("))
44 .build()
45 .unwrap();
46 let (task_sender, task_receiver) = unbounded::<Task>();
47 let (fs_worker, fs_watcher) = vfs::roots_loader();
48 let (ws_worker, ws_watcher) = workspace_loader();
49
50 info!("server initialized, serving requests");
51 let mut state = ServerWorldState::new();
52
53 let mut pending_requests = HashMap::new();
54 let mut subs = Subscriptions::new();
55 let main_res = main_loop_inner(
56 internal_mode,
57 root,
58 &pool,
59 msg_sender,
60 msg_receriver,
61 task_sender,
62 task_receiver.clone(),
63 fs_worker,
64 ws_worker,
65 &mut state,
66 &mut pending_requests,
67 &mut subs,
68 );
69
70 info!("waiting for tasks to finish...");
71 task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests));
72 info!("...tasks have finished");
73 info!("joining threadpool...");
74 drop(pool);
75 info!("...threadpool has finished");
76
77 let fs_res = fs_watcher.stop();
78 let ws_res = ws_watcher.stop();
79
80 main_res?;
81 fs_res?;
82 ws_res?;
83
84 Ok(())
85}
86
87fn main_loop_inner(
88 internal_mode: bool,
89 ws_root: PathBuf,
90 pool: &ThreadPool,
91 msg_sender: &mut Sender<RawMessage>,
92 msg_receiver: &mut Receiver<RawMessage>,
93 task_sender: Sender<Task>,
94 task_receiver: Receiver<Task>,
95 fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>,
96 ws_worker: Worker<PathBuf, Result<CargoWorkspace>>,
97 state: &mut ServerWorldState,
98 pending_requests: &mut HashMap<u64, JobHandle>,
99 subs: &mut Subscriptions,
100) -> Result<()> {
101 let (libdata_sender, libdata_receiver) = unbounded();
102 ws_worker.send(ws_root.clone());
103 fs_worker.send(ws_root.clone());
104 loop {
105 #[derive(Debug)]
106 enum Event {
107 Msg(RawMessage),
108 Task(Task),
109 Fs(PathBuf, Vec<FileEvent>),
110 Ws(Result<CargoWorkspace>),
111 Lib(LibraryData),
112 }
113 trace!("selecting");
114 let event = select! {
115 recv(msg_receiver, msg) => match msg {
116 Some(msg) => Event::Msg(msg),
117 None => bail!("client exited without shutdown"),
118 },
119 recv(task_receiver, task) => Event::Task(task.unwrap()),
120 recv(fs_worker.out, events) => match events {
121 None => bail!("roots watcher died"),
122 Some((pb, events)) => Event::Fs(pb, events),
123 }
124 recv(ws_worker.out, ws) => match ws {
125 None => bail!("workspace watcher died"),
126 Some(ws) => Event::Ws(ws),
127 }
128 recv(libdata_receiver, data) => Event::Lib(data.unwrap())
129 };
130 let mut state_changed = false;
131 match event {
132 Event::Task(task) => on_task(task, msg_sender, pending_requests),
133 Event::Fs(root, events) => {
134 info!("fs change, {}, {} events", root.display(), events.len());
135 if root == ws_root {
136 state.apply_fs_changes(events);
137 } else {
138 let (files, resolver) = state.events_to_files(events);
139 let sender = libdata_sender.clone();
140 pool.spawn(move || {
141 let start = ::std::time::Instant::now();
142 info!("indexing {} ... ", root.display());
143 let data = LibraryData::prepare(files, resolver);
144 info!("indexed {:?} {}", start.elapsed(), root.display());
145 sender.send(data);
146 });
147 }
148 state_changed = true;
149 }
150 Event::Ws(ws) => {
151 match ws {
152 Ok(ws) => {
153 let workspaces = vec![ws];
154 feedback(internal_mode, "workspace loaded", msg_sender);
155 for ws in workspaces.iter() {
156 for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) {
157 debug!("sending root, {}", pkg.root(ws).to_path_buf().display());
158 fs_worker.send(pkg.root(ws).to_path_buf());
159 }
160 }
161 state.set_workspaces(workspaces);
162 state_changed = true;
163 }
164 Err(e) => warn!("loading workspace failed: {}", e),
165 }
166 }
167 Event::Lib(lib) => {
168 feedback(internal_mode, "library loaded", msg_sender);
169 state.add_lib(lib);
170 }
171 Event::Msg(msg) => {
172 match msg {
173 RawMessage::Request(req) => {
174 let req = match handle_shutdown(req, msg_sender) {
175 Some(req) => req,
176 None => return Ok(()),
177 };
178 match on_request(state, pending_requests, pool, &task_sender, req)? {
179 None => (),
180 Some(req) => {
181 error!("unknown request: {:?}", req);
182 let resp = RawResponse::err(
183 req.id,
184 ErrorCode::MethodNotFound as i32,
185 "unknown request".to_string(),
186 );
187 msg_sender.send(RawMessage::Response(resp))
188 }
189 }
190 }
191 RawMessage::Notification(not) => {
192 on_notification(msg_sender, state, pending_requests, subs, not)?;
193 state_changed = true;
194 }
195 RawMessage::Response(resp) => {
196 error!("unexpected response: {:?}", resp)
197 }
198 }
199 }
200 };
201
202 if state_changed {
203 update_file_notifications_on_threadpool(
204 pool,
205 state.snapshot(),
206 task_sender.clone(),
207 subs.subscriptions(),
208 )
209 }
210 }
211}
212
213fn on_task(
214 task: Task,
215 msg_sender: &mut Sender<RawMessage>,
216 pending_requests: &mut HashMap<u64, JobHandle>,
217) {
218 match task {
219 Task::Respond(response) => {
220 if let Some(handle) = pending_requests.remove(&response.id) {
221 assert!(handle.has_completed());
222 }
223 msg_sender.send(RawMessage::Response(response))
224 }
225 Task::Notify(n) =>
226 msg_sender.send(RawMessage::Notification(n)),
227 }
228}
229
230fn on_request(
231 world: &mut ServerWorldState,
232 pending_requests: &mut HashMap<u64, JobHandle>,
233 pool: &ThreadPool,
234 sender: &Sender<Task>,
235 req: RawRequest,
236) -> Result<Option<RawRequest>> {
237 let mut pool_dispatcher = PoolDispatcher {
238 req: Some(req),
239 res: None,
240 pool, world, sender
241 };
242 let req = pool_dispatcher
243 .on::<req::SyntaxTree>(handlers::handle_syntax_tree)?
244 .on::<req::ExtendSelection>(handlers::handle_extend_selection)?
245 .on::<req::FindMatchingBrace>(handlers::handle_find_matching_brace)?
246 .on::<req::JoinLines>(handlers::handle_join_lines)?
247 .on::<req::OnTypeFormatting>(handlers::handle_on_type_formatting)?
248 .on::<req::DocumentSymbolRequest>(handlers::handle_document_symbol)?
249 .on::<req::WorkspaceSymbol>(handlers::handle_workspace_symbol)?
250 .on::<req::GotoDefinition>(handlers::handle_goto_definition)?
251 .on::<req::ParentModule>(handlers::handle_parent_module)?
252 .on::<req::Runnables>(handlers::handle_runnables)?
253 .on::<req::DecorationsRequest>(handlers::handle_decorations)?
254 .on::<req::Completion>(handlers::handle_completion)?
255 .on::<req::CodeActionRequest>(handlers::handle_code_action)?
256 .finish();
257 match req {
258 Ok((id, handle)) => {
259 let inserted = pending_requests.insert(id, handle).is_none();
260 assert!(inserted, "duplicate request: {}", id);
261 Ok(None)
262 },
263 Err(req) => Ok(Some(req)),
264 }
265}
266
267fn on_notification(
268 msg_sender: &mut Sender<RawMessage>,
269 state: &mut ServerWorldState,
270 pending_requests: &mut HashMap<u64, JobHandle>,
271 subs: &mut Subscriptions,
272 not: RawNotification,
273) -> Result<()> {
274 let not = match not.cast::<req::Cancel>() {
275 Ok(params) => {
276 let id = match params.id {
277 NumberOrString::Number(id) => id,
278 NumberOrString::String(id) => {
279 panic!("string id's not supported: {:?}", id);
280 }
281 };
282 if let Some(handle) = pending_requests.remove(&id) {
283 handle.cancel();
284 }
285 return Ok(())
286 }
287 Err(not) => not,
288 };
289 let not = match not.cast::<req::DidOpenTextDocument>() {
290 Ok(params) => {
291 let uri = params.text_document.uri;
292 let path = uri.to_file_path()
293 .map_err(|()| format_err!("invalid uri: {}", uri))?;
294 let file_id = state.add_mem_file(path, params.text_document.text);
295 subs.add_sub(file_id);
296 return Ok(())
297 }
298 Err(not) => not,
299 };
300 let not = match not.cast::<req::DidChangeTextDocument>() {
301 Ok(mut params) => {
302 let uri = params.text_document.uri;
303 let path = uri.to_file_path()
304 .map_err(|()| format_err!("invalid uri: {}", uri))?;
305 let text = params.content_changes.pop()
306 .ok_or_else(|| format_err!("empty changes"))?
307 .text;
308 state.change_mem_file(path.as_path(), text)?;
309 return Ok(())
310 }
311 Err(not) => not,
312 };
313 let not = match not.cast::<req::DidCloseTextDocument>() {
314 Ok(params) => {
315 let uri = params.text_document.uri;
316 let path = uri.to_file_path()
317 .map_err(|()| format_err!("invalid uri: {}", uri))?;
318 let file_id = state.remove_mem_file(path.as_path())?;
319 subs.remove_sub(file_id);
320 let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() };
321 let not = RawNotification::new::<req::PublishDiagnostics>(&params);
322 msg_sender.send(RawMessage::Notification(not));
323 return Ok(())
324 }
325 Err(not) => not,
326 };
327 error!("unhandled notification: {:?}", not);
328 Ok(())
329}
330
331struct PoolDispatcher<'a> {
332 req: Option<RawRequest>,
333 res: Option<(u64, JobHandle)>,
334 pool: &'a ThreadPool,
335 world: &'a ServerWorldState,
336 sender: &'a Sender<Task>,
337}
338
339impl<'a> PoolDispatcher<'a> {
340 fn on<'b, R>(
341 &'b mut self,
342 f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result>
343 ) -> Result<&'b mut Self>
344 where R: req::Request,
345 R::Params: DeserializeOwned + Send + 'static,
346 R::Result: Serialize + 'static,
347 {
348 let req = match self.req.take() {
349 None => return Ok(self),
350 Some(req) => req,
351 };
352 match req.cast::<R>() {
353 Ok((id, params)) => {
354 let (handle, token) = JobHandle::new();
355 let world = self.world.snapshot();
356 let sender = self.sender.clone();
357 self.pool.spawn(move || {
358 let resp = match f(world, params, token) {
359 Ok(resp) => RawResponse::ok::<R>(id, &resp),
360 Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()),
361 };
362 let task = Task::Respond(resp);
363 sender.send(task);
364 });
365 self.res = Some((id, handle));
366 }
367 Err(req) => {
368 self.req = Some(req)
369 }
370 }
371 Ok(self)
372 }
373
374 fn finish(&mut self) -> ::std::result::Result<(u64, JobHandle), RawRequest> {
375 match (self.res.take(), self.req.take()) {
376 (Some(res), None) => Ok(res),
377 (None, Some(req)) => Err(req),
378 _ => unreachable!(),
379 }
380 }
381}
382
383fn update_file_notifications_on_threadpool(
384 pool: &ThreadPool,
385 world: ServerWorld,
386 sender: Sender<Task>,
387 subscriptions: Vec<FileId>,
388) {
389 pool.spawn(move || {
390 for file_id in subscriptions {
391 match handlers::publish_diagnostics(world.clone(), file_id) {
392 Err(e) => {
393 error!("failed to compute diagnostics: {:?}", e)
394 }
395 Ok(params) => {
396 let not = RawNotification::new::<req::PublishDiagnostics>(&params);
397 sender.send(Task::Notify(not));
398 }
399 }
400 match handlers::publish_decorations(world.clone(), file_id) {
401 Err(e) => {
402 error!("failed to compute decorations: {:?}", e)
403 }
404 Ok(params) => {
405 let not = RawNotification::new::<req::PublishDecorations>(&params);
406 sender.send(Task::Notify(not))
407 }
408 }
409 }
410 });
411}
412
413fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) {
414 if !intrnal_mode {
415 return;
416 }
417 let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string());
418 sender.send(RawMessage::Notification(not));
419}