aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
Diffstat (limited to 'crates')
-rw-r--r--crates/ra_batch/src/lib.rs1
-rw-r--r--crates/ra_lsp_server/src/main_loop.rs26
-rw-r--r--crates/ra_lsp_server/src/project_model.rs6
-rw-r--r--crates/ra_lsp_server/tests/heavy_tests/support.rs17
-rw-r--r--crates/ra_vfs/src/io.rs91
-rw-r--r--crates/ra_vfs/src/lib.rs8
-rw-r--r--crates/ra_vfs/tests/vfs.rs1
-rw-r--r--crates/thread_worker/Cargo.toml1
-rw-r--r--crates/thread_worker/src/lib.rs120
9 files changed, 119 insertions, 152 deletions
diff --git a/crates/ra_batch/src/lib.rs b/crates/ra_batch/src/lib.rs
index d08fad908..69d66113e 100644
--- a/crates/ra_batch/src/lib.rs
+++ b/crates/ra_batch/src/lib.rs
@@ -121,7 +121,6 @@ impl BatchDatabase {
121 .collect(); 121 .collect();
122 122
123 let db = BatchDatabase::load(crate_graph, &mut vfs); 123 let db = BatchDatabase::load(crate_graph, &mut vfs);
124 let _ = vfs.shutdown();
125 Ok((db, local_roots)) 124 Ok((db, local_roots))
126 } 125 }
127} 126}
diff --git a/crates/ra_lsp_server/src/main_loop.rs b/crates/ra_lsp_server/src/main_loop.rs
index a51299851..06443bb76 100644
--- a/crates/ra_lsp_server/src/main_loop.rs
+++ b/crates/ra_lsp_server/src/main_loop.rs
@@ -54,19 +54,20 @@ pub fn main_loop(
54) -> Result<()> { 54) -> Result<()> {
55 let pool = ThreadPool::new(THREADPOOL_SIZE); 55 let pool = ThreadPool::new(THREADPOOL_SIZE);
56 let (task_sender, task_receiver) = unbounded::<Task>(); 56 let (task_sender, task_receiver) = unbounded::<Task>();
57 let (ws_worker, ws_watcher) = workspace_loader();
58 57
59 ws_worker.send(ws_root.clone()).unwrap();
60 // FIXME: support dynamic workspace loading. 58 // FIXME: support dynamic workspace loading.
61 let workspaces = match ws_worker.recv().unwrap() { 59 let workspaces = {
62 Ok(ws) => vec![ws], 60 let ws_worker = workspace_loader();
63 Err(e) => { 61 ws_worker.sender().send(ws_root.clone()).unwrap();
64 log::error!("loading workspace failed: {}", e); 62 match ws_worker.receiver().recv().unwrap() {
65 Vec::new() 63 Ok(ws) => vec![ws],
64 Err(e) => {
65 log::error!("loading workspace failed: {}", e);
66 Vec::new()
67 }
66 } 68 }
67 }; 69 };
68 ws_worker.shutdown(); 70
69 ws_watcher.shutdown().map_err(|_| format_err!("ws watcher died"))?;
70 let mut state = ServerWorldState::new(ws_root.clone(), workspaces); 71 let mut state = ServerWorldState::new(ws_root.clone(), workspaces);
71 72
72 log::info!("server initialized, serving requests"); 73 log::info!("server initialized, serving requests");
@@ -94,12 +95,9 @@ pub fn main_loop(
94 log::info!("...threadpool has finished"); 95 log::info!("...threadpool has finished");
95 96
96 let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead"); 97 let vfs = Arc::try_unwrap(state.vfs).expect("all snapshots should be dead");
97 let vfs_res = vfs.into_inner().shutdown(); 98 drop(vfs);
98 99
99 main_res?; 100 main_res
100 vfs_res.map_err(|_| format_err!("fs watcher died"))?;
101
102 Ok(())
103} 101}
104 102
105enum Event { 103enum Event {
diff --git a/crates/ra_lsp_server/src/project_model.rs b/crates/ra_lsp_server/src/project_model.rs
index 6800eb138..7d6440fad 100644
--- a/crates/ra_lsp_server/src/project_model.rs
+++ b/crates/ra_lsp_server/src/project_model.rs
@@ -1,6 +1,6 @@
1use std::path::PathBuf; 1use std::path::PathBuf;
2 2
3use thread_worker::{WorkerHandle, Worker}; 3use thread_worker::Worker;
4 4
5use crate::Result; 5use crate::Result;
6 6
@@ -8,8 +8,8 @@ pub use ra_project_model::{
8 ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot, 8 ProjectWorkspace, CargoWorkspace, Package, Target, TargetKind, Sysroot,
9}; 9};
10 10
11pub fn workspace_loader() -> (Worker<PathBuf, Result<ProjectWorkspace>>, WorkerHandle) { 11pub fn workspace_loader() -> Worker<PathBuf, Result<ProjectWorkspace>> {
12 thread_worker::spawn::<PathBuf, Result<ProjectWorkspace>, _>( 12 Worker::<PathBuf, Result<ProjectWorkspace>>::spawn(
13 "workspace loader", 13 "workspace loader",
14 1, 14 1,
15 |input_receiver, output_sender| { 15 |input_receiver, output_sender| {
diff --git a/crates/ra_lsp_server/tests/heavy_tests/support.rs b/crates/ra_lsp_server/tests/heavy_tests/support.rs
index eee85f8c8..11f94b4ab 100644
--- a/crates/ra_lsp_server/tests/heavy_tests/support.rs
+++ b/crates/ra_lsp_server/tests/heavy_tests/support.rs
@@ -17,7 +17,7 @@ use lsp_types::{
17use serde::Serialize; 17use serde::Serialize;
18use serde_json::{to_string_pretty, Value}; 18use serde_json::{to_string_pretty, Value};
19use tempfile::TempDir; 19use tempfile::TempDir;
20use thread_worker::{WorkerHandle, Worker}; 20use thread_worker::Worker;
21use test_utils::{parse_fixture, find_mismatch}; 21use test_utils::{parse_fixture, find_mismatch};
22 22
23use ra_lsp_server::{ 23use ra_lsp_server::{
@@ -45,13 +45,12 @@ pub struct Server {
45 messages: RefCell<Vec<RawMessage>>, 45 messages: RefCell<Vec<RawMessage>>,
46 dir: TempDir, 46 dir: TempDir,
47 worker: Option<Worker<RawMessage, RawMessage>>, 47 worker: Option<Worker<RawMessage, RawMessage>>,
48 watcher: Option<WorkerHandle>,
49} 48}
50 49
51impl Server { 50impl Server {
52 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { 51 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
53 let path = dir.path().to_path_buf(); 52 let path = dir.path().to_path_buf();
54 let (worker, watcher) = thread_worker::spawn::<RawMessage, RawMessage, _>( 53 let worker = Worker::<RawMessage, RawMessage>::spawn(
55 "test server", 54 "test server",
56 128, 55 128,
57 move |mut msg_receiver, mut msg_sender| { 56 move |mut msg_receiver, mut msg_sender| {
@@ -63,7 +62,6 @@ impl Server {
63 dir, 62 dir,
64 messages: Default::default(), 63 messages: Default::default(),
65 worker: Some(worker), 64 worker: Some(worker),
66 watcher: Some(watcher),
67 }; 65 };
68 66
69 for (path, text) in files { 67 for (path, text) in files {
@@ -117,7 +115,7 @@ impl Server {
117 } 115 }
118 fn send_request_(&self, r: RawRequest) -> Value { 116 fn send_request_(&self, r: RawRequest) -> Value {
119 let id = r.id; 117 let id = r.id;
120 self.worker.as_ref().unwrap().send(RawMessage::Request(r)).unwrap(); 118 self.worker.as_ref().unwrap().sender().send(RawMessage::Request(r)).unwrap();
121 while let Some(msg) = self.recv() { 119 while let Some(msg) = self.recv() {
122 match msg { 120 match msg {
123 RawMessage::Request(req) => panic!("unexpected request: {:?}", req), 121 RawMessage::Request(req) => panic!("unexpected request: {:?}", req),
@@ -157,24 +155,19 @@ impl Server {
157 } 155 }
158 } 156 }
159 fn recv(&self) -> Option<RawMessage> { 157 fn recv(&self) -> Option<RawMessage> {
160 recv_timeout(&self.worker.as_ref().unwrap().out).map(|msg| { 158 recv_timeout(&self.worker.as_ref().unwrap().receiver()).map(|msg| {
161 self.messages.borrow_mut().push(msg.clone()); 159 self.messages.borrow_mut().push(msg.clone());
162 msg 160 msg
163 }) 161 })
164 } 162 }
165 fn send_notification(&self, not: RawNotification) { 163 fn send_notification(&self, not: RawNotification) {
166 self.worker.as_ref().unwrap().send(RawMessage::Notification(not)).unwrap(); 164 self.worker.as_ref().unwrap().sender().send(RawMessage::Notification(not)).unwrap();
167 } 165 }
168} 166}
169 167
170impl Drop for Server { 168impl Drop for Server {
171 fn drop(&mut self) { 169 fn drop(&mut self) {
172 self.send_request::<Shutdown>(()); 170 self.send_request::<Shutdown>(());
173 let receiver = self.worker.take().unwrap().shutdown();
174 while let Some(msg) = recv_timeout(&receiver) {
175 drop(msg);
176 }
177 self.watcher.take().unwrap().shutdown().unwrap();
178 } 171 }
179} 172}
180 173
diff --git a/crates/ra_vfs/src/io.rs b/crates/ra_vfs/src/io.rs
index 3952b200b..f64b4c532 100644
--- a/crates/ra_vfs/src/io.rs
+++ b/crates/ra_vfs/src/io.rs
@@ -1,13 +1,11 @@
1use std::{ 1use std::{
2 fs, 2 fs,
3 thread,
4 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
5 sync::{mpsc, Arc}, 4 sync::{mpsc, Arc},
6 time::Duration, 5 time::Duration,
7}; 6};
8use crossbeam_channel::{Receiver, Sender, unbounded, RecvError, select}; 7use crossbeam_channel::{Sender, unbounded, RecvError, select};
9use relative_path::RelativePathBuf; 8use relative_path::RelativePathBuf;
10use thread_worker::WorkerHandle;
11use walkdir::WalkDir; 9use walkdir::WalkDir;
12use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher}; 10use notify::{DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher as _Watcher};
13 11
@@ -48,37 +46,42 @@ enum ChangeKind {
48 46
49const WATCHER_DELAY: Duration = Duration::from_millis(250); 47const WATCHER_DELAY: Duration = Duration::from_millis(250);
50 48
51pub(crate) struct Worker { 49pub(crate) type Worker = thread_worker::Worker<Task, TaskResult>;
52 worker: thread_worker::Worker<Task, TaskResult>, 50pub(crate) fn start(roots: Arc<Roots>) -> Worker {
53 worker_handle: WorkerHandle, 51 // This is a pretty elaborate setup of threads & channels! It is
54} 52 // explained by the following concerns:
55 53 // * we need to burn a thread translating from notify's mpsc to
56impl Worker { 54 // crossbeam_channel.
57 pub(crate) fn start(roots: Arc<Roots>) -> Worker { 55 // * we want to read all files from a single thread, to guarantee that
58 // This is a pretty elaborate setup of threads & channels! It is 56 // we always get fresher versions and never go back in time.
59 // explained by the following concerns: 57 // * we want to tear down everything neatly during shutdown.
60 // * we need to burn a thread translating from notify's mpsc to 58 Worker::spawn(
61 // crossbeam_channel. 59 "vfs",
62 // * we want to read all files from a single thread, to guarantee that 60 128,
63 // we always get fresher versions and never go back in time. 61 // This are the channels we use to communicate with outside world.
64 // * we want to tear down everything neatly during shutdown. 62 // If `input_receiver` is closed we need to tear ourselves down.
65 let (worker, worker_handle) = thread_worker::spawn( 63 // `output_sender` should not be closed unless the parent died.
66 "vfs", 64 move |input_receiver, output_sender| {
67 128, 65 // Make sure that the destruction order is
68 // This are the channels we use to communicate with outside world. 66 //
69 // If `input_receiver` is closed we need to tear ourselves down. 67 // * notify_sender
70 // `output_sender` should not be closed unless the parent died. 68 // * _thread
71 move |input_receiver, output_sender| { 69 // * watcher_sender
70 //
71 // this is required to avoid deadlocks.
72
73 // These are the corresponding crossbeam channels
74 let (watcher_sender, watcher_receiver) = unbounded();
75 let _thread;
76 {
72 // These are `std` channels notify will send events to 77 // These are `std` channels notify will send events to
73 let (notify_sender, notify_receiver) = mpsc::channel(); 78 let (notify_sender, notify_receiver) = mpsc::channel();
74 // These are the corresponding crossbeam channels
75 let (watcher_sender, watcher_receiver) = unbounded();
76 79
77 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY) 80 let mut watcher = notify::watcher(notify_sender, WATCHER_DELAY)
78 .map_err(|e| log::error!("failed to spawn notify {}", e)) 81 .map_err(|e| log::error!("failed to spawn notify {}", e))
79 .ok(); 82 .ok();
80 // Start a silly thread to transform between two channels 83 // Start a silly thread to transform between two channels
81 let thread = thread::spawn(move || { 84 _thread = thread_worker::ScopedThread::spawn("notify-convertor", move || {
82 notify_receiver 85 notify_receiver
83 .into_iter() 86 .into_iter()
84 .for_each(|event| convert_notify_event(event, &watcher_sender)) 87 .for_each(|event| convert_notify_event(event, &watcher_sender))
@@ -110,35 +113,11 @@ impl Worker {
110 }, 113 },
111 } 114 }
112 } 115 }
113 // Stopped the watcher 116 }
114 drop(watcher.take()); 117 // Drain pending events: we are not interested in them anyways!
115 // Drain pending events: we are not interested in them anyways! 118 watcher_receiver.into_iter().for_each(|_| ());
116 watcher_receiver.into_iter().for_each(|_| ()); 119 },
117 120 )
118 let res = thread.join();
119 match &res {
120 Ok(()) => log::info!("... Watcher terminated with ok"),
121 Err(_) => log::error!("... Watcher terminated with err"),
122 }
123 res.unwrap();
124 },
125 );
126
127 Worker { worker, worker_handle }
128 }
129
130 pub(crate) fn sender(&self) -> &Sender<Task> {
131 &self.worker.inp
132 }
133
134 pub(crate) fn receiver(&self) -> &Receiver<TaskResult> {
135 &self.worker.out
136 }
137
138 pub(crate) fn shutdown(self) -> thread::Result<()> {
139 let _ = self.worker.shutdown();
140 self.worker_handle.shutdown()
141 }
142} 121}
143 122
144fn watch_root( 123fn watch_root(
diff --git a/crates/ra_vfs/src/lib.rs b/crates/ra_vfs/src/lib.rs
index f07657db6..cfdc1275f 100644
--- a/crates/ra_vfs/src/lib.rs
+++ b/crates/ra_vfs/src/lib.rs
@@ -22,7 +22,6 @@ use std::{
22 fmt, fs, mem, 22 fmt, fs, mem,
23 path::{Path, PathBuf}, 23 path::{Path, PathBuf},
24 sync::Arc, 24 sync::Arc,
25 thread,
26}; 25};
27 26
28use crossbeam_channel::Receiver; 27use crossbeam_channel::Receiver;
@@ -160,7 +159,7 @@ impl fmt::Debug for Vfs {
160impl Vfs { 159impl Vfs {
161 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) { 160 pub fn new(roots: Vec<PathBuf>) -> (Vfs, Vec<VfsRoot>) {
162 let roots = Arc::new(Roots::new(roots)); 161 let roots = Arc::new(Roots::new(roots));
163 let worker = io::Worker::start(Arc::clone(&roots)); 162 let worker = io::start(Arc::clone(&roots));
164 let mut root2files = ArenaMap::default(); 163 let mut root2files = ArenaMap::default();
165 164
166 for (root, config) in roots.iter() { 165 for (root, config) in roots.iter() {
@@ -337,11 +336,6 @@ impl Vfs {
337 mem::replace(&mut self.pending_changes, Vec::new()) 336 mem::replace(&mut self.pending_changes, Vec::new())
338 } 337 }
339 338
340 /// Shutdown the VFS and terminate the background watching thread.
341 pub fn shutdown(self) -> thread::Result<()> {
342 self.worker.shutdown()
343 }
344
345 fn add_file( 339 fn add_file(
346 &mut self, 340 &mut self,
347 root: VfsRoot, 341 root: VfsRoot,
diff --git a/crates/ra_vfs/tests/vfs.rs b/crates/ra_vfs/tests/vfs.rs
index 649ef96c9..c76e6ea26 100644
--- a/crates/ra_vfs/tests/vfs.rs
+++ b/crates/ra_vfs/tests/vfs.rs
@@ -158,6 +158,5 @@ fn test_vfs_works() -> std::io::Result<()> {
158 Err(RecvTimeoutError::Timeout) 158 Err(RecvTimeoutError::Timeout)
159 ); 159 );
160 160
161 vfs.shutdown().unwrap();
162 Ok(()) 161 Ok(())
163} 162}
diff --git a/crates/thread_worker/Cargo.toml b/crates/thread_worker/Cargo.toml
index 363b4c3b8..a9857d59d 100644
--- a/crates/thread_worker/Cargo.toml
+++ b/crates/thread_worker/Cargo.toml
@@ -5,7 +5,6 @@ version = "0.1.0"
5authors = ["rust-analyzer developers"] 5authors = ["rust-analyzer developers"]
6 6
7[dependencies] 7[dependencies]
8drop_bomb = "0.1.0"
9crossbeam-channel = "0.3.5" 8crossbeam-channel = "0.3.5"
10log = "0.4.3" 9log = "0.4.3"
11 10
diff --git a/crates/thread_worker/src/lib.rs b/crates/thread_worker/src/lib.rs
index a522a0843..d67e44e38 100644
--- a/crates/thread_worker/src/lib.rs
+++ b/crates/thread_worker/src/lib.rs
@@ -2,74 +2,80 @@
2 2
3use std::thread; 3use std::thread;
4 4
5use crossbeam_channel::{bounded, unbounded, Receiver, Sender, RecvError, SendError}; 5use crossbeam_channel::{bounded, unbounded, Receiver, Sender};
6use drop_bomb::DropBomb;
7 6
8pub struct Worker<I, O> { 7/// Like `std::thread::JoinHandle<()>`, but joins thread in drop automatically.
9 pub inp: Sender<I>, 8pub struct ScopedThread {
10 pub out: Receiver<O>, 9 // Option for drop
10 inner: Option<thread::JoinHandle<()>>,
11} 11}
12 12
13pub struct WorkerHandle { 13impl Drop for ScopedThread {
14 name: &'static str, 14 fn drop(&mut self) {
15 thread: thread::JoinHandle<()>, 15 let inner = self.inner.take().unwrap();
16 bomb: DropBomb, 16 let name = inner.thread().name().unwrap().to_string();
17} 17 log::info!("waiting for {} to finish...", name);
18 let res = inner.join();
19 log::info!(".. {} terminated with {}", name, if res.is_ok() { "ok" } else { "err" });
18 20
19pub fn spawn<I, O, F>(name: &'static str, buf: usize, f: F) -> (Worker<I, O>, WorkerHandle) 21 // escalate panic, but avoid aborting the process
20where 22 match res {
21 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, 23 Err(e) => {
22 I: Send + 'static, 24 if !thread::panicking() {
23 O: Send + 'static, 25 panic!(e)
24{ 26 }
25 let (worker, inp_r, out_s) = worker_chan(buf); 27 }
26 let watcher = WorkerHandle::spawn(name, move || f(inp_r, out_s)); 28 _ => (),
27 (worker, watcher) 29 }
28}
29
30impl<I, O> Worker<I, O> {
31 /// Stops the worker. Returns the message receiver to fetch results which
32 /// have become ready before the worker is stopped.
33 pub fn shutdown(self) -> Receiver<O> {
34 self.out
35 } 30 }
31}
36 32
37 pub fn send(&self, item: I) -> Result<(), SendError<I>> { 33impl ScopedThread {
38 self.inp.send(item) 34 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ScopedThread {
39 } 35 let inner = thread::Builder::new().name(name.into()).spawn(f).unwrap();
40 pub fn recv(&self) -> Result<O, RecvError> { 36 ScopedThread { inner: Some(inner) }
41 self.out.recv()
42 } 37 }
43} 38}
44 39
45impl WorkerHandle { 40/// A wrapper around event-processing thread with automatic shutdown semantics.
46 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> WorkerHandle { 41pub struct Worker<I, O> {
47 let thread = thread::spawn(f); 42 // XXX: field order is significant here.
48 WorkerHandle { 43 //
49 name, 44 // In Rust, fields are dropped in the declaration order, and we rely on this
50 thread, 45 // here. We must close input first, so that the `thread` (who holds the
51 bomb: DropBomb::new(format!("WorkerHandle {} was not shutdown", name)), 46 // opposite side of the channel) noticed shutdown. Then, we must join the
52 } 47 // thread, but we must keep out alive so that the thread does not panic.
53 } 48 //
49 // Note that a potential problem here is that we might drop some messages
50 // from receiver on the floor. This is ok for rust-analyzer: we have only a
51 // single client, so, if we are shutting down, nobody is interested in the
52 // unfinished work anyway!
53 sender: Sender<I>,
54 _thread: ScopedThread,
55 receiver: Receiver<O>,
56}
54 57
55 pub fn shutdown(mut self) -> thread::Result<()> { 58impl<I, O> Worker<I, O> {
56 log::info!("waiting for {} to finish ...", self.name); 59 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> Worker<I, O>
57 let name = self.name; 60 where
58 self.bomb.defuse(); 61 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
59 let res = self.thread.join(); 62 I: Send + 'static,
60 match &res { 63 O: Send + 'static,
61 Ok(()) => log::info!("... {} terminated with ok", name), 64 {
62 Err(_) => log::error!("... {} terminated with err", name), 65 // Set up worker channels in a deadlock-avoiding way. If one sets both input
63 } 66 // and output buffers to a fixed size, a worker might get stuck.
64 res 67 let (sender, input_receiver) = bounded::<I>(buf);
68 let (output_sender, receiver) = unbounded::<O>();
69 let _thread = ScopedThread::spawn(name, move || f(input_receiver, output_sender));
70 Worker { sender, _thread, receiver }
65 } 71 }
66} 72}
67 73
68/// Sets up worker channels in a deadlock-avoiding way. 74impl<I, O> Worker<I, O> {
69/// If one sets both input and output buffers to a fixed size, 75 pub fn sender(&self) -> &Sender<I> {
70/// a worker might get stuck. 76 &self.sender
71fn worker_chan<I, O>(buf: usize) -> (Worker<I, O>, Receiver<I>, Sender<O>) { 77 }
72 let (input_sender, input_receiver) = bounded::<I>(buf); 78 pub fn receiver(&self) -> &Receiver<O> {
73 let (output_sender, output_receiver) = unbounded::<O>(); 79 &self.receiver
74 (Worker { inp: input_sender, out: output_receiver }, input_receiver, output_sender) 80 }
75} 81}