aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2019-02-14 18:14:47 +0000
committerAleksey Kladov <[email protected]>2019-02-14 18:14:47 +0000
commite0b8942c56378b7966af39058f27b11a0d02890f (patch)
tree2e9abadb6e64fed7738b5b3fbc2eb13787efdaca
parentbf352cd2511775a331d77dee261b64bd8359dacb (diff)
simplify
-rw-r--r--crates/ra_vfs/src/io.rs155
-rw-r--r--crates/ra_vfs/src/lib.rs2
2 files changed, 71 insertions, 86 deletions
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 1b70cd8df..f64b4c532 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -4,7 +4,7 @@ use std::{
4 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
5 time::Duration, 5 time::Duration,
6}; 6};
7use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; 7use crossbeam_channel::{Sender, unbounded, RecvError, select};
8use relative_path::RelativePathBuf; 8use relative_path::RelativePathBuf;
9use walkdir::WalkDir; 9use walkdir::WalkDir;
10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
@@ -46,93 +46,78 @@ enum ChangeKind {
46 46
47const WATCHER_DELAY: Duration = Duration::from_millis(250); 47const WATCHER_DELAY: Duration = Duration::from_millis(250);
48 48
49pub(crate) struct Worker { 49pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
50 thread_worker: thread_worker::Worker<Task, TaskResult>, 50pub(crate) fn start(roots: Arc<Roots>) -> Worker {
51} 51 // This is a pretty elaborate setup of threads & channels! It is
52 52 // explained by the following concerns:
53impl Worker { 53 // * we need to burn a thread translating from notify's mpsc to
54 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 54 // crossbeam_channel.
55 // This is a pretty elaborate setup of threads & channels! It is 55 // * we want to read all files from a single thread, to guarantee that
56 // explained by the following concerns: 56 // we always get fresher versions and never go back in time.
57 // * we need to burn a thread translating from notify's mpsc to 57 // * we want to tear down everything neatly during shutdown.
58 // crossbeam_channel. 58 Worker::spawn(
59 // * we want to read all files from a single thread, to guarantee that 59 "vfs",
60 // we always get fresher versions and never go back in time. 60 128,
61 // * we want to tear down everything neatly during shutdown. 61 // This are the channels we use to communicate with outside world.
62 let thread_worker = thread_worker::Worker::spawn( 62 // If `input_receiver` is closed we need to tear ourselves down.
63 "vfs", 63 // `output_sender` should not be closed unless the parent died.
64 128, 64 move |input_receiver, output_sender| {
65 // This are the channels we use to communicate with outside world. 65 // Make sure that the destruction order is
66 // If `input_receiver` is closed we need to tear ourselves down. 66 //
67 // `output_sender` should not be closed unless the parent died. 67 // * notify_sender
68 move |input_receiver, output_sender| { 68 // * _thread
69 // Make sure that the destruction order is 69 // * watcher_sender
70 // 70 //
71 // * notify_sender 71 // this is required to avoid deadlocks.
72 // * _thread 72
73 // * watcher_sender 73 // These are the corresponding crossbeam channels
74 // 74 let (watcher_sender, watcher_receiver) = unbounded();
75 // this is required to avoid deadlocks. 75 let _thread;
76 76 {
77 // These are the corresponding crossbeam channels 77 // These are `std` channels notify will send events to
78 let (watcher_sender, watcher_receiver) = unbounded(); 78 let (notify_sender, notify_receiver) = mpsc::channel();
79 let _thread; 79
80 { 80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
81 // These are `std` channels notify will send events to 81 .map_err(|e| log::error!("failed to spawn notify {}", e))
82 let (notify_sender, notify_receiver) = mpsc::channel(); 82 .ok();
83 83 // Start a silly thread to transform between two channels
84 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
85 .map_err(|e| log::error!("failed to spawn notify {}", e)) 85 notify_receiver
86 .ok(); 86 .into_iter()
87 // Start a silly thread to transform between two channels 87 .for_each(|event| convert_notify_event(event, &watcher_sender))
88 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || { 88 });
89 notify_receiver 89
90 .into_iter() 90 // Process requests from the called or notifications from
91 .for_each(|event| convert_notify_event(event, &watcher_sender)) 91 // watcher until the caller says stop.
92 }); 92 loop {
93 93 select! {
94 // Process requests from the called or notifications from 94 // Received request from the caller. If this channel is
95 // watcher until the caller says stop. 95 // closed, we should shutdown everything.
96 loop { 96 recv(input_receiver) -> t => match t {
97 select! { 97 Err(RecvError) => {
98 // Received request from the caller. If this channel is 98 drop(input_receiver);
99 // closed, we should shutdown everything. 99 break
100 recv(input_receiver) -> t => match t {
101 Err(RecvError) => {
102 drop(input_receiver);
103 break
104 },
105 Ok(Task::AddRoot { root, config }) => {
106 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
107 }
108 },
109 // Watcher send us changes. If **this** channel is
110 // closed, the watcher has died, which indicates a bug
111 // -- escalate!
112 recv(watcher_receiver) -> event => match event {
113 Err(RecvError) => panic!("watcher is dead"),
114 Ok((path, change)) => {
115 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
116 }
117 }, 100 },
118 } 101 Ok(Task::AddRoot { root, config }) => {
102 watch_root(watcher.as_mut(), &output_sender, root, Arc::clone(&config));
103 }
104 },
105 // Watcher send us changes. If **this** channel is
106 // closed, the watcher has died, which indicates a bug
107 // -- escalate!
108 recv(watcher_receiver) -> event => match event {
109 Err(RecvError) => panic!("watcher is dead"),
110 Ok((path, change)) => {
111 handle_change(watcher.as_mut(), &output_sender, &*roots, path, change);
112 }
113 },
119 } 114 }
120 } 115 }
121 // Drain pending events: we are not interested in them anyways! 116 }
122 watcher_receiver.into_iter().for_each(|_| ()); 117 // Drain pending events: we are not interested in them anyways!
123 }, 118 watcher_receiver.into_iter().for_each(|_| ());
124 ); 119 },
125 120 )
126 Worker { thread_worker }
127 }
128
129 pub(crate) fn sender(&self) -> &Sender<Task> {
130 &self.thread_worker.sender()
131 }
132
133 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
134 &self.thread_worker.receiver()
135 }
136} 121}
137 122
138fn watch_root( 123fn watch_root(
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index 1fb255365..cfdc1275f 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -159,7 +159,7 @@ impl fmt::Debug for Vfs {
159impl Vfs { 159impl Vfs {
160 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { 160 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
161 let roots = Arc::new(Roots::new(roots)); 161 let roots = Arc::new(Roots::new(roots));
162 let worker = io::Worker::start(Arc::clone(&roots)); 162 let worker = io::start(Arc::clone(&roots));
163 let mut root2files = ArenaMap::default(); 163 let mut root2files = ArenaMap::default();
164 164
165 for (root, config) in roots.iter() { 165 for (root, config) in roots.iter() {