aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-12-30 20:23:31 +0000
committerAleksey Kladov <[email protected]>2018-12-30 20:23:31 +0000
commitc2c10b9014549e9c0783fb13dc202dfab6e6fd0a (patch)
treed4b2a47910df5effb14271ab72a1602b5f77ee81 /crates
parenteffc1eae8be338ea949058cc89c39950c25858c5 (diff)
:arrow_up: crossbeam
closes #189
Diffstat (limited to 'crates')
-rw-r--r--crates/ra_lsp_server/Cargo.toml2
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs48
-rw-r--r--crates/ra_lsp_server/src/project_model.rs4
-rw-r--r--crates/ra_lsp_server/tests/heavy_tests/support.rs13
-rw-r--r--crates/ra_vfs/Cargo.toml2
-rw-r--r--crates/ra_vfs/src/io.rs4
-rw-r--r--crates/ra_vfs/src/lib.rs2
-rw-r--r--crates/thread_worker/Cargo.toml2
-rw-r--r--crates/thread_worker/src/lib.rs6
9 files changed, 48 insertions, 35 deletions
diff --git a/crates/ra_lsp_server/Cargo.toml b/crates/ra_lsp_server/Cargo.toml
index 646df2497..b1e8987fe 100644
--- a/crates/ra_lsp_server/Cargo.toml
+++ b/crates/ra_lsp_server/Cargo.toml
@@ -13,7 +13,7 @@ failure_derive = "0.1.2"
13serde_json = "1.0.24" 13serde_json = "1.0.24"
14serde = "1.0.83" 14serde = "1.0.83"
15drop_bomb = "0.1.0" 15drop_bomb = "0.1.0"
16crossbeam-channel = "0.2.4" 16crossbeam-channel = "0.3.5"
17flexi_logger = "0.10.0" 17flexi_logger = "0.10.0"
18log = "0.4.3" 18log = "0.4.3"
19url_serde = "0.2.0" 19url_serde = "0.2.0"
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index 97c1be778..3ebae4ecd 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -7,7 +7,7 @@ use std::{
7 sync::Arc, 7 sync::Arc,
8}; 8};
9 9
10use crossbeam_channel::{unbounded, select, Receiver, Sender}; 10use crossbeam_channel::{unbounded, select, Receiver, Sender, RecvError};
11use gen_lsp_server::{ 11use gen_lsp_server::{
12 handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse, 12 handle_shutdown, ErrorCode, RawMessage, RawNotification, RawRequest, RawResponse,
13}; 13};
@@ -62,7 +62,7 @@ pub fn main_loop(
62 let (task_sender, task_receiver) = unbounded::<Task>(); 62 let (task_sender, task_receiver) = unbounded::<Task>();
63 let (ws_worker, ws_watcher) = workspace_loader(); 63 let (ws_worker, ws_watcher) = workspace_loader();
64 64
65 ws_worker.send(ws_root.clone()); 65 ws_worker.send(ws_root.clone()).unwrap();
66 // FIXME: support dynamic workspace loading. 66 // FIXME: support dynamic workspace loading.
67 let workspaces = match ws_worker.recv().unwrap() { 67 let workspaces = match ws_worker.recv().unwrap() {
68 Ok(ws) => vec![ws], 68 Ok(ws) => vec![ws],
@@ -95,7 +95,9 @@ pub fn main_loop(
95 ); 95 );
96 96
97 log::info!("waiting for tasks to finish..."); 97 log::info!("waiting for tasks to finish...");
98 task_receiver.for_each(|task| on_task(task, msg_sender, &mut pending_requests)); 98 task_receiver
99 .into_iter()
100 .for_each(|task| on_task(task, msg_sender, &mut pending_requests));
99 log::info!("...tasks have finished"); 101 log::info!("...tasks have finished");
100 log::info!("joining threadpool..."); 102 log::info!("joining threadpool...");
101 drop(pool); 103 drop(pool);
@@ -170,16 +172,16 @@ fn main_loop_inner(
170 loop { 172 loop {
171 log::trace!("selecting"); 173 log::trace!("selecting");
172 let event = select! { 174 let event = select! {
173 recv(msg_receiver, msg) => match msg { 175 recv(msg_receiver) -> msg => match msg {
174 Some(msg) => Event::Msg(msg), 176 Ok(msg) => Event::Msg(msg),
175 None => bail!("client exited without shutdown"), 177 Err(RecvError) => bail!("client exited without shutdown"),
176 }, 178 },
177 recv(task_receiver, task) => Event::Task(task.unwrap()), 179 recv(task_receiver) -> task => Event::Task(task.unwrap()),
178 recv(state.vfs.read().task_receiver(), task) => match task { 180 recv(state.vfs.read().task_receiver()) -> task => match task {
179 None => bail!("vfs died"), 181 Ok(task) => Event::Vfs(task),
180 Some(task) => Event::Vfs(task), 182 Err(RecvError) => bail!("vfs died"),
181 } 183 },
182 recv(libdata_receiver, data) => Event::Lib(data.unwrap()) 184 recv(libdata_receiver) -> data => Event::Lib(data.unwrap())
183 }; 185 };
184 log::info!("loop_turn = {:?}", event); 186 log::info!("loop_turn = {:?}", event);
185 let start = std::time::Instant::now(); 187 let start = std::time::Instant::now();
@@ -209,7 +211,7 @@ fn main_loop_inner(
209 ErrorCode::MethodNotFound as i32, 211 ErrorCode::MethodNotFound as i32,
210 "unknown request".to_string(), 212 "unknown request".to_string(),
211 ); 213 );
212 msg_sender.send(RawMessage::Response(resp)) 214 msg_sender.send(RawMessage::Response(resp)).unwrap()
213 } 215 }
214 } 216 }
215 } 217 }
@@ -229,7 +231,7 @@ fn main_loop_inner(
229 log::info!("indexing {:?} ... ", root); 231 log::info!("indexing {:?} ... ", root);
230 let data = LibraryData::prepare(root, files); 232 let data = LibraryData::prepare(root, files);
231 log::info!("indexed {:?} {:?}", start.elapsed(), root); 233 log::info!("indexed {:?} {:?}", start.elapsed(), root);
232 sender.send(data); 234 sender.send(data).unwrap();
233 }); 235 });
234 } 236 }
235 if state.roots_to_scan == 0 { 237 if state.roots_to_scan == 0 {
@@ -253,10 +255,12 @@ fn on_task(task: Task, msg_sender: &Sender<RawMessage>, pending_requests: &mut F
253 match task { 255 match task {
254 Task::Respond(response) => { 256 Task::Respond(response) => {
255 if pending_requests.remove(&response.id) { 257 if pending_requests.remove(&response.id) {
256 msg_sender.send(RawMessage::Response(response)) 258 msg_sender.send(RawMessage::Response(response)).unwrap();
257 } 259 }
258 } 260 }
259 Task::Notify(n) => msg_sender.send(RawMessage::Notification(n)), 261 Task::Notify(n) => {
262 msg_sender.send(RawMessage::Notification(n)).unwrap();
263 }
260 } 264 }
261} 265}
262 266
@@ -328,7 +332,7 @@ fn on_notification(
328 ErrorCode::RequestCancelled as i32, 332 ErrorCode::RequestCancelled as i32,
329 "canceled by client".to_string(), 333 "canceled by client".to_string(),
330 ); 334 );
331 msg_sender.send(RawMessage::Response(response)) 335 msg_sender.send(RawMessage::Response(response)).unwrap()
332 } 336 }
333 return Ok(()); 337 return Ok(());
334 } 338 }
@@ -381,7 +385,7 @@ fn on_notification(
381 diagnostics: Vec::new(), 385 diagnostics: Vec::new(),
382 }; 386 };
383 let not = RawNotification::new::<req::PublishDiagnostics>(&params); 387 let not = RawNotification::new::<req::PublishDiagnostics>(&params);
384 msg_sender.send(RawMessage::Notification(not)); 388 msg_sender.send(RawMessage::Notification(not)).unwrap();
385 return Ok(()); 389 return Ok(());
386 } 390 }
387 Err(not) => not, 391 Err(not) => not,
@@ -441,7 +445,7 @@ impl<'a> PoolDispatcher<'a> {
441 }, 445 },
442 }; 446 };
443 let task = Task::Respond(resp); 447 let task = Task::Respond(resp);
444 sender.send(task); 448 sender.send(task).unwrap();
445 }); 449 });
446 self.res = Some(id); 450 self.res = Some(id);
447 } 451 }
@@ -476,7 +480,7 @@ fn update_file_notifications_on_threadpool(
476 } 480 }
477 Ok(params) => { 481 Ok(params) => {
478 let not = RawNotification::new::<req::PublishDiagnostics>(&params); 482 let not = RawNotification::new::<req::PublishDiagnostics>(&params);
479 sender.send(Task::Notify(not)); 483 sender.send(Task::Notify(not)).unwrap();
480 } 484 }
481 } 485 }
482 if publish_decorations { 486 if publish_decorations {
@@ -488,7 +492,7 @@ fn update_file_notifications_on_threadpool(
488 } 492 }
489 Ok(params) => { 493 Ok(params) => {
490 let not = RawNotification::new::<req::PublishDecorations>(&params); 494 let not = RawNotification::new::<req::PublishDecorations>(&params);
491 sender.send(Task::Notify(not)) 495 sender.send(Task::Notify(not)).unwrap();
492 } 496 }
493 } 497 }
494 } 498 }
@@ -501,7 +505,7 @@ fn feedback(intrnal_mode: bool, msg: &str, sender: &Sender<RawMessage>) {
501 return; 505 return;
502 } 506 }
503 let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string()); 507 let not = RawNotification::new::<req::InternalFeedback>(&msg.to_string());
504 sender.send(RawMessage::Notification(not)); 508 sender.send(RawMessage::Notification(not)).unwrap();
505} 509}
506 510
507fn is_canceled(e: &failure::Error) -> bool { 511fn is_canceled(e: &failure::Error) -> bool {
diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs
index 5852a157d..ae2149463 100644
--- a/crates/ra_lsp_server/src/project_model.rs
+++ b/crates/ra_lsp_server/src/project_model.rs
@@ -204,8 +204,10 @@ pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, WorkerHan
204 1, 204 1,
205 |input_receiver, output_sender| { 205 |input_receiver, output_sender| {
206 input_receiver 206 input_receiver
207 .into_iter()
207 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) 208 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
208 .for_each(|it| output_sender.send(it)) 209 .try_for_each(|it| output_sender.send(it))
210 .unwrap()
209 }, 211 },
210 ) 212 )
211} 213}
diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs
index c14d287ca..82ba12f87 100644
--- a/crates/ra_lsp_server/tests/heavy_tests/support.rs
+++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs
@@ -118,7 +118,11 @@ impl Server {
118 } 118 }
119 fn send_request_(&self, r: RawRequest) -> Value { 119 fn send_request_(&self, r: RawRequest) -> Value {
120 let id = r.id; 120 let id = r.id;
121 self.worker.as_ref().unwrap().send(RawMessage::Request(r)); 121 self.worker
122 .as_ref()
123 .unwrap()
124 .send(RawMessage::Request(r))
125 .unwrap();
122 while let Some(msg) = self.recv() { 126 while let Some(msg) = self.recv() {
123 match msg { 127 match msg {
124 RawMessage::Request(req) => panic!("unexpected request: {:?}", req), 128 RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
@@ -167,7 +171,8 @@ impl Server {
167 self.worker 171 self.worker
168 .as_ref() 172 .as_ref()
169 .unwrap() 173 .unwrap()
170 .send(RawMessage::Notification(not)); 174 .send(RawMessage::Notification(not))
175 .unwrap();
171 } 176 }
172} 177}
173 178
@@ -185,7 +190,7 @@ impl Drop for Server {
185fn recv_timeout(receiver: &Receiver<RawMessage>) -> Option<RawMessage> { 190fn recv_timeout(receiver: &Receiver<RawMessage>) -> Option<RawMessage> {
186 let timeout = Duration::from_secs(5); 191 let timeout = Duration::from_secs(5);
187 select! { 192 select! {
188 recv(receiver, msg) => msg, 193 recv(receiver) -> msg => msg.ok(),
189 recv(after(timeout)) => panic!("timed out"), 194 recv(after(timeout)) -> _ => panic!("timed out"),
190 } 195 }
191} 196}
diff --git a/crates/ra_vfs/Cargo.toml b/crates/ra_vfs/Cargo.toml
index ccea8a866..7c170cdfc 100644
--- a/crates/ra_vfs/Cargo.toml
+++ b/crates/ra_vfs/Cargo.toml
@@ -8,7 +8,7 @@ authors = ["Aleksey Kladov <[email protected]>"]
8walkdir = "2.2.7" 8walkdir = "2.2.7"
9relative-path = "0.4.0" 9relative-path = "0.4.0"
10rustc-hash = "1.0" 10rustc-hash = "1.0"
11crossbeam-channel = "0.2.4" 11crossbeam-channel = "0.3.5"
12log = "0.4.6" 12log = "0.4.6"
13 13
14thread_worker = { path = "../thread_worker" } 14thread_worker = { path = "../thread_worker" }
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 4cfdb83da..80328ad18 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -32,8 +32,10 @@ pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
32pub(crate) fn start() -> (Worker, WorkerHandle) { 32pub(crate) fn start() -> (Worker, WorkerHandle) {
33 thread_worker::spawn("vfs", 128, |input_receiver, output_sender| { 33 thread_worker::spawn("vfs", 128, |input_receiver, output_sender| {
34 input_receiver 34 input_receiver
35 .into_iter()
35 .map(handle_task) 36 .map(handle_task)
36 .for_each(|it| output_sender.send(it)) 37 .try_for_each(|it| output_sender.send(it))
38 .unwrap()
37 }) 39 })
38} 40}
39 41
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index 90d5e21f4..757eac95b 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -148,7 +148,7 @@ impl Vfs {
148 path: path.clone(), 148 path: path.clone(),
149 filter: Box::new(filter), 149 filter: Box::new(filter),
150 }; 150 };
151 res.worker.inp.send(task); 151 res.worker.inp.send(task).unwrap();
152 } 152 }
153 let roots = res.roots.iter().map(|(id, _)| id).collect(); 153 let roots = res.roots.iter().map(|(id, _)| id).collect();
154 (res, roots) 154 (res, roots)
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml
index 62d66a1a3..c74b376e2 100644
--- a/crates/thread_worker/Cargo.toml
+++ b/crates/thread_worker/Cargo.toml
@@ -6,6 +6,6 @@ authors = ["Aleksey Kladov <[email protected]>"]
6 6
7[dependencies] 7[dependencies]
8drop_bomb = "0.1.0" 8drop_bomb = "0.1.0"
9crossbeam-channel = "0.2.4" 9crossbeam-channel = "0.3.5"
10log = "0.4.3" 10log = "0.4.3"
11 11
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
index 12e8bf17e..5e46f62fe 100644
--- a/crates/thread_worker/src/lib.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -2,7 +2,7 @@
2 2
3use std::thread; 3use std::thread;
4 4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; 5use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError};
6use drop_bomb::DropBomb; 6use drop_bomb::DropBomb;
7 7
8pub struct Worker<I, O> { 8pub struct Worker<I, O> {
@@ -34,10 +34,10 @@ impl<I, O> Worker<I, O> {
34 self.out 34 self.out
35 } 35 }
36 36
37 pub fn send(&self, item: I) { 37 pub fn send(&self, item: I) -> Result<(), SendError<I>> {
38 self.inp.send(item) 38 self.inp.send(item)
39 } 39 }
40 pub fn recv(&self) -> Option<O> { 40 pub fn recv(&self) -> Result<O, RecvError> {
41 self.out.recv() 41 self.out.recv()
42 } 42 }
43} 43}