From 7daaddb2ac281dcad3ac99496b1cf3f06840887d Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 8 Sep 2018 13:15:01 +0300 Subject: Some abstraction around workers --- crates/server/src/main_loop/mod.rs | 28 ++++++++--------- crates/server/src/project_model.rs | 24 +++++++-------- crates/server/src/thread_watcher.rs | 31 +++++++++++++++++-- crates/server/src/vfs.rs | 34 ++++++++++----------- crates/server/tests/heavy_tests/support.rs | 49 ++++++++++++++---------------- 5 files changed, 92 insertions(+), 74 deletions(-) (limited to 'crates') diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index b7f5efbb1..f1297ee48 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -23,6 +23,7 @@ use { server_world::{ServerWorldState, ServerWorld}, main_loop::subscriptions::{Subscriptions}, project_model::{CargoWorkspace, workspace_loader}, + thread_watcher::Worker, }; #[derive(Debug)] @@ -43,8 +44,8 @@ pub fn main_loop( .build() .unwrap(); let (task_sender, task_receiver) = unbounded::(); - let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); - let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); + let (fs_worker, fs_watcher) = vfs::roots_loader(); + let (ws_worker, ws_watcher) = workspace_loader(); info!("server initialized, serving requests"); let mut state = ServerWorldState::new(); @@ -59,10 +60,8 @@ pub fn main_loop( msg_receriver, task_sender, task_receiver.clone(), - fs_sender, - fs_receiver, - ws_sender, - ws_receiver, + fs_worker, + ws_worker, &mut state, &mut pending_requests, &mut subs, @@ -93,17 +92,15 @@ fn main_loop_inner( msg_receiver: &mut Receiver, task_sender: Sender, task_receiver: Receiver, - fs_sender: Sender, - fs_receiver: Receiver<(PathBuf, Vec)>, - ws_sender: Sender, - ws_receiver: Receiver>, + fs_worker: Worker)>, + ws_worker: Worker>, state: &mut ServerWorldState, pending_requests: &mut HashMap, subs: &mut Subscriptions, ) -> Result<()> { let (libdata_sender, libdata_receiver) = unbounded(); - ws_sender.send(ws_root.clone()); - fs_sender.send(ws_root.clone()); + ws_worker.send(ws_root.clone()); + fs_worker.send(ws_root.clone()); loop { #[derive(Debug)] enum Event { @@ -120,11 +117,11 @@ fn main_loop_inner( None => bail!("client exited without shutdown"), }, recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(fs_receiver, events) => match events { + recv(fs_worker.out, events) => match events { None => bail!("roots watcher died"), Some((pb, events)) => Event::Fs(pb, events), } - recv(ws_receiver, ws) => match ws { + recv(ws_worker.out, ws) => match ws { None => bail!("workspace watcher died"), Some(ws) => Event::Ws(ws), } @@ -158,8 +155,7 @@ fn main_loop_inner( for ws in workspaces.iter() { for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); - // deadlocky :-( - fs_sender.send(pkg.root(ws).to_path_buf()); + fs_worker.send(pkg.root(ws).to_path_buf()); } } state.set_workspaces(workspaces); diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index b9d6872c8..359cf787d 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs @@ -3,12 +3,11 @@ use std::{ path::{Path, PathBuf}, }; use cargo_metadata::{metadata_run, CargoOpt}; -use crossbeam_channel::{Sender, Receiver}; use libsyntax2::SmolStr; use { Result, - thread_watcher::{ThreadWatcher, worker_chan}, + thread_watcher::{Worker, ThreadWatcher}, }; #[derive(Debug, Clone)] @@ -162,14 +161,15 @@ impl TargetKind { } } -pub fn workspace_loader() -> ((Sender, Receiver>), ThreadWatcher) { - let (interface, input_receiver, output_sender) = worker_chan::>(1); - let thread = ThreadWatcher::spawn("workspace loader", move || { - input_receiver - .into_iter() - .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| output_sender.send(it)) - }); - - (interface, thread) +pub fn workspace_loader() -> (Worker>, ThreadWatcher) { + Worker::>::spawn( + "workspace loader", + 1, + |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) + .for_each(|it| output_sender.send(it)) + } + ) } diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs @@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; use drop_bomb::DropBomb; use Result; +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + let ((inp, out), inp_r, out_s) = worker_chan(buf); + let worker = Worker { inp, out }; + let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) + } + + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + pub struct ThreadWatcher { name: &'static str, thread: thread::JoinHandle<()>, @@ -10,7 +37,7 @@ pub struct ThreadWatcher { } impl ThreadWatcher { - pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { let thread = thread::spawn(f); ThreadWatcher { name, @@ -36,7 +63,7 @@ impl ThreadWatcher { /// Sets up worker channels in a deadlock-avoind way. /// If one sets both input and output buffers to a fixed size, /// a worker might get stuck. -pub fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { +fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { let (input_sender, input_receiver) = bounded::(buf); let (output_sender, output_receiver) = unbounded::(); ((input_sender, output_receiver), input_receiver, output_sender) diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index c228f0b0a..a1c1783f2 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs @@ -3,11 +3,10 @@ use std::{ fs, }; -use crossbeam_channel::{Sender, Receiver}; use walkdir::WalkDir; use { - thread_watcher::{ThreadWatcher, worker_chan}, + thread_watcher::{Worker, ThreadWatcher}, }; @@ -22,22 +21,21 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> ((Sender, Receiver<(PathBuf, Vec)>), ThreadWatcher) { - let (interface, input_receiver, output_sender) = - worker_chan::)>(128); - let thread = ThreadWatcher::spawn("roots loader", move || { - input_receiver - .into_iter() - .map(|path| { - debug!("loading {} ...", path.as_path().display()); - let events = load_root(path.as_path()); - debug!("... loaded {}", path.as_path().display()); - (path, events) - }) - .for_each(|it| output_sender.send(it)) - }); - - (interface, thread) +pub fn roots_loader() -> (Worker)>, ThreadWatcher) { + Worker::)>::spawn( + "roots loader", + 128, |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|path| { + debug!("loading {} ...", path.as_path().display()); + let events = load_root(path.as_path()); + debug!("... loaded {}", path.as_path().display()); + (path, events) + }) + .for_each(|it| output_sender.send(it)) + } + ) } fn load_root(path: &Path) -> Vec { diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs index 2710ab59b..355914033 100644 --- a/crates/server/tests/heavy_tests/support.rs +++ b/crates/server/tests/heavy_tests/support.rs @@ -1,6 +1,5 @@ use std::{ fs, - thread, cell::{Cell, RefCell}, path::PathBuf, time::Duration, @@ -8,7 +7,7 @@ use std::{ }; use tempdir::TempDir; -use crossbeam_channel::{after, Sender, Receiver}; +use crossbeam_channel::{after, Receiver}; use flexi_logger::Logger; use languageserver_types::{ Url, @@ -22,7 +21,7 @@ use serde::Serialize; use serde_json::{Value, from_str, to_string_pretty}; use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; -use m::{Result, main_loop, req, thread_watcher::worker_chan}; +use m::{main_loop, req, thread_watcher::{ThreadWatcher, Worker}}; pub fn project(fixture: &str) -> Server { static INIT: Once = Once::new(); @@ -61,28 +60,27 @@ pub struct Server { req_id: Cell, messages: RefCell>, dir: TempDir, - sender: Option>, - receiver: Receiver, - server: Option>>, + worker: Option>, + watcher: Option, } impl Server { fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { let path = dir.path().to_path_buf(); - let ((msg_sender, msg_receiver), server) = { - let (api, mut msg_receiver, mut msg_sender) = worker_chan::(128); - let server = thread::spawn(move || { + let (worker, watcher) = Worker::::spawn( + "test server", + 128, + move |mut msg_receiver, mut msg_sender| { main_loop(true, path, &mut msg_receiver, &mut msg_sender) - }); - (api, server) - }; + .unwrap() + } + ); let res = Server { req_id: Cell::new(1), dir, messages: Default::default(), - sender: Some(msg_sender), - receiver: msg_receiver, - server: Some(server), + worker: Some(worker), + watcher: Some(watcher), }; for (path, text) in files { @@ -140,7 +138,7 @@ impl Server { fn send_request_(&self, r: RawRequest) -> Value { let id = r.id; - self.sender.as_ref() + self.worker.as_ref() .unwrap() .send(RawMessage::Request(r)); while let Some(msg) = self.recv() { @@ -183,14 +181,14 @@ impl Server { } } fn recv(&self) -> Option { - recv_timeout(&self.receiver) + recv_timeout(&self.worker.as_ref().unwrap().out) .map(|msg| { self.messages.borrow_mut().push(msg.clone()); msg }) } fn send_notification(&self, not: RawNotification) { - self.sender.as_ref() + self.worker.as_ref() .unwrap() .send(RawMessage::Notification(not)); } @@ -198,16 +196,15 @@ impl Server { impl Drop for Server { fn drop(&mut self) { - { - self.send_request::(666, ()); - drop(self.sender.take().unwrap()); - while let Some(msg) = recv_timeout(&self.receiver) { - drop(msg); - } + self.send_request::(666, ()); + let receiver = self.worker.take().unwrap().stop(); + while let Some(msg) = recv_timeout(&receiver) { + drop(msg); } - self.server.take() + self.watcher.take() .unwrap() - .join().unwrap().unwrap(); + .stop() + .unwrap(); } } -- cgit v1.2.3