From c2c10b9014549e9c0783fb13dc202dfab6e6fd0a Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sun, 30 Dec 2018 23:23:31 +0300 Subject: :arrow_up: crossbeam closes #189 --- crates/ra_lsp_server/src/main_loop.rs | 48 +++++++++++++++++-------------- crates/ra_lsp_server/src/project_model.rs | 4 ++- 2 files changed, 29 insertions(+), 23 deletions(-) (limited to 'crates/ra_lsp_server/src') diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 97c1be778..3ebae4ecd 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use crossbeam_channel::{unbounded, select, Receiver, Sender}; +use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError}; use gen_lsp_server::{ handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse, }; @@ -62,7 +62,7 @@ pub fn main_loop( let (task_sender, task_receiver) = unbounded::(); let (ws_worker, ws_watcher) = workspace_loader(); - ws_worker.send(ws_root.clone()); + ws_worker.send(ws_root.clone()).unwrap(); // FIXME: support dynamic workspace loading. let workspaces = match ws_worker.recv().unwrap() { Ok(ws) => vec![ws], @@ -95,7 +95,9 @@ pub fn main_loop( ); log::info!("waiting for tasks to finish..."); - task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); + task_receiver + .into_iter() + .for_each(|task| on_task(task, msg_sender, &mut pending_requests)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); @@ -170,16 +172,16 @@ fn main_loop_inner( loop { log::trace!("selecting"); let event = select! { - recv(msg_receiver, msg) => match msg { - Some(msg) => Event::Msg(msg), - None => bail!("client exited without shutdown"), + recv(msg_receiver) -> msg => match msg { + Ok(msg) => Event::Msg(msg), + Err(RecvError) => bail!("client exited without shutdown"), }, - recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(state.vfs.read().task_receiver(), task) => match task { - None => bail!("vfs died"), - Some(task) => Event::Vfs(task), - } - recv(libdata_receiver, data) => Event::Lib(data.unwrap()) + recv(task_receiver) -> task => Event::Task(task.unwrap()), + recv(state.vfs.read().task_receiver()) -> task => match task { + Ok(task) => Event::Vfs(task), + Err(RecvError) => bail!("vfs died"), + }, + recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) }; log::info!("loop_turn = {:?}", event); let start = std::time::Instant::now(); @@ -209,7 +211,7 @@ fn main_loop_inner( ErrorCode::MethodNotFound as i32, "unknown request".to_string(), ); - msg_sender.send(RawMessage::Response(resp)) + msg_sender.send(RawMessage::Response(resp)).unwrap() } } } @@ -229,7 +231,7 @@ fn main_loop_inner( log::info!("indexing {:?} ... ", root); let data = LibraryData::prepare(root, files); log::info!("indexed {:?} {:?}", start.elapsed(), root); - sender.send(data); + sender.send(data).unwrap(); }); } if state.roots_to_scan == 0 { @@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender, pending_requests: &mut F match task { Task::Respond(response) => { if pending_requests.remove(&response.id) { - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap(); } } - Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)), + Task::Notify(n) => { + msg_sender.send(RawMessage::Notification(n)).unwrap(); + } } } @@ -328,7 +332,7 @@ fn on_notification( ErrorCode::RequestCancelled as i32, "canceled by client".to_string(), ); - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap() } return Ok(()); } @@ -381,7 +385,7 @@ fn on_notification( diagnostics: Vec::new(), }; let not = RawNotification::new::(¶ms); - msg_sender.send(RawMessage::Notification(not)); + msg_sender.send(RawMessage::Notification(not)).unwrap(); return Ok(()); } Err(not) => not, @@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> { }, }; let task = Task::Respond(resp); - sender.send(task); + sender.send(task).unwrap(); }); self.res = Some(id); } @@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)); + sender.send(Task::Notify(not)).unwrap(); } } if publish_decorations { @@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)) + sender.send(Task::Notify(not)).unwrap(); } } } @@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender) { return; } let not = RawNotification::new::(&msg.to_string()); - sender.send(RawMessage::Notification(not)); + sender.send(RawMessage::Notification(not)).unwrap(); } fn is_canceled(e: &failure::Error) -> bool { diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index 5852a157d..ae2149463 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker>, WorkerHan 1, |input_receiver, output_sender| { input_receiver + .into_iter() .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| output_sender.send(it)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() }, ) } -- cgit v1.2.3