aboutsummaryrefslogtreecommitdiff
path: root/crates/ra_vfs/src
diff options
context:
space:
mode:
Diffstat (limited to 'crates/ra_vfs/src')
-rw-r--r--crates/ra_vfs/src/io.rs105
-rw-r--r--crates/ra_vfs/src/lib.rs100
-rw-r--r--crates/ra_vfs/src/watcher.rs120
3 files changed, 187 insertions, 138 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index e5d5c6463..25acec9b1 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,13 +1,20 @@
1use std::{ 1use std::{
2 fmt, fs, 2 fmt, fs,
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4 sync::Arc,
5 thread,
4}; 6};
5 7
8use crossbeam_channel::{Receiver, Sender};
9use parking_lot::Mutex;
6use relative_path::RelativePathBuf; 10use relative_path::RelativePathBuf;
7use thread_worker::WorkerHandle; 11use thread_worker::WorkerHandle;
8use walkdir::{DirEntry, WalkDir}; 12use walkdir::{DirEntry, WalkDir};
9 13
10use crate::{has_rs_extension, watcher::WatcherChange, VfsRoot}; 14use crate::{
15 watcher::{Watcher, WatcherChange},
16 VfsRoot,
17};
11 18
12pub(crate) enum Task { 19pub(crate) enum Task {
13 AddRoot { 20 AddRoot {
@@ -17,6 +24,10 @@ pub(crate) enum Task {
17 }, 24 },
18 HandleChange(WatcherChange), 25 HandleChange(WatcherChange),
19 LoadChange(WatcherChange), 26 LoadChange(WatcherChange),
27 Watch {
28 dir: PathBuf,
29 filter: Box<Fn(&DirEntry) -> bool + Send>,
30 },
20} 31}
21 32
22#[derive(Debug)] 33#[derive(Debug)]
@@ -35,7 +46,8 @@ pub enum WatcherChangeData {
35pub enum TaskResult { 46pub enum TaskResult {
36 AddRoot(AddRootResult), 47 AddRoot(AddRootResult),
37 HandleChange(WatcherChange), 48 HandleChange(WatcherChange),
38 LoadChange(Option<WatcherChangeData>), 49 LoadChange(WatcherChangeData),
50 NoOp,
39} 51}
40 52
41impl fmt::Debug for TaskResult { 53impl fmt::Debug for TaskResult {
@@ -44,21 +56,74 @@ impl fmt::Debug for TaskResult {
44 } 56 }
45} 57}
46 58
47pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>; 59pub(crate) struct Worker {
60 worker: thread_worker::Worker<Task, TaskResult>,
61 worker_handle: WorkerHandle,
62 watcher: Arc<Mutex<Option<Watcher>>>,
63}
64
65impl Worker {
66 pub(crate) fn start() -> Worker {
67 let watcher = Arc::new(Mutex::new(None));
68 let watcher_clone = watcher.clone();
69 let (worker, worker_handle) =
70 thread_worker::spawn("vfs", 128, move |input_receiver, output_sender| {
71 let res = input_receiver
72 .into_iter()
73 .map(|t| handle_task(t, &watcher_clone))
74 .try_for_each(|it| output_sender.send(it));
75 res.unwrap()
76 });
77 match Watcher::start(worker.inp.clone()) {
78 Ok(w) => {
79 watcher.lock().replace(w);
80 }
81 Err(e) => log::error!("could not start watcher: {}", e),
82 };
83 Worker {
84 worker,
85 worker_handle,
86 watcher,
87 }
88 }
89
90 pub(crate) fn sender(&self) -> &Sender<Task> {
91 &self.worker.inp
92 }
93
94 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
95 &self.worker.out
96 }
97
98 pub(crate) fn shutdown(self) -> thread::Result<()> {
99 if let Some(watcher) = self.watcher.lock().take() {
100 let _ = watcher.shutdown();
101 }
102 self.worker_handle.shutdown()
103 }
104}
48 105
49pub(crate) fn start() -> (Worker, WorkerHandle) { 106fn watch(
50 thread_worker::spawn("vfs", 128, |input_receiver, output_sender| { 107 watcher: &Arc<Mutex<Option<Watcher>>>,
51 input_receiver 108 dir: &Path,
52 .into_iter() 109 filter_entry: impl Fn(&DirEntry) -> bool,
53 .map(handle_task) 110 emit_for_existing: bool,
54 .try_for_each(|it| output_sender.send(it)) 111) {
55 .unwrap() 112 let mut watcher = watcher.lock();
56 }) 113 let watcher = match *watcher {
114 Some(ref mut w) => w,
115 None => {
116 // watcher dropped or couldn't start
117 return;
118 }
119 };
120 watcher.watch_recursive(dir, filter_entry, emit_for_existing)
57} 121}
58 122
59fn handle_task(task: Task) -> TaskResult { 123fn handle_task(task: Task, watcher: &Arc<Mutex<Option<Watcher>>>) -> TaskResult {
60 match task { 124 match task {
61 Task::AddRoot { root, path, filter } => { 125 Task::AddRoot { root, path, filter } => {
126 watch(watcher, &path, &*filter, false);
62 log::debug!("loading {} ...", path.as_path().display()); 127 log::debug!("loading {} ...", path.as_path().display());
63 let files = load_root(path.as_path(), &*filter); 128 let files = load_root(path.as_path(), &*filter);
64 log::debug!("... loaded {}", path.as_path().display()); 129 log::debug!("... loaded {}", path.as_path().display());
@@ -70,8 +135,14 @@ fn handle_task(task: Task) -> TaskResult {
70 } 135 }
71 Task::LoadChange(change) => { 136 Task::LoadChange(change) => {
72 log::debug!("loading {:?} ...", change); 137 log::debug!("loading {:?} ...", change);
73 let data = load_change(change); 138 match load_change(change) {
74 TaskResult::LoadChange(data) 139 Some(data) => TaskResult::LoadChange(data),
140 None => TaskResult::NoOp,
141 }
142 }
143 Task::Watch { dir, filter } => {
144 watch(watcher, &dir, &*filter, true);
145 TaskResult::NoOp
75 } 146 }
76 } 147 }
77} 148}
@@ -90,9 +161,6 @@ fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePa
90 continue; 161 continue;
91 } 162 }
92 let path = entry.path(); 163 let path = entry.path();
93 if !has_rs_extension(path) {
94 continue;
95 }
96 let text = match fs::read_to_string(path) { 164 let text = match fs::read_to_string(path) {
97 Ok(text) => text, 165 Ok(text) => text,
98 Err(e) => { 166 Err(e) => {
@@ -109,6 +177,9 @@ fn load_root(root: &Path, filter: &dyn Fn(&DirEntry) -> bool) -> Vec<(RelativePa
109fn load_change(change: WatcherChange) -> Option<WatcherChangeData> { 177fn load_change(change: WatcherChange) -> Option<WatcherChangeData> {
110 let data = match change { 178 let data = match change {
111 WatcherChange::Create(path) => { 179 WatcherChange::Create(path) => {
180 if path.is_dir() {
181 return None;
182 }
112 let text = match fs::read_to_string(&path) { 183 let text = match fs::read_to_string(&path) {
113 Ok(text) => text, 184 Ok(text) => text,
114 Err(e) => { 185 Err(e) => {
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index ad40db340..196180890 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -20,7 +20,7 @@ use std::{
20 cmp::Reverse, 20 cmp::Reverse,
21 ffi::OsStr, 21 ffi::OsStr,
22 fmt, fs, mem, 22 fmt, fs, mem,
23 path::{Path, PathBuf}, 23 path::{Component, Path, PathBuf},
24 sync::Arc, 24 sync::Arc,
25 thread, 25 thread,
26}; 26};
@@ -29,30 +29,37 @@ use crossbeam_channel::Receiver;
29use ra_arena::{impl_arena_id, Arena, RawId}; 29use ra_arena::{impl_arena_id, Arena, RawId};
30use relative_path::RelativePathBuf; 30use relative_path::RelativePathBuf;
31use rustc_hash::{FxHashMap, FxHashSet}; 31use rustc_hash::{FxHashMap, FxHashSet};
32use thread_worker::WorkerHandle;
33use walkdir::DirEntry; 32use walkdir::DirEntry;
34 33
35pub use crate::io::TaskResult as VfsTask; 34pub use crate::io::TaskResult as VfsTask;
36pub use crate::watcher::{Watcher, WatcherChange}; 35pub use crate::watcher::WatcherChange;
37 36
38/// `RootFilter` is a predicate that checks if a file can belong to a root. If 37/// `RootFilter` is a predicate that checks if a file can belong to a root. If
39/// several filters match a file (nested dirs), the most nested one wins. 38/// several filters match a file (nested dirs), the most nested one wins.
40struct RootFilter { 39pub(crate) struct RootFilter {
41 root: PathBuf, 40 root: PathBuf,
42 file_filter: fn(&Path) -> bool, 41 filter: fn(RootEntry) -> bool,
42}
43
44pub(crate) struct RootEntry<'a, 'b> {
45 root: &'a Path,
46 path: &'b Path,
43} 47}
44 48
45impl RootFilter { 49impl RootFilter {
46 fn new(root: PathBuf) -> RootFilter { 50 fn new(root: PathBuf) -> RootFilter {
47 RootFilter { 51 RootFilter {
48 root, 52 root,
49 file_filter: has_rs_extension, 53 filter: default_filter,
50 } 54 }
51 } 55 }
52 /// Check if this root can contain `path`. NB: even if this returns 56 /// Check if this root can contain `path`. NB: even if this returns
53 /// true, the `path` might actually be conained in some nested root. 57 /// true, the `path` might actually be conained in some nested root.
54 fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> { 58 pub(crate) fn can_contain(&self, path: &Path) -> Option<RelativePathBuf> {
55 if !(self.file_filter)(path) { 59 if !(self.filter)(RootEntry {
60 root: &self.root,
61 path,
62 }) {
56 return None; 63 return None;
57 } 64 }
58 let path = path.strip_prefix(&self.root).ok()?; 65 let path = path.strip_prefix(&self.root).ok()?;
@@ -60,8 +67,17 @@ impl RootFilter {
60 } 67 }
61} 68}
62 69
63pub(crate) fn has_rs_extension(p: &Path) -> bool { 70pub(crate) fn default_filter(entry: RootEntry) -> bool {
64 p.extension() == Some(OsStr::new("rs")) 71 if entry.path.is_dir() {
72 // first component relative to root is "target"
73 entry
74 .path
75 .strip_prefix(entry.root)
76 .map(|p| p.components().next() != Some(Component::Normal(OsStr::new("target"))))
77 .unwrap_or(false)
78 } else {
79 entry.path.extension() == Some(OsStr::new("rs"))
80 }
65} 81}
66 82
67#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] 83#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
@@ -80,13 +96,11 @@ struct VfsFileData {
80} 96}
81 97
82pub struct Vfs { 98pub struct Vfs {
83 roots: Arena<VfsRoot, RootFilter>, 99 roots: Arena<VfsRoot, Arc<RootFilter>>,
84 files: Arena<VfsFile, VfsFileData>, 100 files: Arena<VfsFile, VfsFileData>,
85 root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>, 101 root2files: FxHashMap<VfsRoot, FxHashSet<VfsFile>>,
86 pending_changes: Vec<VfsChange>, 102 pending_changes: Vec<VfsChange>,
87 worker: io::Worker, 103 worker: io::Worker,
88 worker_handle: WorkerHandle,
89 watcher: Option<Watcher>,
90} 104}
91 105
92impl fmt::Debug for Vfs { 106impl fmt::Debug for Vfs {
@@ -97,41 +111,35 @@ impl fmt::Debug for Vfs {
97 111
98impl Vfs { 112impl Vfs {
99 pub fn new(mut roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { 113 pub fn new(mut roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
100 let (worker, worker_handle) = io::start(); 114 let worker = io::Worker::start();
101
102 let watcher = match Watcher::start(worker.inp.clone()) {
103 Ok(watcher) => Some(watcher),
104 Err(e) => {
105 log::error!("could not start watcher: {}", e);
106 None
107 }
108 };
109 115
110 let mut res = Vfs { 116 let mut res = Vfs {
111 roots: Arena::default(), 117 roots: Arena::default(),
112 files: Arena::default(), 118 files: Arena::default(),
113 root2files: FxHashMap::default(), 119 root2files: FxHashMap::default(),
114 worker, 120 worker,
115 worker_handle,
116 watcher,
117 pending_changes: Vec::new(), 121 pending_changes: Vec::new(),
118 }; 122 };
119 123
120 // A hack to make nesting work. 124 // A hack to make nesting work.
121 roots.sort_by_key(|it| Reverse(it.as_os_str().len())); 125 roots.sort_by_key(|it| Reverse(it.as_os_str().len()));
122 for (i, path) in roots.iter().enumerate() { 126 for (i, path) in roots.iter().enumerate() {
123 let root = res.roots.alloc(RootFilter::new(path.clone())); 127 let root_filter = Arc::new(RootFilter::new(path.clone()));
128
129 let root = res.roots.alloc(root_filter.clone());
124 res.root2files.insert(root, Default::default()); 130 res.root2files.insert(root, Default::default());
131
125 let nested = roots[..i] 132 let nested = roots[..i]
126 .iter() 133 .iter()
127 .filter(|it| it.starts_with(path)) 134 .filter(|it| it.starts_with(path))
128 .map(|it| it.clone()) 135 .map(|it| it.clone())
129 .collect::<Vec<_>>(); 136 .collect::<Vec<_>>();
137
130 let filter = move |entry: &DirEntry| { 138 let filter = move |entry: &DirEntry| {
131 if entry.file_type().is_file() { 139 if entry.file_type().is_dir() && nested.iter().any(|it| it == entry.path()) {
132 has_rs_extension(entry.path()) 140 false
133 } else { 141 } else {
134 nested.iter().all(|it| it != entry.path()) 142 root_filter.can_contain(entry.path()).is_some()
135 } 143 }
136 }; 144 };
137 let task = io::Task::AddRoot { 145 let task = io::Task::AddRoot {
@@ -139,10 +147,7 @@ impl Vfs {
139 path: path.clone(), 147 path: path.clone(),
140 filter: Box::new(filter), 148 filter: Box::new(filter),
141 }; 149 };
142 res.worker.inp.send(task).unwrap(); 150 res.worker.sender().send(task).unwrap();
143 if let Some(ref mut watcher) = res.watcher {
144 watcher.watch(path);
145 }
146 } 151 }
147 let roots = res.roots.iter().map(|(id, _)| id).collect(); 152 let roots = res.roots.iter().map(|(id, _)| id).collect();
148 (res, roots) 153 (res, roots)
@@ -194,7 +199,7 @@ impl Vfs {
194 } 199 }
195 200
196 pub fn task_receiver(&self) -> &Receiver<io::TaskResult> { 201 pub fn task_receiver(&self) -> &Receiver<io::TaskResult> {
197 &self.worker.out 202 self.worker.receiver()
198 } 203 }
199 204
200 pub fn handle_task(&mut self, task: io::TaskResult) { 205 pub fn handle_task(&mut self, task: io::TaskResult) {
@@ -225,19 +230,35 @@ impl Vfs {
225 self.pending_changes.push(change); 230 self.pending_changes.push(change);
226 } 231 }
227 io::TaskResult::HandleChange(change) => match &change { 232 io::TaskResult::HandleChange(change) => match &change {
233 watcher::WatcherChange::Create(path) if path.is_dir() => {
234 if let Some((root, _path, _file)) = self.find_root(&path) {
235 let root_filter = self.roots[root].clone();
236 let filter =
237 move |entry: &DirEntry| root_filter.can_contain(entry.path()).is_some();
238 self.worker
239 .sender()
240 .send(io::Task::Watch {
241 dir: path.to_path_buf(),
242 filter: Box::new(filter),
243 })
244 .unwrap()
245 }
246 }
228 watcher::WatcherChange::Create(path) 247 watcher::WatcherChange::Create(path)
229 | watcher::WatcherChange::Remove(path) 248 | watcher::WatcherChange::Remove(path)
230 | watcher::WatcherChange::Write(path) => { 249 | watcher::WatcherChange::Write(path) => {
231 if self.should_handle_change(&path) { 250 if self.should_handle_change(&path) {
232 self.worker.inp.send(io::Task::LoadChange(change)).unwrap() 251 self.worker
252 .sender()
253 .send(io::Task::LoadChange(change))
254 .unwrap()
233 } 255 }
234 } 256 }
235 watcher::WatcherChange::Rescan => { 257 watcher::WatcherChange::Rescan => {
236 // TODO we should reload all files 258 // TODO we should reload all files
237 } 259 }
238 }, 260 },
239 io::TaskResult::LoadChange(None) => {} 261 io::TaskResult::LoadChange(change) => match change {
240 io::TaskResult::LoadChange(Some(change)) => match change {
241 io::WatcherChangeData::Create { path, text } 262 io::WatcherChangeData::Create { path, text }
242 | io::WatcherChangeData::Write { path, text } => { 263 | io::WatcherChangeData::Write { path, text } => {
243 if let Some((root, path, file)) = self.find_root(&path) { 264 if let Some((root, path, file)) = self.find_root(&path) {
@@ -256,6 +277,7 @@ impl Vfs {
256 } 277 }
257 } 278 }
258 }, 279 },
280 io::TaskResult::NoOp => {}
259 } 281 }
260 } 282 }
261 283
@@ -359,11 +381,7 @@ impl Vfs {
359 381
360 /// Sutdown the VFS and terminate the background watching thread. 382 /// Sutdown the VFS and terminate the background watching thread.
361 pub fn shutdown(self) -> thread::Result<()> { 383 pub fn shutdown(self) -> thread::Result<()> {
362 if let Some(watcher) = self.watcher { 384 self.worker.shutdown()
363 let _ = watcher.shutdown();
364 }
365 let _ = self.worker.shutdown();
366 self.worker_handle.shutdown()
367 } 385 }
368 386
369 fn add_file( 387 fn add_file(
diff --git a/crates/ra_vfs/src/watcher.rs b/crates/ra_vfs/src/watcher.rs
index f0ef9bc0e..d8c35f2a3 100644
--- a/crates/ra_vfs/src/watcher.rs
+++ b/crates/ra_vfs/src/watcher.rs
@@ -1,20 +1,20 @@
1use crate::io; 1use crate::io;
2use crossbeam_channel::Sender; 2use crossbeam_channel::Sender;
3use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
4use ignore::{gitignore::Gitignore, Walk};
5use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher}; 4use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
6use parking_lot::Mutex;
7use std::{ 5use std::{
8 path::{Path, PathBuf}, 6 path::{Path, PathBuf},
9 sync::{mpsc, Arc}, 7 sync::mpsc,
10 thread, 8 thread,
11 time::Duration, 9 time::Duration,
12}; 10};
11use walkdir::{DirEntry, WalkDir};
13 12
14pub struct Watcher { 13pub(crate) struct Watcher {
15 watcher: Arc<Mutex<Option<RecommendedWatcher>>>, 14 watcher: RecommendedWatcher,
16 thread: thread::JoinHandle<()>, 15 thread: thread::JoinHandle<()>,
17 bomb: DropBomb, 16 bomb: DropBomb,
17 sender: Sender<io::Task>,
18} 18}
19 19
20#[derive(Debug)] 20#[derive(Debug)]
@@ -28,7 +28,6 @@ pub enum WatcherChange {
28fn handle_change_event( 28fn handle_change_event(
29 ev: DebouncedEvent, 29 ev: DebouncedEvent,
30 sender: &Sender<io::Task>, 30 sender: &Sender<io::Task>,
31 watcher: &Arc<Mutex<Option<RecommendedWatcher>>>,
32) -> Result<(), Box<std::error::Error>> { 31) -> Result<(), Box<std::error::Error>> {
33 match ev { 32 match ev {
34 DebouncedEvent::NoticeWrite(_) 33 DebouncedEvent::NoticeWrite(_)
@@ -40,12 +39,6 @@ fn handle_change_event(
40 sender.send(io::Task::HandleChange(WatcherChange::Rescan))?; 39 sender.send(io::Task::HandleChange(WatcherChange::Rescan))?;
41 } 40 }
42 DebouncedEvent::Create(path) => { 41 DebouncedEvent::Create(path) => {
43 // we have to check if `path` is ignored because Walk iterator doesn't check it
44 // also childs are only ignored if they match a pattern
45 // (see `matched` vs `matched_path_or_any_parents` in `Gitignore`)
46 if path.is_dir() && !should_ignore_dir(&path) {
47 watch_recursive(watcher, &path, Some(sender));
48 }
49 sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?; 42 sender.send(io::Task::HandleChange(WatcherChange::Create(path)))?;
50 } 43 }
51 DebouncedEvent::Write(path) => { 44 DebouncedEvent::Write(path) => {
@@ -66,65 +59,6 @@ fn handle_change_event(
66 Ok(()) 59 Ok(())
67} 60}
68 61
69fn watch_one(watcher: &mut RecommendedWatcher, dir: &Path) {
70 match watcher.watch(dir, RecursiveMode::NonRecursive) {
71 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
72 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
73 }
74}
75
76fn watch_recursive(
77 watcher: &Arc<Mutex<Option<RecommendedWatcher>>>,
78 dir: &Path,
79 sender: Option<&Sender<io::Task>>,
80) {
81 let mut watcher = watcher.lock();
82 let mut watcher = match *watcher {
83 Some(ref mut watcher) => watcher,
84 None => {
85 // watcher has been dropped
86 return;
87 }
88 };
89 for res in Walk::new(dir) {
90 match res {
91 Ok(entry) => {
92 if entry.path().is_dir() {
93 watch_one(&mut watcher, entry.path());
94 }
95 if let Some(sender) = sender {
96 // emit as create because we haven't seen it yet
97 if let Err(e) = sender.send(io::Task::HandleChange(WatcherChange::Create(
98 entry.path().to_path_buf(),
99 ))) {
100 log::warn!("watcher error: {}", e)
101 }
102 }
103 }
104 Err(e) => log::warn!("watcher error: {}", e),
105 }
106 }
107}
108
109fn should_ignore_dir(dir: &Path) -> bool {
110 let mut parent = dir;
111 loop {
112 parent = match parent.parent() {
113 Some(p) => p,
114 None => break,
115 };
116 let gitignore = parent.join(".gitignore");
117 if gitignore.exists() {
118 let gitignore = Gitignore::new(gitignore).0;
119 if gitignore.matched_path_or_any_parents(dir, true).is_ignore() {
120 log::debug!("ignored {}", dir.display());
121 return true;
122 }
123 }
124 }
125 false
126}
127
128const WATCHER_DELAY: Duration = Duration::from_millis(250); 62const WATCHER_DELAY: Duration = Duration::from_millis(250);
129 63
130impl Watcher { 64impl Watcher {
@@ -132,32 +66,58 @@ impl Watcher {
132 output_sender: Sender<io::Task>, 66 output_sender: Sender<io::Task>,
133 ) -> Result<Watcher, Box<std::error::Error>> { 67 ) -> Result<Watcher, Box<std::error::Error>> {
134 let (input_sender, input_receiver) = mpsc::channel(); 68 let (input_sender, input_receiver) = mpsc::channel();
135 let watcher = Arc::new(Mutex::new(Some(notify::watcher( 69 let watcher = notify::watcher(input_sender, WATCHER_DELAY)?;
136 input_sender, 70 let sender = output_sender.clone();
137 WATCHER_DELAY,
138 )?)));
139 let w = watcher.clone();
140 let thread = thread::spawn(move || { 71 let thread = thread::spawn(move || {
141 input_receiver 72 input_receiver
142 .into_iter() 73 .into_iter()
143 // forward relevant events only 74 // forward relevant events only
144 .try_for_each(|change| handle_change_event(change, &output_sender, &w)) 75 .try_for_each(|change| handle_change_event(change, &output_sender))
145 .unwrap() 76 .unwrap()
146 }); 77 });
147 Ok(Watcher { 78 Ok(Watcher {
148 watcher, 79 watcher,
149 thread, 80 thread,
81 sender,
150 bomb: DropBomb::new(format!("Watcher was not shutdown")), 82 bomb: DropBomb::new(format!("Watcher was not shutdown")),
151 }) 83 })
152 } 84 }
153 85
154 pub fn watch(&mut self, root: impl AsRef<Path>) { 86 pub fn watch_recursive(
155 watch_recursive(&self.watcher, root.as_ref(), None); 87 &mut self,
88 dir: &Path,
89 filter_entry: impl Fn(&DirEntry) -> bool,
90 emit_for_existing: bool,
91 ) {
92 for res in WalkDir::new(dir).into_iter().filter_entry(filter_entry) {
93 match res {
94 Ok(entry) => {
95 if entry.path().is_dir() {
96 match self.watcher.watch(dir, RecursiveMode::NonRecursive) {
97 Ok(()) => log::debug!("watching \"{}\"", dir.display()),
98 Err(e) => log::warn!("could not watch \"{}\": {}", dir.display(), e),
99 }
100 }
101 if emit_for_existing {
102 // emit as create because we haven't seen it yet
103 if let Err(e) =
104 self.sender
105 .send(io::Task::HandleChange(WatcherChange::Create(
106 entry.path().to_path_buf(),
107 )))
108 {
109 log::warn!("watcher error: {}", e)
110 }
111 }
112 }
113 Err(e) => log::warn!("watcher error: {}", e),
114 }
115 }
156 } 116 }
157 117
158 pub fn shutdown(mut self) -> thread::Result<()> { 118 pub fn shutdown(mut self) -> thread::Result<()> {
159 self.bomb.defuse(); 119 self.bomb.defuse();
160 drop(self.watcher.lock().take()); 120 drop(self.watcher);
161 let res = self.thread.join(); 121 let res = self.thread.join();
162 match &res { 122 match &res {
163 Ok(()) => log::info!("... Watcher terminated with ok"), 123 Ok(()) => log::info!("... Watcher terminated with ok"),