From 7daaddb2ac281dcad3ac99496b1cf3f06840887d Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 8 Sep 2018 13:15:01 +0300 Subject: Some abstraction around workers --- crates/server/src/main_loop/mod.rs | 28 ++++++++++++---------------- crates/server/src/project_model.rs | 24 ++++++++++++------------ crates/server/src/thread_watcher.rs | 31 +++++++++++++++++++++++++++++-- crates/server/src/vfs.rs | 34 ++++++++++++++++------------------ 4 files changed, 69 insertions(+), 48 deletions(-) (limited to 'crates/server/src') diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index b7f5efbb1..f1297ee48 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs @@ -23,6 +23,7 @@ use { server_world::{ServerWorldState, ServerWorld}, main_loop::subscriptions::{Subscriptions}, project_model::{CargoWorkspace, workspace_loader}, + thread_watcher::Worker, }; #[derive(Debug)] @@ -43,8 +44,8 @@ pub fn main_loop( .build() .unwrap(); let (task_sender, task_receiver) = unbounded::(); - let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); - let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); + let (fs_worker, fs_watcher) = vfs::roots_loader(); + let (ws_worker, ws_watcher) = workspace_loader(); info!("server initialized, serving requests"); let mut state = ServerWorldState::new(); @@ -59,10 +60,8 @@ pub fn main_loop( msg_receriver, task_sender, task_receiver.clone(), - fs_sender, - fs_receiver, - ws_sender, - ws_receiver, + fs_worker, + ws_worker, &mut state, &mut pending_requests, &mut subs, @@ -93,17 +92,15 @@ fn main_loop_inner( msg_receiver: &mut Receiver, task_sender: Sender, task_receiver: Receiver, - fs_sender: Sender, - fs_receiver: Receiver<(PathBuf, Vec)>, - ws_sender: Sender, - ws_receiver: Receiver>, + fs_worker: Worker)>, + ws_worker: Worker>, state: &mut ServerWorldState, pending_requests: &mut HashMap, subs: &mut Subscriptions, ) -> Result<()> { let (libdata_sender, libdata_receiver) = unbounded(); - ws_sender.send(ws_root.clone()); - fs_sender.send(ws_root.clone()); + ws_worker.send(ws_root.clone()); + fs_worker.send(ws_root.clone()); loop { #[derive(Debug)] enum Event { @@ -120,11 +117,11 @@ fn main_loop_inner( None => bail!("client exited without shutdown"), }, recv(task_receiver, task) => Event::Task(task.unwrap()), - recv(fs_receiver, events) => match events { + recv(fs_worker.out, events) => match events { None => bail!("roots watcher died"), Some((pb, events)) => Event::Fs(pb, events), } - recv(ws_receiver, ws) => match ws { + recv(ws_worker.out, ws) => match ws { None => bail!("workspace watcher died"), Some(ws) => Event::Ws(ws), } @@ -158,8 +155,7 @@ fn main_loop_inner( for ws in workspaces.iter() { for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); - // deadlocky :-( - fs_sender.send(pkg.root(ws).to_path_buf()); + fs_worker.send(pkg.root(ws).to_path_buf()); } } state.set_workspaces(workspaces); diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index b9d6872c8..359cf787d 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs @@ -3,12 +3,11 @@ use std::{ path::{Path, PathBuf}, }; use cargo_metadata::{metadata_run, CargoOpt}; -use crossbeam_channel::{Sender, Receiver}; use libsyntax2::SmolStr; use { Result, - thread_watcher::{ThreadWatcher, worker_chan}, + thread_watcher::{Worker, ThreadWatcher}, }; #[derive(Debug, Clone)] @@ -162,14 +161,15 @@ impl TargetKind { } } -pub fn workspace_loader() -> ((Sender, Receiver>), ThreadWatcher) { - let (interface, input_receiver, output_sender) = worker_chan::>(1); - let thread = ThreadWatcher::spawn("workspace loader", move || { - input_receiver - .into_iter() - .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) - .for_each(|it| output_sender.send(it)) - }); - - (interface, thread) +pub fn workspace_loader() -> (Worker>, ThreadWatcher) { + Worker::>::spawn( + "workspace loader", + 1, + |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) + .for_each(|it| output_sender.send(it)) + } + ) } diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs @@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; use drop_bomb::DropBomb; use Result; +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + let ((inp, out), inp_r, out_s) = worker_chan(buf); + let worker = Worker { inp, out }; + let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) + } + + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + pub struct ThreadWatcher { name: &'static str, thread: thread::JoinHandle<()>, @@ -10,7 +37,7 @@ pub struct ThreadWatcher { } impl ThreadWatcher { - pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { let thread = thread::spawn(f); ThreadWatcher { name, @@ -36,7 +63,7 @@ impl ThreadWatcher { /// Sets up worker channels in a deadlock-avoind way. /// If one sets both input and output buffers to a fixed size, /// a worker might get stuck. -pub fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { +fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { let (input_sender, input_receiver) = bounded::(buf); let (output_sender, output_receiver) = unbounded::(); ((input_sender, output_receiver), input_receiver, output_sender) diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index c228f0b0a..a1c1783f2 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs @@ -3,11 +3,10 @@ use std::{ fs, }; -use crossbeam_channel::{Sender, Receiver}; use walkdir::WalkDir; use { - thread_watcher::{ThreadWatcher, worker_chan}, + thread_watcher::{Worker, ThreadWatcher}, }; @@ -22,22 +21,21 @@ pub enum FileEventKind { Add(String), } -pub fn roots_loader() -> ((Sender, Receiver<(PathBuf, Vec)>), ThreadWatcher) { - let (interface, input_receiver, output_sender) = - worker_chan::)>(128); - let thread = ThreadWatcher::spawn("roots loader", move || { - input_receiver - .into_iter() - .map(|path| { - debug!("loading {} ...", path.as_path().display()); - let events = load_root(path.as_path()); - debug!("... loaded {}", path.as_path().display()); - (path, events) - }) - .for_each(|it| output_sender.send(it)) - }); - - (interface, thread) +pub fn roots_loader() -> (Worker)>, ThreadWatcher) { + Worker::)>::spawn( + "roots loader", + 128, |input_receiver, output_sender| { + input_receiver + .into_iter() + .map(|path| { + debug!("loading {} ...", path.as_path().display()); + let events = load_root(path.as_path()); + debug!("... loaded {}", path.as_path().display()); + (path, events) + }) + .for_each(|it| output_sender.send(it)) + } + ) } fn load_root(path: &Path) -> Vec { -- cgit v1.2.3