diff options
author | Aleksey Kladov <[email protected]> | 2018-09-08 11:15:01 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-09-08 11:15:01 +0100 |
commit | 7daaddb2ac281dcad3ac99496b1cf3f06840887d (patch) | |
tree | 3a6c9ac35c0f198c782591b50eb50225769955bb /crates/server/src | |
parent | 326ffcefe09906560a03d3184a2ce76841448702 (diff) |
Some abstraction around workers
Diffstat (limited to 'crates/server/src')
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 28 | ||||
-rw-r--r-- | crates/server/src/project_model.rs | 24 | ||||
-rw-r--r-- | crates/server/src/thread_watcher.rs | 31 | ||||
-rw-r--r-- | crates/server/src/vfs.rs | 34 |
4 files changed, 69 insertions, 48 deletions
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index b7f5efbb1..f1297ee48 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs | |||
@@ -23,6 +23,7 @@ use { | |||
23 | server_world::{ServerWorldState, ServerWorld}, | 23 | server_world::{ServerWorldState, ServerWorld}, |
24 | main_loop::subscriptions::{Subscriptions}, | 24 | main_loop::subscriptions::{Subscriptions}, |
25 | project_model::{CargoWorkspace, workspace_loader}, | 25 | project_model::{CargoWorkspace, workspace_loader}, |
26 | thread_watcher::Worker, | ||
26 | }; | 27 | }; |
27 | 28 | ||
28 | #[derive(Debug)] | 29 | #[derive(Debug)] |
@@ -43,8 +44,8 @@ pub fn main_loop( | |||
43 | .build() | 44 | .build() |
44 | .unwrap(); | 45 | .unwrap(); |
45 | let (task_sender, task_receiver) = unbounded::<Task>(); | 46 | let (task_sender, task_receiver) = unbounded::<Task>(); |
46 | let ((fs_sender, fs_receiver), fs_watcher) = vfs::roots_loader(); | 47 | let (fs_worker, fs_watcher) = vfs::roots_loader(); |
47 | let ((ws_sender, ws_receiver), ws_watcher) = workspace_loader(); | 48 | let (ws_worker, ws_watcher) = workspace_loader(); |
48 | 49 | ||
49 | info!("server initialized, serving requests"); | 50 | info!("server initialized, serving requests"); |
50 | let mut state = ServerWorldState::new(); | 51 | let mut state = ServerWorldState::new(); |
@@ -59,10 +60,8 @@ pub fn main_loop( | |||
59 | msg_receriver, | 60 | msg_receriver, |
60 | task_sender, | 61 | task_sender, |
61 | task_receiver.clone(), | 62 | task_receiver.clone(), |
62 | fs_sender, | 63 | fs_worker, |
63 | fs_receiver, | 64 | ws_worker, |
64 | ws_sender, | ||
65 | ws_receiver, | ||
66 | &mut state, | 65 | &mut state, |
67 | &mut pending_requests, | 66 | &mut pending_requests, |
68 | &mut subs, | 67 | &mut subs, |
@@ -93,17 +92,15 @@ fn main_loop_inner( | |||
93 | msg_receiver: &mut Receiver<RawMessage>, | 92 | msg_receiver: &mut Receiver<RawMessage>, |
94 | task_sender: Sender<Task>, | 93 | task_sender: Sender<Task>, |
95 | task_receiver: Receiver<Task>, | 94 | task_receiver: Receiver<Task>, |
96 | fs_sender: Sender<PathBuf>, | 95 | fs_worker: Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, |
97 | fs_receiver: Receiver<(PathBuf, Vec<FileEvent>)>, | 96 | ws_worker: Worker<PathBuf, Result<CargoWorkspace>>, |
98 | ws_sender: Sender<PathBuf>, | ||
99 | ws_receiver: Receiver<Result<CargoWorkspace>>, | ||
100 | state: &mut ServerWorldState, | 97 | state: &mut ServerWorldState, |
101 | pending_requests: &mut HashMap<u64, JobHandle>, | 98 | pending_requests: &mut HashMap<u64, JobHandle>, |
102 | subs: &mut Subscriptions, | 99 | subs: &mut Subscriptions, |
103 | ) -> Result<()> { | 100 | ) -> Result<()> { |
104 | let (libdata_sender, libdata_receiver) = unbounded(); | 101 | let (libdata_sender, libdata_receiver) = unbounded(); |
105 | ws_sender.send(ws_root.clone()); | 102 | ws_worker.send(ws_root.clone()); |
106 | fs_sender.send(ws_root.clone()); | 103 | fs_worker.send(ws_root.clone()); |
107 | loop { | 104 | loop { |
108 | #[derive(Debug)] | 105 | #[derive(Debug)] |
109 | enum Event { | 106 | enum Event { |
@@ -120,11 +117,11 @@ fn main_loop_inner( | |||
120 | None => bail!("client exited without shutdown"), | 117 | None => bail!("client exited without shutdown"), |
121 | }, | 118 | }, |
122 | recv(task_receiver, task) => Event::Task(task.unwrap()), | 119 | recv(task_receiver, task) => Event::Task(task.unwrap()), |
123 | recv(fs_receiver, events) => match events { | 120 | recv(fs_worker.out, events) => match events { |
124 | None => bail!("roots watcher died"), | 121 | None => bail!("roots watcher died"), |
125 | Some((pb, events)) => Event::Fs(pb, events), | 122 | Some((pb, events)) => Event::Fs(pb, events), |
126 | } | 123 | } |
127 | recv(ws_receiver, ws) => match ws { | 124 | recv(ws_worker.out, ws) => match ws { |
128 | None => bail!("workspace watcher died"), | 125 | None => bail!("workspace watcher died"), |
129 | Some(ws) => Event::Ws(ws), | 126 | Some(ws) => Event::Ws(ws), |
130 | } | 127 | } |
@@ -158,8 +155,7 @@ fn main_loop_inner( | |||
158 | for ws in workspaces.iter() { | 155 | for ws in workspaces.iter() { |
159 | for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { | 156 | for pkg in ws.packages().filter(|pkg| !pkg.is_member(ws)) { |
160 | debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); | 157 | debug!("sending root, {}", pkg.root(ws).to_path_buf().display()); |
161 | // deadlocky :-( | 158 | fs_worker.send(pkg.root(ws).to_path_buf()); |
162 | fs_sender.send(pkg.root(ws).to_path_buf()); | ||
163 | } | 159 | } |
164 | } | 160 | } |
165 | state.set_workspaces(workspaces); | 161 | state.set_workspaces(workspaces); |
diff --git a/crates/server/src/project_model.rs b/crates/server/src/project_model.rs index b9d6872c8..359cf787d 100644 --- a/crates/server/src/project_model.rs +++ b/crates/server/src/project_model.rs | |||
@@ -3,12 +3,11 @@ use std::{ | |||
3 | path::{Path, PathBuf}, | 3 | path::{Path, PathBuf}, |
4 | }; | 4 | }; |
5 | use cargo_metadata::{metadata_run, CargoOpt}; | 5 | use cargo_metadata::{metadata_run, CargoOpt}; |
6 | use crossbeam_channel::{Sender, Receiver}; | ||
7 | use libsyntax2::SmolStr; | 6 | use libsyntax2::SmolStr; |
8 | 7 | ||
9 | use { | 8 | use { |
10 | Result, | 9 | Result, |
11 | thread_watcher::{ThreadWatcher, worker_chan}, | 10 | thread_watcher::{Worker, ThreadWatcher}, |
12 | }; | 11 | }; |
13 | 12 | ||
14 | #[derive(Debug, Clone)] | 13 | #[derive(Debug, Clone)] |
@@ -162,14 +161,15 @@ impl TargetKind { | |||
162 | } | 161 | } |
163 | } | 162 | } |
164 | 163 | ||
165 | pub fn workspace_loader() -> ((Sender<PathBuf>, Receiver<Result<CargoWorkspace>>), ThreadWatcher) { | 164 | pub fn workspace_loader() -> (Worker<PathBuf, Result<CargoWorkspace>>, ThreadWatcher) { |
166 | let (interface, input_receiver, output_sender) = worker_chan::<PathBuf, Result<CargoWorkspace>>(1); | 165 | Worker::<PathBuf, Result<CargoWorkspace>>::spawn( |
167 | let thread = ThreadWatcher::spawn("workspace loader", move || { | 166 | "workspace loader", |
168 | input_receiver | 167 | 1, |
169 | .into_iter() | 168 | |input_receiver, output_sender| { |
170 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) | 169 | input_receiver |
171 | .for_each(|it| output_sender.send(it)) | 170 | .into_iter() |
172 | }); | 171 | .map(|path| CargoWorkspace::from_cargo_metadata(path.as_path())) |
173 | 172 | .for_each(|it| output_sender.send(it)) | |
174 | (interface, thread) | 173 | } |
174 | ) | ||
175 | } | 175 | } |
diff --git a/crates/server/src/thread_watcher.rs b/crates/server/src/thread_watcher.rs index 74a0a58b7..86a3a91e0 100644 --- a/crates/server/src/thread_watcher.rs +++ b/crates/server/src/thread_watcher.rs | |||
@@ -3,6 +3,33 @@ use crossbeam_channel::{bounded, unbounded, Sender, Receiver}; | |||
3 | use drop_bomb::DropBomb; | 3 | use drop_bomb::DropBomb; |
4 | use Result; | 4 | use Result; |
5 | 5 | ||
6 | pub struct Worker<I, O> { | ||
7 | pub inp: Sender<I>, | ||
8 | pub out: Receiver<O>, | ||
9 | } | ||
10 | |||
11 | impl<I, O> Worker<I, O> { | ||
12 | pub fn spawn<F>(name: &'static str, buf: usize, f: F) -> (Self, ThreadWatcher) | ||
13 | where | ||
14 | F: FnOnce(Receiver<I>, Sender<O>) + Send + 'static, | ||
15 | I: Send + 'static, | ||
16 | O: Send + 'static, | ||
17 | { | ||
18 | let ((inp, out), inp_r, out_s) = worker_chan(buf); | ||
19 | let worker = Worker { inp, out }; | ||
20 | let watcher = ThreadWatcher::spawn(name, move || f(inp_r, out_s)); | ||
21 | (worker, watcher) | ||
22 | } | ||
23 | |||
24 | pub fn stop(self) -> Receiver<O> { | ||
25 | self.out | ||
26 | } | ||
27 | |||
28 | pub fn send(&self, item: I) { | ||
29 | self.inp.send(item) | ||
30 | } | ||
31 | } | ||
32 | |||
6 | pub struct ThreadWatcher { | 33 | pub struct ThreadWatcher { |
7 | name: &'static str, | 34 | name: &'static str, |
8 | thread: thread::JoinHandle<()>, | 35 | thread: thread::JoinHandle<()>, |
@@ -10,7 +37,7 @@ pub struct ThreadWatcher { | |||
10 | } | 37 | } |
11 | 38 | ||
12 | impl ThreadWatcher { | 39 | impl ThreadWatcher { |
13 | pub fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { | 40 | fn spawn(name: &'static str, f: impl FnOnce() + Send + 'static) -> ThreadWatcher { |
14 | let thread = thread::spawn(f); | 41 | let thread = thread::spawn(f); |
15 | ThreadWatcher { | 42 | ThreadWatcher { |
16 | name, | 43 | name, |
@@ -36,7 +63,7 @@ impl ThreadWatcher { | |||
36 | /// Sets up worker channels in a deadlock-avoind way. | 63 | /// Sets up worker channels in a deadlock-avoind way. |
37 | /// If one sets both input and output buffers to a fixed size, | 64 | /// If one sets both input and output buffers to a fixed size, |
38 | /// a worker might get stuck. | 65 | /// a worker might get stuck. |
39 | pub fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { | 66 | fn worker_chan<I, O>(buf: usize) -> ((Sender<I>, Receiver<O>), Receiver<I>, Sender<O>) { |
40 | let (input_sender, input_receiver) = bounded::<I>(buf); | 67 | let (input_sender, input_receiver) = bounded::<I>(buf); |
41 | let (output_sender, output_receiver) = unbounded::<O>(); | 68 | let (output_sender, output_receiver) = unbounded::<O>(); |
42 | ((input_sender, output_receiver), input_receiver, output_sender) | 69 | ((input_sender, output_receiver), input_receiver, output_sender) |
diff --git a/crates/server/src/vfs.rs b/crates/server/src/vfs.rs index c228f0b0a..a1c1783f2 100644 --- a/crates/server/src/vfs.rs +++ b/crates/server/src/vfs.rs | |||
@@ -3,11 +3,10 @@ use std::{ | |||
3 | fs, | 3 | fs, |
4 | }; | 4 | }; |
5 | 5 | ||
6 | use crossbeam_channel::{Sender, Receiver}; | ||
7 | use walkdir::WalkDir; | 6 | use walkdir::WalkDir; |
8 | 7 | ||
9 | use { | 8 | use { |
10 | thread_watcher::{ThreadWatcher, worker_chan}, | 9 | thread_watcher::{Worker, ThreadWatcher}, |
11 | }; | 10 | }; |
12 | 11 | ||
13 | 12 | ||
@@ -22,22 +21,21 @@ pub enum FileEventKind { | |||
22 | Add(String), | 21 | Add(String), |
23 | } | 22 | } |
24 | 23 | ||
25 | pub fn roots_loader() -> ((Sender<PathBuf>, Receiver<(PathBuf, Vec<FileEvent>)>), ThreadWatcher) { | 24 | pub fn roots_loader() -> (Worker<PathBuf, (PathBuf, Vec<FileEvent>)>, ThreadWatcher) { |
26 | let (interface, input_receiver, output_sender) = | 25 | Worker::<PathBuf, (PathBuf, Vec<FileEvent>)>::spawn( |
27 | worker_chan::<PathBuf, (PathBuf, Vec<FileEvent>)>(128); | 26 | "roots loader", |
28 | let thread = ThreadWatcher::spawn("roots loader", move || { | 27 | 128, |input_receiver, output_sender| { |
29 | input_receiver | 28 | input_receiver |
30 | .into_iter() | 29 | .into_iter() |
31 | .map(|path| { | 30 | .map(|path| { |
32 | debug!("loading {} ...", path.as_path().display()); | 31 | debug!("loading {} ...", path.as_path().display()); |
33 | let events = load_root(path.as_path()); | 32 | let events = load_root(path.as_path()); |
34 | debug!("... loaded {}", path.as_path().display()); | 33 | debug!("... loaded {}", path.as_path().display()); |
35 | (path, events) | 34 | (path, events) |
36 | }) | 35 | }) |
37 | .for_each(|it| output_sender.send(it)) | 36 | .for_each(|it| output_sender.send(it)) |
38 | }); | 37 | } |
39 | 38 | ) | |
40 | (interface, thread) | ||
41 | } | 39 | } |
42 | 40 | ||
43 | fn load_root(path: &Path) -> Vec<FileEvent> { | 41 | fn load_root(path: &Path) -> Vec<FileEvent> { |