From 7daaddb2ac281dcad3ac99496b1cf3f06840887d Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Sat, 8 Sep 2018 13:15:01 +0300 Subject: Some abstraction around workers --- crates/server/src/thread_watcher.rs | 31 +++++++++++++++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) (limited to 'crates/server/src/thread_watcher.rs') diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs @@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; use drop_bomb::DropBomb; use Result; +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +impl Worker { + pub fn spawn(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) + where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, + { + let ((inp, out), inp_r, out_s) = worker_chan(buf); + let worker = Worker { inp, out }; + let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) + } + + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + pub struct ThreadWatcher { name: &'static str, thread: thread::JoinHandle<()>, @@ -10,7 +37,7 @@ pub struct ThreadWatcher { } impl ThreadWatcher { - pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { let thread = thread::spawn(f); ThreadWatcher { name, @@ -36,7 +63,7 @@ impl ThreadWatcher { /// Sets up worker channels in a deadlock-avoind way. /// If one sets both input and output buffers to a fixed size, /// a worker might get stuck. -pub fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { +fn worker_chan(buf: usize) -> ((Sender, Receiver), Receiver, Sender) { let (input_sender, input_receiver) = bounded::(buf); let (output_sender, output_receiver) = unbounded::(); ((input_sender, output_receiver), input_receiver, output_sender) -- cgit v1.2.3