diff options
author | Aleksey Kladov <[email protected]> | 2019-08-31 12:47:37 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2019-08-31 12:47:37 +0100 |
commit | 4e3f5cc7293d97aef4630ea30b8e9ad6931589a8 (patch) | |
tree | 00e5db62b89604bb005f45368d0caa2642a2d350 /crates/ra_lsp_server/src/main_loop.rs | |
parent | f90a88630195a1b6b542b5883295d66dd6c59cef (diff) |
cleanup main loop
Diffstat (limited to 'crates/ra_lsp_server/src/main_loop.rs')
-rw-r--r-- | crates/ra_lsp_server/src/main_loop.rs | 277 |
1 files changed, 151 insertions, 126 deletions
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 42ebb5cdf..80f0216e8 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs | |||
@@ -4,12 +4,13 @@ pub(crate) mod pending_requests; | |||
4 | 4 | ||
5 | use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; | 5 | use std::{error::Error, fmt, path::PathBuf, sync::Arc, time::Instant}; |
6 | 6 | ||
7 | use crossbeam_channel::{select, unbounded, Receiver, RecvError, Sender}; | 7 | use crossbeam_channel::{select, unbounded, RecvError, Sender}; |
8 | use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; | 8 | use lsp_server::{Connection, ErrorCode, Message, Notification, Request, RequestId, Response}; |
9 | use lsp_types::{ClientCapabilities, NumberOrString}; | 9 | use lsp_types::{ClientCapabilities, NumberOrString}; |
10 | use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData}; | 10 | use ra_ide_api::{Canceled, FeatureFlags, FileId, LibraryData, SourceRootId}; |
11 | use ra_prof::profile; | 11 | use ra_prof::profile; |
12 | use ra_vfs::VfsTask; | 12 | use ra_vfs::VfsTask; |
13 | use relative_path::RelativePathBuf; | ||
13 | use serde::{de::DeserializeOwned, Serialize}; | 14 | use serde::{de::DeserializeOwned, Serialize}; |
14 | use threadpool::ThreadPool; | 15 | use threadpool::ThreadPool; |
15 | 16 | ||
@@ -18,7 +19,6 @@ use crate::{ | |||
18 | pending_requests::{PendingRequest, PendingRequests}, | 19 | pending_requests::{PendingRequest, PendingRequests}, |
19 | subscriptions::Subscriptions, | 20 | subscriptions::Subscriptions, |
20 | }, | 21 | }, |
21 | project_model::workspace_loader, | ||
22 | req, | 22 | req, |
23 | world::{Options, WorldSnapshot, WorldState}, | 23 | world::{Options, WorldSnapshot, WorldState}, |
24 | Result, ServerConfig, | 24 | Result, ServerConfig, |
@@ -54,14 +54,17 @@ pub fn main_loop( | |||
54 | connection: &Connection, | 54 | connection: &Connection, |
55 | ) -> Result<()> { | 55 | ) -> Result<()> { |
56 | log::info!("server_config: {:#?}", config); | 56 | log::info!("server_config: {:#?}", config); |
57 | |||
57 | // FIXME: support dynamic workspace loading. | 58 | // FIXME: support dynamic workspace loading. |
58 | let workspaces = { | 59 | let workspaces = { |
59 | let ws_worker = workspace_loader(config.with_sysroot); | ||
60 | let mut loaded_workspaces = Vec::new(); | 60 | let mut loaded_workspaces = Vec::new(); |
61 | for ws_root in &ws_roots { | 61 | for ws_root in &ws_roots { |
62 | ws_worker.sender().send(ws_root.clone()).unwrap(); | 62 | let workspace = ra_project_model::ProjectWorkspace::discover_with_sysroot( |
63 | match ws_worker.receiver().recv().unwrap() { | 63 | ws_root.as_path(), |
64 | Ok(ws) => loaded_workspaces.push(ws), | 64 | config.with_sysroot, |
65 | ); | ||
66 | match workspace { | ||
67 | Ok(workspace) => loaded_workspaces.push(workspace), | ||
65 | Err(e) => { | 68 | Err(e) => { |
66 | log::error!("loading workspace failed: {}", e); | 69 | log::error!("loading workspace failed: {}", e); |
67 | 70 | ||
@@ -75,11 +78,13 @@ pub fn main_loop( | |||
75 | } | 78 | } |
76 | loaded_workspaces | 79 | loaded_workspaces |
77 | }; | 80 | }; |
81 | |||
78 | let globs = config | 82 | let globs = config |
79 | .exclude_globs | 83 | .exclude_globs |
80 | .iter() | 84 | .iter() |
81 | .map(|glob| ra_vfs_glob::Glob::new(glob)) | 85 | .map(|glob| ra_vfs_glob::Glob::new(glob)) |
82 | .collect::<std::result::Result<Vec<_>, _>>()?; | 86 | .collect::<std::result::Result<Vec<_>, _>>()?; |
87 | |||
83 | let feature_flags = { | 88 | let feature_flags = { |
84 | let mut ff = FeatureFlags::default(); | 89 | let mut ff = FeatureFlags::default(); |
85 | for (flag, value) in config.feature_flags { | 90 | for (flag, value) in config.feature_flags { |
@@ -95,7 +100,8 @@ pub fn main_loop( | |||
95 | ff | 100 | ff |
96 | }; | 101 | }; |
97 | log::info!("feature_flags: {:#?}", feature_flags); | 102 | log::info!("feature_flags: {:#?}", feature_flags); |
98 | let mut state = WorldState::new( | 103 | |
104 | let mut world_state = WorldState::new( | ||
99 | ws_roots, | 105 | ws_roots, |
100 | workspaces, | 106 | workspaces, |
101 | config.lru_capacity, | 107 | config.lru_capacity, |
@@ -113,31 +119,58 @@ pub fn main_loop( | |||
113 | 119 | ||
114 | let pool = ThreadPool::new(THREADPOOL_SIZE); | 120 | let pool = ThreadPool::new(THREADPOOL_SIZE); |
115 | let (task_sender, task_receiver) = unbounded::<Task>(); | 121 | let (task_sender, task_receiver) = unbounded::<Task>(); |
116 | let mut pending_requests = PendingRequests::default(); | 122 | let (libdata_sender, libdata_receiver) = unbounded::<LibraryData>(); |
123 | let mut loop_state = LoopState::default(); | ||
117 | 124 | ||
118 | log::info!("server initialized, serving requests"); | 125 | log::info!("server initialized, serving requests"); |
119 | let main_res = main_loop_inner( | 126 | { |
120 | &pool, | 127 | let task_sender = task_sender; |
121 | connection, | 128 | let libdata_sender = libdata_sender; |
122 | task_sender, | 129 | loop { |
123 | task_receiver.clone(), | 130 | log::trace!("selecting"); |
124 | &mut state, | 131 | let event = select! { |
125 | &mut pending_requests, | 132 | recv(&connection.receiver) -> msg => match msg { |
126 | ); | 133 | Ok(msg) => Event::Msg(msg), |
134 | Err(RecvError) => Err("client exited without shutdown")?, | ||
135 | }, | ||
136 | recv(task_receiver) -> task => Event::Task(task.unwrap()), | ||
137 | recv(world_state.task_receiver) -> task => match task { | ||
138 | Ok(task) => Event::Vfs(task), | ||
139 | Err(RecvError) => Err("vfs died")?, | ||
140 | }, | ||
141 | recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) | ||
142 | }; | ||
143 | if let Event::Msg(Message::Request(req)) = &event { | ||
144 | if connection.handle_shutdown(&req)? { | ||
145 | break; | ||
146 | }; | ||
147 | } | ||
148 | loop_turn( | ||
149 | &pool, | ||
150 | &task_sender, | ||
151 | &libdata_sender, | ||
152 | connection, | ||
153 | &mut world_state, | ||
154 | &mut loop_state, | ||
155 | event, | ||
156 | )?; | ||
157 | } | ||
158 | } | ||
127 | 159 | ||
128 | log::info!("waiting for tasks to finish..."); | 160 | log::info!("waiting for tasks to finish..."); |
129 | task_receiver | 161 | task_receiver.into_iter().for_each(|task| { |
130 | .into_iter() | 162 | on_task(task, &connection.sender, &mut loop_state.pending_requests, &mut world_state) |
131 | .for_each(|task| on_task(task, &connection.sender, &mut pending_requests, &mut state)); | 163 | }); |
164 | libdata_receiver.into_iter().for_each(|lib| drop(lib)); | ||
132 | log::info!("...tasks have finished"); | 165 | log::info!("...tasks have finished"); |
133 | log::info!("joining threadpool..."); | 166 | log::info!("joining threadpool..."); |
134 | drop(pool); | 167 | drop(pool); |
135 | log::info!("...threadpool has finished"); | 168 | log::info!("...threadpool has finished"); |
136 | 169 | ||
137 | let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); | 170 | let vfs = Arc::try_unwrap(world_state.vfs).expect("all snapshots should be dead"); |
138 | drop(vfs); | 171 | drop(vfs); |
139 | 172 | ||
140 | main_res | 173 | Ok(()) |
141 | } | 174 | } |
142 | 175 | ||
143 | #[derive(Debug)] | 176 | #[derive(Debug)] |
@@ -192,121 +225,113 @@ impl fmt::Debug for Event { | |||
192 | } | 225 | } |
193 | } | 226 | } |
194 | 227 | ||
195 | fn main_loop_inner( | 228 | #[derive(Debug, Default)] |
229 | struct LoopState { | ||
230 | pending_requests: PendingRequests, | ||
231 | subscriptions: Subscriptions, | ||
232 | // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same | ||
233 | // time to always have a thread ready to react to input. | ||
234 | in_flight_libraries: usize, | ||
235 | pending_libraries: Vec<(SourceRootId, Vec<(FileId, RelativePathBuf, Arc<String>)>)>, | ||
236 | workspace_loaded: bool, | ||
237 | } | ||
238 | |||
239 | fn loop_turn( | ||
196 | pool: &ThreadPool, | 240 | pool: &ThreadPool, |
241 | task_sender: &Sender<Task>, | ||
242 | libdata_sender: &Sender<LibraryData>, | ||
197 | connection: &Connection, | 243 | connection: &Connection, |
198 | task_sender: Sender<Task>, | 244 | world_state: &mut WorldState, |
199 | task_receiver: Receiver<Task>, | 245 | loop_state: &mut LoopState, |
200 | state: &mut WorldState, | 246 | event: Event, |
201 | pending_requests: &mut PendingRequests, | ||
202 | ) -> Result<()> { | 247 | ) -> Result<()> { |
203 | let mut subs = Subscriptions::default(); | 248 | let loop_start = Instant::now(); |
204 | // We try not to index more than MAX_IN_FLIGHT_LIBS libraries at the same | 249 | |
205 | // time to always have a thread ready to react to input. | 250 | // NOTE: don't count blocking select! call as a loop-turn time |
206 | let mut in_flight_libraries = 0; | 251 | let _p = profile("main_loop_inner/loop-turn"); |
207 | let mut pending_libraries = Vec::new(); | 252 | log::info!("loop turn = {:?}", event); |
208 | let mut send_workspace_notification = true; | 253 | let queue_count = pool.queued_count(); |
209 | 254 | if queue_count > 0 { | |
210 | let (libdata_sender, libdata_receiver) = unbounded(); | 255 | log::info!("queued count = {}", queue_count); |
211 | loop { | 256 | } |
212 | log::trace!("selecting"); | ||
213 | let event = select! { | ||
214 | recv(&connection.receiver) -> msg => match msg { | ||
215 | Ok(msg) => Event::Msg(msg), | ||
216 | Err(RecvError) => Err("client exited without shutdown")?, | ||
217 | }, | ||
218 | recv(task_receiver) -> task => Event::Task(task.unwrap()), | ||
219 | recv(state.task_receiver) -> task => match task { | ||
220 | Ok(task) => Event::Vfs(task), | ||
221 | Err(RecvError) => Err("vfs died")?, | ||
222 | }, | ||
223 | recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) | ||
224 | }; | ||
225 | let loop_start = Instant::now(); | ||
226 | |||
227 | // NOTE: don't count blocking select! call as a loop-turn time | ||
228 | let _p = profile("main_loop_inner/loop-turn"); | ||
229 | log::info!("loop turn = {:?}", event); | ||
230 | let queue_count = pool.queued_count(); | ||
231 | if queue_count > 0 { | ||
232 | log::info!("queued count = {}", queue_count); | ||
233 | } | ||
234 | 257 | ||
235 | let mut state_changed = false; | 258 | let mut state_changed = false; |
236 | match event { | 259 | match event { |
237 | Event::Task(task) => { | 260 | Event::Task(task) => { |
238 | on_task(task, &connection.sender, pending_requests, state); | 261 | on_task(task, &connection.sender, &mut loop_state.pending_requests, world_state); |
239 | state.maybe_collect_garbage(); | 262 | world_state.maybe_collect_garbage(); |
240 | } | 263 | } |
241 | Event::Vfs(task) => { | 264 | Event::Vfs(task) => { |
242 | state.vfs.write().handle_task(task); | 265 | world_state.vfs.write().handle_task(task); |
266 | state_changed = true; | ||
267 | } | ||
268 | Event::Lib(lib) => { | ||
269 | world_state.add_lib(lib); | ||
270 | world_state.maybe_collect_garbage(); | ||
271 | loop_state.in_flight_libraries -= 1; | ||
272 | } | ||
273 | Event::Msg(msg) => match msg { | ||
274 | Message::Request(req) => on_request( | ||
275 | world_state, | ||
276 | &mut loop_state.pending_requests, | ||
277 | pool, | ||
278 | task_sender, | ||
279 | &connection.sender, | ||
280 | loop_start, | ||
281 | req, | ||
282 | )?, | ||
283 | Message::Notification(not) => { | ||
284 | on_notification( | ||
285 | &connection.sender, | ||
286 | world_state, | ||
287 | &mut loop_state.pending_requests, | ||
288 | &mut loop_state.subscriptions, | ||
289 | not, | ||
290 | )?; | ||
243 | state_changed = true; | 291 | state_changed = true; |
244 | } | 292 | } |
245 | Event::Lib(lib) => { | 293 | Message::Response(resp) => log::error!("unexpected response: {:?}", resp), |
246 | state.add_lib(lib); | 294 | }, |
247 | state.maybe_collect_garbage(); | 295 | }; |
248 | in_flight_libraries -= 1; | ||
249 | } | ||
250 | Event::Msg(msg) => match msg { | ||
251 | Message::Request(req) => { | ||
252 | if connection.handle_shutdown(&req)? { | ||
253 | return Ok(()); | ||
254 | }; | ||
255 | on_request( | ||
256 | state, | ||
257 | pending_requests, | ||
258 | pool, | ||
259 | &task_sender, | ||
260 | &connection.sender, | ||
261 | loop_start, | ||
262 | req, | ||
263 | )? | ||
264 | } | ||
265 | Message::Notification(not) => { | ||
266 | on_notification(&connection.sender, state, pending_requests, &mut subs, not)?; | ||
267 | state_changed = true; | ||
268 | } | ||
269 | Message::Response(resp) => log::error!("unexpected response: {:?}", resp), | ||
270 | }, | ||
271 | }; | ||
272 | 296 | ||
273 | pending_libraries.extend(state.process_changes()); | 297 | loop_state.pending_libraries.extend(world_state.process_changes()); |
274 | while in_flight_libraries < MAX_IN_FLIGHT_LIBS && !pending_libraries.is_empty() { | 298 | while loop_state.in_flight_libraries < MAX_IN_FLIGHT_LIBS |
275 | let (root, files) = pending_libraries.pop().unwrap(); | 299 | && !loop_state.pending_libraries.is_empty() |
276 | in_flight_libraries += 1; | 300 | { |
277 | let sender = libdata_sender.clone(); | 301 | let (root, files) = loop_state.pending_libraries.pop().unwrap(); |
278 | pool.execute(move || { | 302 | loop_state.in_flight_libraries += 1; |
279 | log::info!("indexing {:?} ... ", root); | 303 | let sender = libdata_sender.clone(); |
280 | let _p = profile(&format!("indexed {:?}", root)); | 304 | pool.execute(move || { |
281 | let data = LibraryData::prepare(root, files); | 305 | log::info!("indexing {:?} ... ", root); |
282 | sender.send(data).unwrap(); | 306 | let _p = profile(&format!("indexed {:?}", root)); |
283 | }); | 307 | let data = LibraryData::prepare(root, files); |
284 | } | 308 | sender.send(data).unwrap(); |
309 | }); | ||
310 | } | ||
285 | 311 | ||
286 | if send_workspace_notification | 312 | if !loop_state.workspace_loaded |
287 | && state.roots_to_scan == 0 | 313 | && world_state.roots_to_scan == 0 |
288 | && pending_libraries.is_empty() | 314 | && loop_state.pending_libraries.is_empty() |
289 | && in_flight_libraries == 0 | 315 | && loop_state.in_flight_libraries == 0 |
290 | { | 316 | { |
291 | let n_packages: usize = state.workspaces.iter().map(|it| it.n_packages()).sum(); | 317 | loop_state.workspace_loaded = true; |
292 | if state.feature_flags().get("notifications.workspace-loaded") { | 318 | let n_packages: usize = world_state.workspaces.iter().map(|it| it.n_packages()).sum(); |
293 | let msg = format!("workspace loaded, {} rust packages", n_packages); | 319 | if world_state.feature_flags().get("notifications.workspace-loaded") { |
294 | show_message(req::MessageType::Info, msg, &connection.sender); | 320 | let msg = format!("workspace loaded, {} rust packages", n_packages); |
295 | } | 321 | show_message(req::MessageType::Info, msg, &connection.sender); |
296 | // Only send the notification first time | ||
297 | send_workspace_notification = false; | ||
298 | } | 322 | } |
323 | } | ||
299 | 324 | ||
300 | if state_changed { | 325 | if state_changed { |
301 | update_file_notifications_on_threadpool( | 326 | update_file_notifications_on_threadpool( |
302 | pool, | 327 | pool, |
303 | state.snapshot(), | 328 | world_state.snapshot(), |
304 | state.options.publish_decorations, | 329 | world_state.options.publish_decorations, |
305 | task_sender.clone(), | 330 | task_sender.clone(), |
306 | subs.subscriptions(), | 331 | loop_state.subscriptions.subscriptions(), |
307 | ) | 332 | ) |
308 | } | ||
309 | } | 333 | } |
334 | Ok(()) | ||
310 | } | 335 | } |
311 | 336 | ||
312 | fn on_task( | 337 | fn on_task( |