aboutsummaryrefslogtreecommitdiff
path: root/crates
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-09-08 11:15:01 +0100
committerAleksey Kladov <[email protected]>2018-09-08 11:15:01 +0100
commit7daaddb2ac281dcad3ac99496b1cf3f06840887d (patch)
tree3a6c9ac35c0f198c782591b50eb50225769955bb /crates
parent326ffcefe09906560a03d3184a2ce76841448702 (diff)
Some abstraction around workers
Diffstat (limited to 'crates')
-rw-r--r--crates/server/src/main_loop/mod.rs28
-rw-r--r--crates/server/src/project_model.rs24
-rw-r--r--crates/server/src/thread_watcher.rs31
-rw-r--r--crates/server/src/vfs.rs34
-rw-r--r--crates/server/tests/heavy_tests/support.rs49
5 files changed, 92 insertions, 74 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index b7f5efbb1..f1297ee48 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -23,6 +23,7 @@ use {
23 server_world::{ServerWorldState, ServerWorld}, 23 server_world::{ServerWorldState, ServerWorld},
24 main_loop::subscriptions::{Subscriptions}, 24 main_loop::subscriptions::{Subscriptions},
25 project_model::{CargoWorkspace, workspace_loader}, 25 project_model::{CargoWorkspace, workspace_loader},
26 thread_watcher::Worker,
26}; 27};
27 28
28#[derive(Debug)] 29#[derive(Debug)]
@@ -43,8 +44,8 @@ pub fn main_loop(
43 .build() 44 .build()
44 .unwrap(); 45 .unwrap();
45 let (task_sender, task_receiver) = unbounded::<Task>(); 46 let (task_sender, task_receiver) = unbounded::<Task>();
46 let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); 47 let (fs_worker, fs_watcher) = vfs::roots_loader();
47 let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); 48 let (ws_worker, ws_watcher) = workspace_loader();
48 49
49 info!("server initialized, serving requests"); 50 info!("server initialized, serving requests");
50 let mut state = ServerWorldState::new(); 51 let mut state = ServerWorldState::new();
@@ -59,10 +60,8 @@ pub fn main_loop(
59 msg_receriver, 60 msg_receriver,
60 task_sender, 61 task_sender,
61 task_receiver.clone(), 62 task_receiver.clone(),
62 fs_sender, 63 fs_worker,
63 fs_receiver, 64 ws_worker,
64 ws_sender,
65 ws_receiver,
66 &mut state, 65 &mut state,
67 &mut pending_requests, 66 &mut pending_requests,
68 &mut subs, 67 &mut subs,
@@ -93,17 +92,15 @@ fn main_loop_inner(
93 msg_receiver: &mut Receiver<RawMessage>, 92 msg_receiver: &mut Receiver<RawMessage>,
94 task_sender: Sender<Task>, 93 task_sender: Sender<Task>,
95 task_receiver: Receiver<Task>, 94 task_receiver: Receiver<Task>,
96 fs_sender: Sender<PathBuf>, 95 fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>,
97 fs_receiver: Receiver<(PathBuf, Vec<FileEvent>)>, 96 ws_worker: Worker<PathBuf, Result<CargoWorkspace>>,
98 ws_sender: Sender<PathBuf>,
99 ws_receiver: Receiver<Result<CargoWorkspace>>,
100 state: &mut ServerWorldState, 97 state: &mut ServerWorldState,
101 pending_requests: &mut HashMap<u64, JobHandle>, 98 pending_requests: &mut HashMap<u64, JobHandle>,
102 subs: &mut Subscriptions, 99 subs: &mut Subscriptions,
103) -> Result<()> { 100) -> Result<()> {
104 let (libdata_sender, libdata_receiver) = unbounded(); 101 let (libdata_sender, libdata_receiver) = unbounded();
105 ws_sender.send(ws_root.clone()); 102 ws_worker.send(ws_root.clone());
106 fs_sender.send(ws_root.clone()); 103 fs_worker.send(ws_root.clone());
107 loop { 104 loop {
108 #[derive(Debug)] 105 #[derive(Debug)]
109 enum Event { 106 enum Event {
@@ -120,11 +117,11 @@ fn main_loop_inner(
120 None => bail!("client exited without shutdown"), 117 None => bail!("client exited without shutdown"),
121 }, 118 },
122 recv(task_receiver, task) => Event::Task(task.unwrap()), 119 recv(task_receiver, task) => Event::Task(task.unwrap()),
123 recv(fs_receiver, events) => match events { 120 recv(fs_worker.out, events) => match events {
124 None => bail!("roots watcher died"), 121 None => bail!("roots watcher died"),
125 Some((pb, events)) => Event::Fs(pb, events), 122 Some((pb, events)) => Event::Fs(pb, events),
126 } 123 }
127 recv(ws_receiver, ws) => match ws { 124 recv(ws_worker.out, ws) => match ws {
128 None => bail!("workspace watcher died"), 125 None => bail!("workspace watcher died"),
129 Some(ws) => Event::Ws(ws), 126 Some(ws) => Event::Ws(ws),
130 } 127 }
@@ -158,8 +155,7 @@ fn main_loop_inner(
158 for ws in workspaces.iter() { 155 for ws in workspaces.iter() {
159 for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { 156 for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) {
160 debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); 157 debug!("sending root, {}", pkg.root(ws).to_path_buf().display());
161 // deadlocky :-( 158 fs_worker.send(pkg.root(ws).to_path_buf());
162 fs_sender.send(pkg.root(ws).to_path_buf());
163 } 159 }
164 } 160 }
165 state.set_workspaces(workspaces); 161 state.set_workspaces(workspaces);
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs
index b9d6872c8..359cf787d 100644
--- a/crates/server/src/project_model.rs
+++ b/crates/server/src/project_model.rs
@@ -3,12 +3,11 @@ use std::{
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4}; 4};
5use cargo_metadata::{metadata_run, CargoOpt}; 5use cargo_metadata::{metadata_run, CargoOpt};
6use crossbeam_channel::{Sender, Receiver};
7use libsyntax2::SmolStr; 6use libsyntax2::SmolStr;
8 7
9use { 8use {
10 Result, 9 Result,
11 thread_watcher::{ThreadWatcher, worker_chan}, 10 thread_watcher::{Worker, ThreadWatcher},
12}; 11};
13 12
14#[derive(Debug, Clone)] 13#[derive(Debug, Clone)]
@@ -162,14 +161,15 @@ impl TargetKind {
162 } 161 }
163} 162}
164 163
165pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { 164pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) {
166 let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); 165 Worker::<PathBuf, Result<CargoWorkspace>>::spawn(
167 let thread = ThreadWatcher::spawn("workspace loader", move || { 166 "workspace loader",
168 input_receiver 167 1,
169 .into_iter() 168 |input_receiver, output_sender| {
170 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) 169 input_receiver
171 .for_each(|it| output_sender.send(it)) 170 .into_iter()
172 }); 171 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
173 172 .for_each(|it| output_sender.send(it))
174 (interface, thread) 173 }
174 )
175} 175}
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
index 74a0a58b7..86a3a91e0 100644
--- a/crates/server/src/thread_watcher.rs
+++ b/crates/server/src/thread_watcher.rs
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
3use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
4use Result; 4use Result;
5 5
6pub struct Worker<I, O> {
7 pub inp: Sender<I>,
8 pub out: Receiver<O>,
9}
10
11impl<I, O> Worker<I, O> {
12 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher)
13 where
14 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
15 I: Send + 'static,
16 O: Send + 'static,
17 {
18 let ((inp, out), inp_r, out_s) = worker_chan(buf);
19 let worker = Worker { inp, out };
20 let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
21 (worker, watcher)
22 }
23
24 pub fn stop(self) -> Receiver<O> {
25 self.out
26 }
27
28 pub fn send(&self, item: I) {
29 self.inp.send(item)
30 }
31}
32
6pub struct ThreadWatcher { 33pub struct ThreadWatcher {
7 name: &'static str, 34 name: &'static str,
8 thread: thread::JoinHandle<()>, 35 thread: thread::JoinHandle<()>,
@@ -10,7 +37,7 @@ pub struct ThreadWatcher {
10} 37}
11 38
12impl ThreadWatcher { 39impl ThreadWatcher {
13 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { 40 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
14 let thread = thread::spawn(f); 41 let thread = thread::spawn(f);
15 ThreadWatcher { 42 ThreadWatcher {
16 name, 43 name,
@@ -36,7 +63,7 @@ impl ThreadWatcher {
36/// Sets up worker channels in a deadlock-avoind way. 63/// Sets up worker channels in a deadlock-avoind way.
37/// If one sets both input and output buffers to a fixed size, 64/// If one sets both input and output buffers to a fixed size,
38/// a worker might get stuck. 65/// a worker might get stuck.
39pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { 66fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
40 let (input_sender, input_receiver) = bounded::<I>(buf); 67 let (input_sender, input_receiver) = bounded::<I>(buf);
41 let (output_sender, output_receiver) = unbounded::<O>(); 68 let (output_sender, output_receiver) = unbounded::<O>();
42 ((input_sender, output_receiver), input_receiver, output_sender) 69 ((input_sender, output_receiver), input_receiver, output_sender)
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs
index c228f0b0a..a1c1783f2 100644
--- a/crates/server/src/vfs.rs
+++ b/crates/server/src/vfs.rs
@@ -3,11 +3,10 @@ use std::{
3 fs, 3 fs,
4}; 4};
5 5
6use crossbeam_channel::{Sender, Receiver};
7use walkdir::WalkDir; 6use walkdir::WalkDir;
8 7
9use { 8use {
10 thread_watcher::{ThreadWatcher, worker_chan}, 9 thread_watcher::{Worker, ThreadWatcher},
11}; 10};
12 11
13 12
@@ -22,22 +21,21 @@ pub enum FileEventKind {
22 Add(String), 21 Add(String),
23} 22}
24 23
25pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { 24pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) {
26 let (interface, input_receiver, output_sender) = 25 Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn(
27 worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); 26 "roots loader",
28 let thread = ThreadWatcher::spawn("roots loader", move || { 27 128, |input_receiver, output_sender| {
29 input_receiver 28 input_receiver
30 .into_iter() 29 .into_iter()
31 .map(|path| { 30 .map(|path| {
32 debug!("loading {} ...", path.as_path().display()); 31 debug!("loading {} ...", path.as_path().display());
33 let events = load_root(path.as_path()); 32 let events = load_root(path.as_path());
34 debug!("... loaded {}", path.as_path().display()); 33 debug!("... loaded {}", path.as_path().display());
35 (path, events) 34 (path, events)
36 }) 35 })
37 .for_each(|it| output_sender.send(it)) 36 .for_each(|it| output_sender.send(it))
38 }); 37 }
39 38 )
40 (interface, thread)
41} 39}
42 40
43fn load_root(path: &Path) -> Vec<FileEvent> { 41fn load_root(path: &Path) -> Vec<FileEvent> {
diff --git a/crates/server/tests/heavy_tests/support.rs b/crates/server/tests/heavy_tests/support.rs
index 2710ab59b..355914033 100644
--- a/crates/server/tests/heavy_tests/support.rs
+++ b/crates/server/tests/heavy_tests/support.rs
@@ -1,6 +1,5 @@
1use std::{ 1use std::{
2 fs, 2 fs,
3 thread,
4 cell::{Cell, RefCell}, 3 cell::{Cell, RefCell},
5 path::PathBuf, 4 path::PathBuf,
6 time::Duration, 5 time::Duration,
@@ -8,7 +7,7 @@ use std::{
8}; 7};
9 8
10use tempdir::TempDir; 9use tempdir::TempDir;
11use crossbeam_channel::{after, Sender, Receiver}; 10use crossbeam_channel::{after, Receiver};
12use flexi_logger::Logger; 11use flexi_logger::Logger;
13use languageserver_types::{ 12use languageserver_types::{
14 Url, 13 Url,
@@ -22,7 +21,7 @@ use serde::Serialize;
22use serde_json::{Value, from_str, to_string_pretty}; 21use serde_json::{Value, from_str, to_string_pretty};
23use gen_lsp_server::{RawMessage, RawRequest, RawNotification}; 22use gen_lsp_server::{RawMessage, RawRequest, RawNotification};
24 23
25use m::{Result, main_loop, req, thread_watcher::worker_chan}; 24use m::{main_loop, req, thread_watcher::{ThreadWatcher, Worker}};
26 25
27pub fn project(fixture: &str) -> Server { 26pub fn project(fixture: &str) -> Server {
28 static INIT: Once = Once::new(); 27 static INIT: Once = Once::new();
@@ -61,28 +60,27 @@ pub struct Server {
61 req_id: Cell<u64>, 60 req_id: Cell<u64>,
62 messages: RefCell<Vec<RawMessage>>, 61 messages: RefCell<Vec<RawMessage>>,
63 dir: TempDir, 62 dir: TempDir,
64 sender: Option<Sender<RawMessage>>, 63 worker: Option<Worker<RawMessage, RawMessage>>,
65 receiver: Receiver<RawMessage>, 64 watcher: Option<ThreadWatcher>,
66 server: Option<thread::JoinHandle<Result<()>>>,
67} 65}
68 66
69impl Server { 67impl Server {
70 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server { 68 fn new(dir: TempDir, files: Vec<(PathBuf, String)>) -> Server {
71 let path = dir.path().to_path_buf(); 69 let path = dir.path().to_path_buf();
72 let ((msg_sender, msg_receiver), server) = { 70 let (worker, watcher) = Worker::<RawMessage, RawMessage>::spawn(
73 let (api, mut msg_receiver, mut msg_sender) = worker_chan::<RawMessage, RawMessage>(128); 71 "test server",
74 let server = thread::spawn(move || { 72 128,
73 move |mut msg_receiver, mut msg_sender| {
75 main_loop(true, path, &mut msg_receiver, &mut msg_sender) 74 main_loop(true, path, &mut msg_receiver, &mut msg_sender)
76 }); 75 .unwrap()
77 (api, server) 76 }
78 }; 77 );
79 let res = Server { 78 let res = Server {
80 req_id: Cell::new(1), 79 req_id: Cell::new(1),
81 dir, 80 dir,
82 messages: Default::default(), 81 messages: Default::default(),
83 sender: Some(msg_sender), 82 worker: Some(worker),
84 receiver: msg_receiver, 83 watcher: Some(watcher),
85 server: Some(server),
86 }; 84 };
87 85
88 for (path, text) in files { 86 for (path, text) in files {
@@ -140,7 +138,7 @@ impl Server {
140 fn send_request_(&self, r: RawRequest) -> Value 138 fn send_request_(&self, r: RawRequest) -> Value
141 { 139 {
142 let id = r.id; 140 let id = r.id;
143 self.sender.as_ref() 141 self.worker.as_ref()
144 .unwrap() 142 .unwrap()
145 .send(RawMessage::Request(r)); 143 .send(RawMessage::Request(r));
146 while let Some(msg) = self.recv() { 144 while let Some(msg) = self.recv() {
@@ -183,14 +181,14 @@ impl Server {
183 } 181 }
184 } 182 }
185 fn recv(&self) -> Option<RawMessage> { 183 fn recv(&self) -> Option<RawMessage> {
186 recv_timeout(&self.receiver) 184 recv_timeout(&self.worker.as_ref().unwrap().out)
187 .map(|msg| { 185 .map(|msg| {
188 self.messages.borrow_mut().push(msg.clone()); 186 self.messages.borrow_mut().push(msg.clone());
189 msg 187 msg
190 }) 188 })
191 } 189 }
192 fn send_notification(&self, not: RawNotification) { 190 fn send_notification(&self, not: RawNotification) {
193 self.sender.as_ref() 191 self.worker.as_ref()
194 .unwrap() 192 .unwrap()
195 .send(RawMessage::Notification(not)); 193 .send(RawMessage::Notification(not));
196 } 194 }
@@ -198,16 +196,15 @@ impl Server {
198 196
199impl Drop for Server { 197impl Drop for Server {
200 fn drop(&mut self) { 198 fn drop(&mut self) {
201 { 199 self.send_request::<Shutdown>(666, ());
202 self.send_request::<Shutdown>(666, ()); 200 let receiver = self.worker.take().unwrap().stop();
203 drop(self.sender.take().unwrap()); 201 while let Some(msg) = recv_timeout(&receiver) {
204 while let Some(msg) = recv_timeout(&self.receiver) { 202 drop(msg);
205 drop(msg);
206 }
207 } 203 }
208 self.server.take() 204 self.watcher.take()
209 .unwrap() 205 .unwrap()
210 .join().unwrap().unwrap(); 206 .stop()
207 .unwrap();
211 } 208 }
212} 209}
213 210