aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-09-08 11:15:01 +0100
committerAleksey Kladov <[email protected]>2018-09-08 11:15:01 +0100
commit7daaddb2ac281dcad3ac99496b1cf3f06840887d (patch)
tree3a6c9ac35c0f198c782591b50eb50225769955bb /crates/server/src
parent326ffcefe09906560a03d3184a2ce76841448702 (diff)
Some abstraction around workers
Diffstat (limited to 'crates/server/src')
-rw-r--r--crates/server/src/main_loop/mod.rs28
-rw-r--r--crates/server/src/project_model.rs24
-rw-r--r--crates/server/src/thread_watcher.rs31
-rw-r--r--crates/server/src/vfs.rs34
4 files changed, 69 insertions, 48 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index b7f5efbb1..f1297ee48 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -23,6 +23,7 @@ use {
23 server_world::{ServerWorldState, ServerWorld}, 23 server_world::{ServerWorldState, ServerWorld},
24 main_loop::subscriptions::{Subscriptions}, 24 main_loop::subscriptions::{Subscriptions},
25 project_model::{CargoWorkspace, workspace_loader}, 25 project_model::{CargoWorkspace, workspace_loader},
26 thread_watcher::Worker,
26}; 27};
27 28
28#[derive(Debug)] 29#[derive(Debug)]
@@ -43,8 +44,8 @@ pub fn main_loop(
43 .build() 44 .build()
44 .unwrap(); 45 .unwrap();
45 let (task_sender, task_receiver) = unbounded::<Task>(); 46 let (task_sender, task_receiver) = unbounded::<Task>();
46 let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); 47 let (fs_worker, fs_watcher) = vfs::roots_loader();
47 let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); 48 let (ws_worker, ws_watcher) = workspace_loader();
48 49
49 info!("server initialized, serving requests"); 50 info!("server initialized, serving requests");
50 let mut state = ServerWorldState::new(); 51 let mut state = ServerWorldState::new();
@@ -59,10 +60,8 @@ pub fn main_loop(
59 msg_receriver, 60 msg_receriver,
60 task_sender, 61 task_sender,
61 task_receiver.clone(), 62 task_receiver.clone(),
62 fs_sender, 63 fs_worker,
63 fs_receiver, 64 ws_worker,
64 ws_sender,
65 ws_receiver,
66 &mut state, 65 &mut state,
67 &mut pending_requests, 66 &mut pending_requests,
68 &mut subs, 67 &mut subs,
@@ -93,17 +92,15 @@ fn main_loop_inner(
93 msg_receiver: &mut Receiver<RawMessage>, 92 msg_receiver: &mut Receiver<RawMessage>,
94 task_sender: Sender<Task>, 93 task_sender: Sender<Task>,
95 task_receiver: Receiver<Task>, 94 task_receiver: Receiver<Task>,
96 fs_sender: Sender<PathBuf>, 95 fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>,
97 fs_receiver: Receiver<(PathBuf, Vec<FileEvent>)>, 96 ws_worker: Worker<PathBuf, Result<CargoWorkspace>>,
98 ws_sender: Sender<PathBuf>,
99 ws_receiver: Receiver<Result<CargoWorkspace>>,
100 state: &mut ServerWorldState, 97 state: &mut ServerWorldState,
101 pending_requests: &mut HashMap<u64, JobHandle>, 98 pending_requests: &mut HashMap<u64, JobHandle>,
102 subs: &mut Subscriptions, 99 subs: &mut Subscriptions,
103) -> Result<()> { 100) -> Result<()> {
104 let (libdata_sender, libdata_receiver) = unbounded(); 101 let (libdata_sender, libdata_receiver) = unbounded();
105 ws_sender.send(ws_root.clone()); 102 ws_worker.send(ws_root.clone());
106 fs_sender.send(ws_root.clone()); 103 fs_worker.send(ws_root.clone());
107 loop { 104 loop {
108 #[derive(Debug)] 105 #[derive(Debug)]
109 enum Event { 106 enum Event {
@@ -120,11 +117,11 @@ fn main_loop_inner(
120 None => bail!("client exited without shutdown"), 117 None => bail!("client exited without shutdown"),
121 }, 118 },
122 recv(task_receiver, task) => Event::Task(task.unwrap()), 119 recv(task_receiver, task) => Event::Task(task.unwrap()),
123 recv(fs_receiver, events) => match events { 120 recv(fs_worker.out, events) => match events {
124 None => bail!("roots watcher died"), 121 None => bail!("roots watcher died"),
125 Some((pb, events)) => Event::Fs(pb, events), 122 Some((pb, events)) => Event::Fs(pb, events),
126 } 123 }
127 recv(ws_receiver, ws) => match ws { 124 recv(ws_worker.out, ws) => match ws {
128 None => bail!("workspace watcher died"), 125 None => bail!("workspace watcher died"),
129 Some(ws) => Event::Ws(ws), 126 Some(ws) => Event::Ws(ws),
130 } 127 }
@@ -158,8 +155,7 @@ fn main_loop_inner(
158 for ws in workspaces.iter() { 155 for ws in workspaces.iter() {
159 for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { 156 for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) {
160 debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); 157 debug!("sending root, {}", pkg.root(ws).to_path_buf().display());
161 // deadlocky :-( 158 fs_worker.send(pkg.root(ws).to_path_buf());
162 fs_sender.send(pkg.root(ws).to_path_buf());
163 } 159 }
164 } 160 }
165 state.set_workspaces(workspaces); 161 state.set_workspaces(workspaces);
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs
index b9d6872c8..359cf787d 100644
--- a/crates/server/src/project_model.rs
+++ b/crates/server/src/project_model.rs
@@ -3,12 +3,11 @@ use std::{
3 path::{Path, PathBuf}, 3 path::{Path, PathBuf},
4}; 4};
5use cargo_metadata::{metadata_run, CargoOpt}; 5use cargo_metadata::{metadata_run, CargoOpt};
6use crossbeam_channel::{Sender, Receiver};
7use libsyntax2::SmolStr; 6use libsyntax2::SmolStr;
8 7
9use { 8use {
10 Result, 9 Result,
11 thread_watcher::{ThreadWatcher, worker_chan}, 10 thread_watcher::{Worker, ThreadWatcher},
12}; 11};
13 12
14#[derive(Debug, Clone)] 13#[derive(Debug, Clone)]
@@ -162,14 +161,15 @@ impl TargetKind {
162 } 161 }
163} 162}
164 163
165pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { 164pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) {
166 let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); 165 Worker::<PathBuf, Result<CargoWorkspace>>::spawn(
167 let thread = ThreadWatcher::spawn("workspace loader", move || { 166 "workspace loader",
168 input_receiver 167 1,
169 .into_iter() 168 |input_receiver, output_sender| {
170 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) 169 input_receiver
171 .for_each(|it| output_sender.send(it)) 170 .into_iter()
172 }); 171 .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path()))
173 172 .for_each(|it| output_sender.send(it))
174 (interface, thread) 173 }
174 )
175} 175}
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs
index 74a0a58b7..86a3a91e0 100644
--- a/crates/server/src/thread_watcher.rs
+++ b/crates/server/src/thread_watcher.rs
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver};
3use drop_bomb::DropBomb; 3use drop_bomb::DropBomb;
4use Result; 4use Result;
5 5
6pub struct Worker<I, O> {
7 pub inp: Sender<I>,
8 pub out: Receiver<O>,
9}
10
11impl<I, O> Worker<I, O> {
12 pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher)
13 where
14 F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static,
15 I: Send + 'static,
16 O: Send + 'static,
17 {
18 let ((inp, out), inp_r, out_s) = worker_chan(buf);
19 let worker = Worker { inp, out };
20 let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s));
21 (worker, watcher)
22 }
23
24 pub fn stop(self) -> Receiver<O> {
25 self.out
26 }
27
28 pub fn send(&self, item: I) {
29 self.inp.send(item)
30 }
31}
32
6pub struct ThreadWatcher { 33pub struct ThreadWatcher {
7 name: &'static str, 34 name: &'static str,
8 thread: thread::JoinHandle<()>, 35 thread: thread::JoinHandle<()>,
@@ -10,7 +37,7 @@ pub struct ThreadWatcher {
10} 37}
11 38
12impl ThreadWatcher { 39impl ThreadWatcher {
13 pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { 40 fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher {
14 let thread = thread::spawn(f); 41 let thread = thread::spawn(f);
15 ThreadWatcher { 42 ThreadWatcher {
16 name, 43 name,
@@ -36,7 +63,7 @@ impl ThreadWatcher {
36/// Sets up worker channels in a deadlock-avoind way. 63/// Sets up worker channels in a deadlock-avoind way.
37/// If one sets both input and output buffers to a fixed size, 64/// If one sets both input and output buffers to a fixed size,
38/// a worker might get stuck. 65/// a worker might get stuck.
39pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { 66fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) {
40 let (input_sender, input_receiver) = bounded::<I>(buf); 67 let (input_sender, input_receiver) = bounded::<I>(buf);
41 let (output_sender, output_receiver) = unbounded::<O>(); 68 let (output_sender, output_receiver) = unbounded::<O>();
42 ((input_sender, output_receiver), input_receiver, output_sender) 69 ((input_sender, output_receiver), input_receiver, output_sender)
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs
index c228f0b0a..a1c1783f2 100644
--- a/crates/server/src/vfs.rs
+++ b/crates/server/src/vfs.rs
@@ -3,11 +3,10 @@ use std::{
3 fs, 3 fs,
4}; 4};
5 5
6use crossbeam_channel::{Sender, Receiver};
7use walkdir::WalkDir; 6use walkdir::WalkDir;
8 7
9use { 8use {
10 thread_watcher::{ThreadWatcher, worker_chan}, 9 thread_watcher::{Worker, ThreadWatcher},
11}; 10};
12 11
13 12
@@ -22,22 +21,21 @@ pub enum FileEventKind {
22 Add(String), 21 Add(String),
23} 22}
24 23
25pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { 24pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) {
26 let (interface, input_receiver, output_sender) = 25 Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn(
27 worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); 26 "roots loader",
28 let thread = ThreadWatcher::spawn("roots loader", move || { 27 128, |input_receiver, output_sender| {
29 input_receiver 28 input_receiver
30 .into_iter() 29 .into_iter()
31 .map(|path| { 30 .map(|path| {
32 debug!("loading {} ...", path.as_path().display()); 31 debug!("loading {} ...", path.as_path().display());
33 let events = load_root(path.as_path()); 32 let events = load_root(path.as_path());
34 debug!("... loaded {}", path.as_path().display()); 33 debug!("... loaded {}", path.as_path().display());
35 (path, events) 34 (path, events)
36 }) 35 })
37 .for_each(|it| output_sender.send(it)) 36 .for_each(|it| output_sender.send(it))
38 }); 37 }
39 38 )
40 (interface, thread)
41} 39}
42 40
43fn load_root(path: &Path) -> Vec<FileEvent> { 41fn load_root(path: &Path) -> Vec<FileEvent> {