From 193992fd14e88d91a3695f10204232d4c81192dc Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Tue, 18 Dec 2018 12:45:20 +0300 Subject: move thread worker to a separate crate --- crates/thread_worker/Cargo.toml | 11 ++++++ crates/thread_worker/src/lib.rs | 79 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+) create mode 100644 crates/thread_worker/Cargo.toml create mode 100644 crates/thread_worker/src/lib.rs (limited to 'crates/thread_worker') diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml new file mode 100644 index 000000000..62d66a1a3 --- /dev/null +++ b/crates/thread_worker/Cargo.toml @@ -0,0 +1,11 @@ +[package] +edition = "2018" +name = "thread_worker" +version = "0.1.0" +authors = ["Aleksey Kladov "] + +[dependencies] +drop_bomb = "0.1.0" +crossbeam-channel = "0.2.4" +log = "0.4.3" + diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs new file mode 100644 index 000000000..e558559ef --- /dev/null +++ b/crates/thread_worker/src/lib.rs @@ -0,0 +1,79 @@ +//! Small utility to correctly spawn crossbeam-channel based worker threads. + +use std::thread; + +use crossbeam_channel::{bounded, unbounded, Receiver, Sender}; +use drop_bomb::DropBomb; + +pub struct Worker { + pub inp: Sender, + pub out: Receiver, +} + +pub struct WorkerHandle { + name: &'static str, + thread: thread::JoinHandle<()>, + bomb: DropBomb, +} + +pub fn spawn(name: &'static str, buf: usize, f: F) -> (Worker, WorkerHandle) +where + F: FnOnce(Receiver, Sender) + Send + 'static, + I: Send + 'static, + O: Send + 'static, +{ + let (worker, inp_r, out_s) = worker_chan(buf); + let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); + (worker, watcher) +} + +impl Worker { + /// Stops the worker. Returns the message receiver to fetch results which + /// have become ready before the worker is stopped. + pub fn stop(self) -> Receiver { + self.out + } + + pub fn send(&self, item: I) { + self.inp.send(item) + } +} + +impl WorkerHandle { + fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { + let thread = thread::spawn(f); + WorkerHandle { + name, + thread, + bomb: DropBomb::new(format!("WorkerHandle {} was not stopped", name)), + } + } + + pub fn stop(mut self) -> thread::Result<()> { + log::info!("waiting for {} to finish ...", self.name); + let name = self.name; + self.bomb.defuse(); + let res = self.thread.join(); + match &res { + Ok(()) => log::info!("... {} terminated with ok", name), + Err(_) => log::error!("... {} terminated with err", name), + } + res + } +} + +/// Sets up worker channels in a deadlock-avoind way. +/// If one sets both input and output buffers to a fixed size, +/// a worker might get stuck. +fn worker_chan(buf: usize) -> (Worker, Receiver, Sender) { + let (input_sender, input_receiver) = bounded::(buf); + let (output_sender, output_receiver) = unbounded::(); + ( + Worker { + inp: input_sender, + out: output_receiver, + }, + input_receiver, + output_sender, + ) +} -- cgit v1.2.3