1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
|
use crate::io;
use crossbeam_channel::Sender;
use drop_bomb::DropBomb;
use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
use std::{
path::{Path, PathBuf},
sync::mpsc,
thread,
time::Duration,
};
use walkdir::{DirEntry, WalkDir};
/// Handle to a running file-system watcher and its event-forwarding thread.
///
/// Created by `Watcher::start` and expected to be consumed by
/// `Watcher::shutdown`, which defuses `bomb` and joins `thread`.
pub(crate) struct Watcher {
/// The `notify` watcher on which directories are registered.
watcher: RecommendedWatcher,
/// Thread that receives debounced `notify` events and forwards them as `io::Task`s.
thread: thread::JoinHandle<()>,
/// Drop guard carrying the message "Watcher was not shutdown"; defused in `shutdown`.
bomb: DropBomb,
/// Clone of the output channel, used by `watch_recursive` to emit synthetic
/// `Create` changes for entries found while walking a directory.
sender: Sender<io::Task>,
}
/// A file-system change reported by the watcher, wrapped in
/// `io::Task::HandleChange` when sent to the consumer.
#[derive(Debug)]
pub enum WatcherChange {
/// A path appeared: a `notify` create event, the destination of a rename,
/// or an entry discovered by `Watcher::watch_recursive`.
Create(PathBuf),
/// The contents of a path were written (a `notify` write event).
Write(PathBuf),
/// A path disappeared: a `notify` remove event or the source of a rename.
Remove(PathBuf),
/// `notify` reported `DebouncedEvent::Rescan`.
// NOTE(review): presumably the consumer should re-read the watched roots — confirm.
Rescan,
}
fn handle_change_event(
ev: DebouncedEvent,
sender: &Sender<io::Task>,
) -> Result<(), Box<std::error::Error>> {
match ev {
DebouncedEvent::NoticeWrite(_)
| DebouncedEvent::NoticeRemove(_)
| DebouncedEvent::Chmod(_) => {
// ignore
}
DebouncedEvent::Rescan => {
sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
}
DebouncedEvent::Create(path) => {
sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
}
DebouncedEvent::Write(path) => {
sender.send(io::Task::HandleChange(WatcherChange::Write(path)))?;
}
DebouncedEvent::Remove(path) => {
sender.send(io::Task::HandleChange(WatcherChange::Remove(path)))?;
}
DebouncedEvent::Rename(src, dst) => {
sender.send(io::Task::HandleChange(WatcherChange::Remove(src)))?;
sender.send(io::Task::HandleChange(WatcherChange::Create(dst)))?;
}
DebouncedEvent::Error(err, path) => {
// TODO should we reload the file contents?
log::warn!("watcher error \"{}\", {:?}", err, path);
}
}
Ok(())
}
/// Delay handed to `notify::watcher` in `Watcher::start` (250 ms).
const WATCHER_DELAY: Duration = Duration::from_millis(250);
impl Watcher {
pub(crate) fn start(
output_sender: Sender<io::Task>,
) -> Result<Watcher, Box<std::error::Error>> {
let (input_sender, input_receiver) = mpsc::channel();
let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
let sender = output_sender.clone();
let thread = thread::spawn(move || {
input_receiver
.into_iter()
// forward relevant events only
.try_for_each(|change| handle_change_event(change, &output_sender))
.unwrap()
});
Ok(Watcher {
watcher,
thread,
sender,
bomb: DropBomb::new(format!("Watcher was not shutdown")),
})
}
pub fn watch_recursive(
&mut self,
dir: &Path,
filter_entry: impl Fn(&DirEntry) -> bool,
emit_for_contents: bool,
) {
for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
match res {
Ok(entry) => {
if entry.path().is_dir() {
match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
Ok(()) => log::debug!("watching \"{}\"", dir.display()),
Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
}
}
if emit_for_contents && entry.depth() > 0 {
// emit as create because we haven't seen it yet
if let Err(e) =
self.sender
.send(io::Task::HandleChange(WatcherChange::Create(
entry.path().to_path_buf(),
)))
{
log::warn!("watcher error: {}", e)
}
}
}
Err(e) => log::warn!("watcher error: {}", e),
}
}
}
pub fn shutdown(mut self) -> thread::Result<()> {
self.bomb.defuse();
drop(self.watcher);
let res = self.thread.join();
match &res {
Ok(()) => log::info!("... Watcher terminated with ok"),
Err(_) => log::error!("... Watcher terminated with err"),
}
res
}
}
|