// path: root/crates/thread_worker/src/lib.rs
// blob: 12e8bf17ed32603ab843d66a0e7f50edf3f09cf1
//! Small utility to correctly spawn crossbeam-channel based worker threads.

use std::thread;

use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
use drop_bomb::DropBomb;

/// Caller-side endpoints of a worker: a sender for input items of type `I`
/// and a receiver for output items of type `O`.
///
/// The opposite ends of both channels are handed to the worker closure by
/// [`spawn`].
pub struct Worker<I, O> {
    /// Sending half of the input channel; items sent here go to the worker.
    pub inp: Sender<I>,
    /// Receiving half of the output channel; the worker's results arrive here.
    pub out: Receiver<O>,
}

/// Handle to the thread running a worker.
///
/// The handle is expected to be consumed with [`WorkerHandle::shutdown`],
/// which joins the thread. The embedded `DropBomb` is only defused there.
pub struct WorkerHandle {
    /// Human-readable worker name, used in log messages and in the bomb message.
    name: &'static str,
    /// Join handle of the spawned worker thread.
    thread: thread::JoinHandle<()>,
    /// Armed with a "was not shutdown" message; presumably fires if the handle
    /// is dropped without calling `shutdown` (see the `drop_bomb` crate).
    bomb: DropBomb,
}

/// Spawns a worker thread running `f` and returns the channel endpoints used
/// to talk to it together with a handle for joining the thread.
///
/// * `name` - label used for the worker in log and bomb messages.
/// * `buf` - capacity of the bounded input channel.
/// * `f` - worker body; it receives the input receiver and the output sender.
///
/// The returned [`WorkerHandle`] should be finished with
/// [`WorkerHandle::shutdown`].
pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle)
where
    F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
    I: Send + 'static,
    O: Send + 'static,
{
    let (worker, input_receiver, output_sender) = worker_chan(buf);
    // The worker-side channel ends are moved into the thread's closure.
    let thread_body = move || f(input_receiver, output_sender);
    let handle = WorkerHandle::spawn(name, thread_body);
    (worker, handle)
}

impl<I, O> Worker<I, O> {
    /// Stops the worker. Returns the message receiver to fetch results which
    /// have become ready before the worker is stopped.
    pub fn shutdown(self) -> Receiver<O> {
        self.out
    }

    pub fn send(&self, item: I) {
        self.inp.send(item)
    }
    pub fn recv(&self) -> Option<O> {
        self.out.recv()
    }
}

impl WorkerHandle {
    fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle {
        let thread = thread::spawn(f);
        WorkerHandle {
            name,
            thread,
            bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)),
        }
    }

    pub fn shutdown(mut self) -> thread::Result<()> {
        log::info!("waiting for {} to finish ...", self.name);
        let name = self.name;
        self.bomb.defuse();
        let res = self.thread.join();
        match &res {
            Ok(()) => log::info!("... {} terminated with ok", name),
            Err(_) => log::error!("... {} terminated with err", name),
        }
        res
    }
}

/// Sets up worker channels in a deadlock-avoiding way.
///
/// The input channel is bounded with capacity `buf`, while the output channel
/// is unbounded. If both input and output buffers were set to a fixed size, a
/// worker might get stuck.
///
/// Returns the caller-facing [`Worker`] followed by the worker-side input
/// receiver and output sender.
fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) {
    let (inp, worker_input) = bounded::<I>(buf);
    let (worker_output, out) = unbounded::<O>();
    let worker = Worker { inp, out };
    (worker, worker_input, worker_output)
}