diff options
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 28 | ||||
-rw-r--r-- | crates/server/src/project_model.rs | 24 | ||||
-rw-r--r-- | crates/server/src/thread_watcher.rs | 31 | ||||
-rw-r--r-- | crates/server/src/vfs.rs | 34 | ||||
-rw-r--r-- | crates/server/tests/heavy_tests/support.rs | 49 |
5 files changed, 92 insertions, 74 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index b7f5efbb1..f1297ee48 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs | |||
@@ -23,6 +23,7 @@ use { | |||
23 | server_world::{ServerWorldState, ServerWorld}, | 23 | server_world::{ServerWorldState, ServerWorld}, |
24 | main_loop::subscriptions::{Subscriptions}, | 24 | main_loop::subscriptions::{Subscriptions}, |
25 | project_model::{CargoWorkspace, workspace_loader}, | 25 | project_model::{CargoWorkspace, workspace_loader}, |
26 | thread_watcher::Worker, | ||
26 | }; | 27 | }; |
27 | 28 | ||
28 | #[derive(Debug)] | 29 | #[derive(Debug)] |
@@ -43,8 +44,8 @@ pub fn main_loop( | |||
43 | .build() | 44 | .build() |
44 | .unwrap(); | 45 | .unwrap(); |
45 | let (task_sender, task_receiver) = unbounded::<Task>(); | 46 | let (task_sender, task_receiver) = unbounded::<Task>(); |
46 | let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); | 47 | let (fs_worker, fs_watcher) = vfs::roots_loader(); |
47 | let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); | 48 | let (ws_worker, ws_watcher) = workspace_loader(); |
48 | 49 | ||
49 | info!("server initialized, serving requests"); | 50 | info!("server initialized, serving requests"); |
50 | let mut state = ServerWorldState::new(); | 51 | let mut state = ServerWorldState::new(); |
@@ -59,10 +60,8 @@ pub fn main_loop( | |||
59 | msg_receriver, | 60 | msg_receriver, |
60 | task_sender, | 61 | task_sender, |
61 | task_receiver.clone(), | 62 | task_receiver.clone(), |
62 | fs_sender, | 63 | fs_worker, |
63 | fs_receiver, | 64 | ws_worker, |
64 | ws_sender, | ||
65 | ws_receiver, | ||
66 | &mut state, | 65 | &mut state, |
67 | &mut pending_requests, | 66 | &mut pending_requests, |
68 | &mut subs, | 67 | &mut subs, |
@@ -93,17 +92,15 @@ fn main_loop_inner( | |||
93 | msg_receiver: &mut Receiver<RawMessage>, | 92 | msg_receiver: &mut Receiver<RawMessage>, |
94 | task_sender: Sender<Task>, | 93 | task_sender: Sender<Task>, |
95 | task_receiver: Receiver<Task>, | 94 | task_receiver: Receiver<Task>, |
96 | fs_sender: Sender<PathBuf>, | 95 | fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, |
97 | fs_receiver: Receiver<(PathBuf, Vec<FileEvent>)>, | 96 | ws_worker: Worker<PathBuf, Result<CargoWorkspace>>, |
98 | ws_sender: Sender<PathBuf>, | ||
99 | ws_receiver: Receiver<Result<CargoWorkspace>>, | ||
100 | state: &mut ServerWorldState, | 97 | state: &mut ServerWorldState, |
101 | pending_requests: &mut HashMap<u64, JobHandle>, | 98 | pending_requests: &mut HashMap<u64, JobHandle>, |
102 | subs: &mut Subscriptions, | 99 | subs: &mut Subscriptions, |
103 | ) -> Result<()> { | 100 | ) -> Result<()> { |
104 | let (libdata_sender, libdata_receiver) = unbounded(); | 101 | let (libdata_sender, libdata_receiver) = unbounded(); |
105 | ws_sender.send(ws_root.clone()); | 102 | ws_worker.send(ws_root.clone()); |
106 | fs_sender.send(ws_root.clone()); | 103 | fs_worker.send(ws_root.clone()); |
107 | loop { | 104 | loop { |
108 | #[derive(Debug)] | 105 | #[derive(Debug)] |
109 | enum Event { | 106 | enum Event { |
@@ -120,11 +117,11 @@ fn main_loop_inner( | |||
120 | None => bail!("client exited without shutdown"), | 117 | None => bail!("client exited without shutdown"), |
121 | }, | 118 | }, |
122 | recv(task_receiver, task) => Event::Task(task.unwrap()), | 119 | recv(task_receiver, task) => Event::Task(task.unwrap()), |
123 | recv(fs_receiver, events) => match events { | 120 | recv(fs_worker.out, events) => match events { |
124 | None => bail!("roots watcher died"), | 121 | None => bail!("roots watcher died"), |
125 | Some((pb, events)) => Event::Fs(pb, events), | 122 | Some((pb, events)) => Event::Fs(pb, events), |
126 | } | 123 | } |
127 | recv(ws_receiver, ws) => match ws { | 124 | recv(ws_worker.out, ws) => match ws { |
128 | None => bail!("workspace watcher died"), | 125 | None => bail!("workspace watcher died"), |
129 | Some(ws) => Event::Ws(ws), | 126 | Some(ws) => Event::Ws(ws), |
130 | } | 127 | } |
@@ -158,8 +155,7 @@ fn main_loop_inner( | |||
158 | for ws in workspaces.iter() { | 155 | for ws in workspaces.iter() { |
159 | for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { | 156 | for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { |
160 | debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); | 157 | debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); |
161 | // deadlocky :-( | 158 | fs_worker.send(pkg.root(ws).to_path_buf()); |
162 | fs_sender.send(pkg.root(ws).to_path_buf()); | ||
163 | } | 159 | } |
164 | } | 160 | } |
165 | state.set_workspaces(workspaces); | 161 | state.set_workspaces(workspaces); |
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index b9d6872c8..359cf787d 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs | |||
@@ -3,12 +3,11 @@ use std::{ | |||
3 | path::{Path, PathBuf}, | 3 | path::{Path, PathBuf}, |
4 | }; | 4 | }; |
5 | use cargo_metadata::{metadata_run, CargoOpt}; | 5 | use cargo_metadata::{metadata_run, CargoOpt}; |
6 | use crossbeam_channel::{Sender, Receiver}; | ||
7 | use libsyntax2::SmolStr; | 6 | use libsyntax2::SmolStr; |
8 | 7 | ||
9 | use { | 8 | use { |
10 | Result, | 9 | Result, |
11 | thread_watcher::{ThreadWatcher, worker_chan}, | 10 | thread_watcher::{Worker, ThreadWatcher}, |
12 | }; | 11 | }; |
13 | 12 | ||
14 | #[derive(Debug, Clone)] | 13 | #[derive(Debug, Clone)] |
@@ -162,14 +161,15 @@ impl TargetKind { | |||
162 | } | 161 | } |
163 | } | 162 | } |
164 | 163 | ||
165 | pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { | 164 | pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) { |
166 | let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); | 165 | Worker::<PathBuf, Result<CargoWorkspace>>::spawn( |
167 | let thread = ThreadWatcher::spawn("workspace loader", move || { | 166 | "workspace loader", |
168 | input_receiver | 167 | 1, |
169 | .into_iter() | 168 | |input_receiver, output_sender| { |
170 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) | 169 | input_receiver |
171 | .for_each(|it| output_sender.send(it)) | 170 | .into_iter() |
172 | }); | 171 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) |
173 | 172 | .for_each(|it| output_sender.send(it)) | |
174 | (interface, thread) | 173 | } |
174 | ) | ||
175 | } | 175 | } |
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs | |||
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; | |||
3 | use drop_bomb::DropBomb; | 3 | use drop_bomb::DropBomb; |
4 | use Result; | 4 | use Result; |
5 | 5 | ||
6 | pub struct Worker<I, O> { | ||
7 | pub inp: Sender<I>, | ||
8 | pub out: Receiver<O>, | ||
9 | } | ||
10 | |||
11 | impl<I, O> Worker<I, O> { | ||
12 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) | ||
13 | where | ||
14 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
15 | I: Send + 'static, | ||
16 | O: Send + 'static, | ||
17 | { | ||
18 | let ((inp, out), inp_r, out_s) = worker_chan(buf); | ||
19 | let worker = Worker { inp, out }; | ||
20 | let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); | ||
21 | (worker, watcher) | ||
22 | } | ||
23 | |||
24 | pub fn stop(self) -> Receiver<O> { | ||
25 | self.out | ||
26 | } | ||
27 | |||
28 | pub fn send(&self, item: I) { | ||
29 | self.inp.send(item) | ||
30 | } | ||
31 | } | ||
32 | |||
6 | pub struct ThreadWatcher { | 33 | pub struct ThreadWatcher { |
7 | name: &'static str, | 34 | name: &'static str, |
8 | thread: thread::JoinHandle<()>, | 35 | thread: thread::JoinHandle<()>, |
@@ -10,7 +37,7 @@ pub struct ThreadWatcher { | |||
10 | } | 37 | } |
11 | 38 | ||
12 | impl ThreadWatcher { | 39 | impl ThreadWatcher { |
13 | pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { | 40 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { |
14 | let thread = thread::spawn(f); | 41 | let thread = thread::spawn(f); |
15 | ThreadWatcher { | 42 | ThreadWatcher { |
16 | name, | 43 | name, |
@@ -36,7 +63,7 @@ impl ThreadWatcher { | |||
36 | /// Sets up worker channels in a deadlock-avoind way. | 63 | /// Sets up worker channels in a deadlock-avoind way. |
37 | /// If one sets both input and output buffers to a fixed size, | 64 | /// If one sets both input and output buffers to a fixed size, |
38 | /// a worker might get stuck. | 65 | /// a worker might get stuck. |
39 | pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { | 66 | fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { |
40 | let (input_sender, input_receiver) = bounded::<I>(buf); | 67 | let (input_sender, input_receiver) = bounded::<I>(buf); |
41 | let (output_sender, output_receiver) = unbounded::<O>(); | 68 | let (output_sender, output_receiver) = unbounded::<O>(); |
42 | ((input_sender, output_receiver), input_receiver, output_sender) | 69 | ((input_sender, output_receiver), input_receiver, output_sender) |
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index c228f0b0a..a1c1783f2 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs | |||
@@ -3,11 +3,10 @@ use std::{ | |||
3 | fs, | 3 | fs, |
4 | }; | 4 | }; |
5 | 5 | ||
6 | use crossbeam_channel::{Sender, Receiver}; | ||
7 | use walkdir::WalkDir; | 6 | use walkdir::WalkDir; |
8 | 7 | ||
9 | use { | 8 | use { |
10 | thread_watcher::{ThreadWatcher, worker_chan}, | 9 | thread_watcher::{Worker, ThreadWatcher}, |
11 | }; | 10 | }; |
12 | 11 | ||
13 | 12 | ||
@@ -22,22 +21,21 @@ pub enum FileEventKind { | |||
22 | Add(String), | 21 | Add(String), |
23 | } | 22 | } |
24 | 23 | ||
25 | pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { | 24 | pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) { |
26 | let (interface, input_receiver, output_sender) = | 25 | Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn( |
27 | worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); | 26 | "roots loader", |
28 | let thread = ThreadWatcher::spawn("roots loader", move || { | 27 | 128, |input_receiver, output_sender| { |
29 | input_receiver | 28 | input_receiver |
30 | .into_iter() | 29 | .into_iter() |
31 | .map(|path| { | 30 | .map(|path| { |
32 | debug!("loading {} ...", path.as_path().display()); | 31 | debug!("loading {} ...", path.as_path().display()); |
33 | let events = load_root(path.as_path()); | 32 | let events = load_root(path.as_path()); |
34 | debug!("... loaded {}", path.as_path().display()); | 33 | debug!("... loaded {}", path.as_path().display()); |
35 | (path, events) | 34 | (path, events) |
36 | }) | 35 | }) |
37 | .for_each(|it| output_sender.send(it)) | 36 | .for_each(|it| output_sender.send(it)) |
38 | }); | 37 | } |
39 | 38 | ) | |
40 | (interface, thread) | ||
41 | } | 39 | } |
42 | 40 | ||
43 | fn load_root(path: &Path) -> Vec<FileEvent> { | 41 | fn load_root(path: &Path) -> Vec<FileEvent> { |
diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs index 2710ab59b..355914033 100644 --- a/crates/server/tests/heavy_tests/support.rs +++ b/crates/server/tests/heavy_tests/support.rs | |||
@@ -1,6 +1,5 @@ | |||
1 | use std::{ | 1 | use std::{ |
2 | fs, | 2 | fs, |
3 | thread, | ||
4 | cell::{Cell, RefCell}, | 3 | cell::{Cell, RefCell}, |
5 | path::PathBuf, | 4 | path::PathBuf, |
6 | time::Duration, | 5 | time::Duration, |
@@ -8,7 +7,7 @@ use std::{ | |||
8 | }; | 7 | }; |
9 | 8 | ||
10 | use tempdir::TempDir; | 9 | use tempdir::TempDir; |
11 | use crossbeam_channel::{after, Sender, Receiver}; | 10 | use crossbeam_channel::{after, Receiver}; |
12 | use flexi_logger::Logger; | 11 | use flexi_logger::Logger; |
13 | use languageserver_types::{ | 12 | use languageserver_types::{ |
14 | Url, | 13 | Url, |
@@ -22,7 +21,7 @@ use serde::Serialize; | |||
22 | use serde_json::{Value, from_str, to_string_pretty}; | 21 | use serde_json::{Value, from_str, to_string_pretty}; |
23 | use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; | 22 | use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; |
24 | 23 | ||
25 | use m::{Result, main_loop, req, thread_watcher::worker_chan}; | 24 | use m::{main_loop, req, thread_watcher::{ThreadWatcher, Worker}}; |
26 | 25 | ||
27 | pub fn project(fixture: &str) -> Server { | 26 | pub fn project(fixture: &str) -> Server { |
28 | static INIT: Once = Once::new(); | 27 | static INIT: Once = Once::new(); |
@@ -61,28 +60,27 @@ pub struct Server { | |||
61 | req_id: Cell<u64>, | 60 | req_id: Cell<u64>, |
62 | messages: RefCell<Vec<RawMessage>>, | 61 | messages: RefCell<Vec<RawMessage>>, |
63 | dir: TempDir, | 62 | dir: TempDir, |
64 | sender: Option<Sender<RawMessage>>, | 63 | worker: Option<Worker<RawMessage, RawMessage>>, |
65 | receiver: Receiver<RawMessage>, | 64 | watcher: Option<ThreadWatcher>, |
66 | server: Option<thread::JoinHandle<Result<()>>>, | ||
67 | } | 65 | } |
68 | 66 | ||
69 | impl Server { | 67 | impl Server { |
70 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { | 68 | fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { |
71 | let path = dir.path().to_path_buf(); | 69 | let path = dir.path().to_path_buf(); |
72 | let ((msg_sender, msg_receiver), server) = { | 70 | let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn( |
73 | let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128); | 71 | "test server", |
74 | let server = thread::spawn(move || { | 72 | 128, |
73 | move |mut msg_receiver, mut msg_sender| { | ||
75 | main_loop(true, path, &mut msg_receiver, &mut msg_sender) | 74 | main_loop(true, path, &mut msg_receiver, &mut msg_sender) |
76 | }); | 75 | .unwrap() |
77 | (api, server) | 76 | } |
78 | }; | 77 | ); |
79 | let res = Server { | 78 | let res = Server { |
80 | req_id: Cell::new(1), | 79 | req_id: Cell::new(1), |
81 | dir, | 80 | dir, |
82 | messages: Default::default(), | 81 | messages: Default::default(), |
83 | sender: Some(msg_sender), | 82 | worker: Some(worker), |
84 | receiver: msg_receiver, | 83 | watcher: Some(watcher), |
85 | server: Some(server), | ||
86 | }; | 84 | }; |
87 | 85 | ||
88 | for (path, text) in files { | 86 | for (path, text) in files { |
@@ -140,7 +138,7 @@ impl Server { | |||
140 | fn send_request_(&self, r: RawRequest) -> Value | 138 | fn send_request_(&self, r: RawRequest) -> Value |
141 | { | 139 | { |
142 | let id = r.id; | 140 | let id = r.id; |
143 | self.sender.as_ref() | 141 | self.worker.as_ref() |
144 | .unwrap() | 142 | .unwrap() |
145 | .send(RawMessage::Request(r)); | 143 | .send(RawMessage::Request(r)); |
146 | while let Some(msg) = self.recv() { | 144 | while let Some(msg) = self.recv() { |
@@ -183,14 +181,14 @@ impl Server { | |||
183 | } | 181 | } |
184 | } | 182 | } |
185 | fn recv(&self) -> Option<RawMessage> { | 183 | fn recv(&self) -> Option<RawMessage> { |
186 | recv_timeout(&self.receiver) | 184 | recv_timeout(&self.worker.as_ref().unwrap().out) |
187 | .map(|msg| { | 185 | .map(|msg| { |
188 | self.messages.borrow_mut().push(msg.clone()); | 186 | self.messages.borrow_mut().push(msg.clone()); |
189 | msg | 187 | msg |
190 | }) | 188 | }) |
191 | } | 189 | } |
192 | fn send_notification(&self, not: RawNotification) { | 190 | fn send_notification(&self, not: RawNotification) { |
193 | self.sender.as_ref() | 191 | self.worker.as_ref() |
194 | .unwrap() | 192 | .unwrap() |
195 | .send(RawMessage::Notification(not)); | 193 | .send(RawMessage::Notification(not)); |
196 | } | 194 | } |
@@ -198,16 +196,15 @@ impl Server { | |||
198 | 196 | ||
199 | impl Drop for Server { | 197 | impl Drop for Server { |
200 | fn drop(&mut self) { | 198 | fn drop(&mut self) { |
201 | { | 199 | self.send_request::<Shutdown>(666, ()); |
202 | self.send_request::<Shutdown>(666, ()); | 200 | let receiver = self.worker.take().unwrap().stop(); |
203 | drop(self.sender.take().unwrap()); | 201 | while let Some(msg) = recv_timeout(&receiver) { |
204 | while let Some(msg) = recv_timeout(&self.receiver) { | 202 | drop(msg); |
205 | drop(msg); | ||
206 | } | ||
207 | } | 203 | } |
208 | self.server.take() | 204 | self.watcher.take() |
209 | .unwrap() | 205 | .unwrap() |
210 | .join().unwrap().unwrap(); | 206 | .stop() |
207 | .unwrap(); | ||
211 | } | 208 | } |
212 | } | 209 | } |
213 | 210 | ||