From c2c10b9014549e9c0783fb13dc202dfab6e6fd0a Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sun, 30 Dec 2018 23:23:31 +0300 Subject: :arrow_up: crossbeam closes #189 --- crates/ra_lsp_server/Cargo.toml | 2 +- crates/ra_lsp_server/src/main_loop.rs | 48 ++++++++++++----------- crates/ra_lsp_server/src/project_model.rs | 4 +- crates/ra_lsp_server/tests/heavy_tests/support.rs | 13 ++++-- crates/ra_vfs/Cargo.toml | 2 +- crates/ra_vfs/src/io.rs | 4 +- crates/ra_vfs/src/lib.rs | 2 +- crates/thread_worker/Cargo.toml | 2 +- crates/thread_worker/src/lib.rs | 6 +-- 9 files changed, 48 insertions(+), 35 deletions(-) (limited to 'crates') diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml index 646df2497..b1e8987fe 100644 --- a/crates/ra_lsp_server/Cargo.toml +++ b/crates/ra_lsp_server/Cargo.toml @@ -13,7 +13,7 @@ failure_derive = "0.1.2" serde_json = "1.0.24" serde = "1.0.83" drop_bomb = "0.1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" flexi_logger = "0.10.0" log = "0.4.3" url_serde = "0.2.0" diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs index 97c1be778..3ebae4ecd 100644 --- a/crates/ra_lsp_server/src/main_loop.rs +++ b/crates/ra_lsp_server/src/main_loop.rs @@ -7,7 +7,7 @@ use std::{ sync::Arc, }; -use crossbeam_channel::{unbounded, select, Receiver, Sender}; +use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError}; use gen_lsp_server::{ handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse, }; @@ -62,7 +62,7 @@ pub fn main_loop( let (task_sender, task_receiver) = unbounded::(); let (ws_worker, ws_watcher) = workspace_loader(); - ws_worker.send(ws_root.clone()); + ws_worker.send(ws_root.clone()).unwrap(); // FIXME: support dynamic workspace loading. let workspaces = match ws_worker.recv().unwrap() { Ok(ws) => vec![ws], @@ -95,7 +95,9 @@ pub fn main_loop( ); log::info!("waiting for tasks to finish..."); - task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); + task_receiver + .into_iter() + .for_each(|task| on_task(task, msg_sender, &mut pending_requests)); log::info!("...tasks have finished"); log::info!("joining threadpool..."); drop(pool); @@ -170,16 +172,16 @@ fn main_loop_inner( loop { log::trace!("selecting"); let event = select! { - recv(msg_receiver, msg) => match msg { - Some(msg) => Event::Msg(msg), - None => bail!("client exited without shutdown"), + recv(msg_receiver) -> msg => match msg { + Ok(msg) => Event::Msg(msg), + Err(RecvError) => bail!("client exited without shutdown"), }, - recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(state.vfs.read().task_receiver(), task) => match task { - None => bail!("vfs died"), - Some(task) => Event::Vfs(task), - } - recv(libdata_receiver, data) => Event::Lib(data.unwrap()) + recv(task_receiver) -> task => Event::Task(task.unwrap()), + recv(state.vfs.read().task_receiver()) -> task => match task { + Ok(task) => Event::Vfs(task), + Err(RecvError) => bail!("vfs died"), + }, + recv(libdata_receiver) -> data => Event::Lib(data.unwrap()) }; log::info!("loop_turn = {:?}", event); let start = std::time::Instant::now(); @@ -209,7 +211,7 @@ fn main_loop_inner( ErrorCode::MethodNotFound as i32, "unknown request".to_string(), ); - msg_sender.send(RawMessage::Response(resp)) + msg_sender.send(RawMessage::Response(resp)).unwrap() } } } @@ -229,7 +231,7 @@ fn main_loop_inner( log::info!("indexing {:?} ... ", root); let data = LibraryData::prepare(root, files); log::info!("indexed {:?} {:?}", start.elapsed(), root); - sender.send(data); + sender.send(data).unwrap(); }); } if state.roots_to_scan == 0 { @@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender, pending_requests: &mut F match task { Task::Respond(response) => { if pending_requests.remove(&response.id) { - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap(); } } - Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)), + Task::Notify(n) => { + msg_sender.send(RawMessage::Notification(n)).unwrap(); + } } } @@ -328,7 +332,7 @@ fn on_notification( ErrorCode::RequestCancelled as i32, "canceled by client".to_string(), ); - msg_sender.send(RawMessage::Response(response)) + msg_sender.send(RawMessage::Response(response)).unwrap() } return Ok(()); } @@ -381,7 +385,7 @@ fn on_notification( diagnostics: Vec::new(), }; let not = RawNotification::new::(¶ms); - msg_sender.send(RawMessage::Notification(not)); + msg_sender.send(RawMessage::Notification(not)).unwrap(); return Ok(()); } Err(not) => not, @@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> { }, }; let task = Task::Respond(resp); - sender.send(task); + sender.send(task).unwrap(); }); self.res = Some(id); } @@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)); + sender.send(Task::Notify(not)).unwrap(); } } if publish_decorations { @@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool( } Ok(params) => { let not = RawNotification::new::(¶ms); - sender.send(Task::Notify(not)) + sender.send(Task::Notify(not)).unwrap(); } } } @@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender) { return; } let not = RawNotification::new::(&msg.to_string()); - sender.send(RawMessage::Notification(not)); + sender.send(RawMessage::Notification(not)).unwrap(); } fn is_canceled(e: &failure::Error) -> bool { diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs index 5852a157d..ae2149463 100644 --- a/crates/ra_lsp_server/src/project_model.rs +++ b/crates/ra_lsp_server/src/project_model.rs @@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker>, WorkerHan 1, |input_receiver, output_sender| { input_receiver + .into_iter() .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| output_sender.send(it)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() }, ) } diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs index c14d287ca..82ba12f87 100644 --- a/crates/ra_lsp_server/tests/heavy_tests/support.rs +++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs @@ -118,7 +118,11 @@ impl Server { } fn send_request_(&self, r: RawRequest) -> Value { let id = r.id; - self.worker.as_ref().unwrap().send(RawMessage::Request(r)); + self.worker + .as_ref() + .unwrap() + .send(RawMessage::Request(r)) + .unwrap(); while let Some(msg) = self.recv() { match msg { RawMessage::Request(req) => panic!("unexpected request: {:?}", req), @@ -167,7 +171,8 @@ impl Server { self.worker .as_ref() .unwrap() - .send(RawMessage::Notification(not)); + .send(RawMessage::Notification(not)) + .unwrap(); } } @@ -185,7 +190,7 @@ impl Drop for Server { fn recv_timeout(receiver: &Receiver) -> Option { let timeout = Duration::from_secs(5); select! { - recv(receiver, msg) => msg, - recv(after(timeout)) => panic!("timed out"), + recv(receiver) -> msg => msg.ok(), + recv(after(timeout)) -> _ => panic!("timed out"), } } diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml index ccea8a866..7c170cdfc 100644 --- a/crates/ra_vfs/Cargo.toml +++ b/crates/ra_vfs/Cargo.toml @@ -8,7 +8,7 @@ authors = ["Aleksey Kladov "] walkdir = "2.2.7" relative-path = "0.4.0" rustc-hash = "1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" log = "0.4.6" thread_worker = { path = "../thread_worker" } diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs index 4cfdb83da..80328ad18 100644 --- a/crates/ra_vfs/src/io.rs +++ b/crates/ra_vfs/src/io.rs @@ -32,8 +32,10 @@ pub(crate) type Worker = thread_worker::Worker; pub(crate) fn start() -> (Worker, WorkerHandle) { thread_worker::spawn("vfs", 128, |input_receiver, output_sender| { input_receiver + .into_iter() .map(handle_task) - .for_each(|it| output_sender.send(it)) + .try_for_each(|it| output_sender.send(it)) + .unwrap() }) } diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs index 90d5e21f4..757eac95b 100644 --- a/crates/ra_vfs/src/lib.rs +++ b/crates/ra_vfs/src/lib.rs @@ -148,7 +148,7 @@ impl Vfs { path: path.clone(), filter: Box::new(filter), }; - res.worker.inp.send(task); + res.worker.inp.send(task).unwrap(); } let roots = res.roots.iter().map(|(id, _)| id).collect(); (res, roots) diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml index 62d66a1a3..c74b376e2 100644 --- a/crates/thread_worker/Cargo.toml +++ b/crates/thread_worker/Cargo.toml @@ -6,6 +6,6 @@ authors = ["Aleksey Kladov "] [dependencies] drop_bomb = "0.1.0" -crossbeam-channel = "0.2.4" +crossbeam-channel = "0.3.5" log = "0.4.3" diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs index 12e8bf17e..5e46f62fe 100644 --- a/crates/thread_worker/src/lib.rs +++ b/crates/thread_worker/src/lib.rs @@ -2,7 +2,7 @@ use std::thread; -use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; use drop_bomb::DropBomb; pub struct Worker { @@ -34,10 +34,10 @@ impl Worker { self.out } - pub fn send(&self, item: I) { + pub fn send(&self, item: I) -> Result<(), SendError> { self.inp.send(item) } - pub fn recv(&self) -> Option { + pub fn recv(&self) -> Result { self.out.recv() } } -- cgit v1.2.3