diff options
author | Aleksey Kladov <[email protected]> | 2018-09-01 15:40:45 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-09-01 15:40:45 +0100 |
commit | 8f1ce8275347e915b1cc824567e96369875cefd4 (patch) | |
tree | 1d7b56d7947a5c6a20a6547b5342e3363e6c0e0f /crates/server | |
parent | 3588d6b2da6e63730cc560c9986ba7fda5de816e (diff) |
move to gen-server impl
Diffstat (limited to 'crates/server')
-rw-r--r-- | crates/server/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/server/src/dispatch.rs | 151 | ||||
-rw-r--r-- | crates/server/src/io.rs | 207 | ||||
-rw-r--r-- | crates/server/src/main.rs | 104 | ||||
-rw-r--r-- | crates/server/src/main_loop/mod.rs | 281 |
5 files changed, 170 insertions, 574 deletions
diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml index 35ced91ac..32c1219e1 100644 --- a/crates/server/Cargo.toml +++ b/crates/server/Cargo.toml | |||
@@ -23,3 +23,4 @@ text_unit = { version = "0.1.2", features = ["serde"] } | |||
23 | libsyntax2 = { path = "../libsyntax2" } | 23 | libsyntax2 = { path = "../libsyntax2" } |
24 | libeditor = { path = "../libeditor" } | 24 | libeditor = { path = "../libeditor" } |
25 | libanalysis = { path = "../libanalysis" } | 25 | libanalysis = { path = "../libanalysis" } |
26 | gen_lsp_server = { path = "../gen_lsp_server" } | ||
diff --git a/crates/server/src/dispatch.rs b/crates/server/src/dispatch.rs deleted file mode 100644 index 806534944..000000000 --- a/crates/server/src/dispatch.rs +++ /dev/null | |||
@@ -1,151 +0,0 @@ | |||
1 | use std::marker::PhantomData; | ||
2 | |||
3 | use serde::{ | ||
4 | ser::Serialize, | ||
5 | de::DeserializeOwned, | ||
6 | }; | ||
7 | use serde_json; | ||
8 | use drop_bomb::DropBomb; | ||
9 | |||
10 | use ::{ | ||
11 | Result, | ||
12 | req::{ClientRequest, Notification}, | ||
13 | io::{RawResponse, RawRequest, RawNotification}, | ||
14 | }; | ||
15 | |||
16 | pub struct Responder<R: ClientRequest> { | ||
17 | id: u64, | ||
18 | bomb: DropBomb, | ||
19 | ph: PhantomData<fn(R)>, | ||
20 | } | ||
21 | |||
22 | impl<R: ClientRequest> Responder<R> { | ||
23 | pub fn into_response(mut self, result: Result<R::Result>) -> Result<RawResponse> { | ||
24 | self.bomb.defuse(); | ||
25 | let res = match result { | ||
26 | Ok(result) => { | ||
27 | RawResponse { | ||
28 | id: self.id, | ||
29 | result: serde_json::to_value(result)?, | ||
30 | error: serde_json::Value::Null, | ||
31 | } | ||
32 | } | ||
33 | Err(e) => { | ||
34 | error_response( | ||
35 | self.id, | ||
36 | ErrorCode::InternalError, | ||
37 | format!("internal error: {}", e), | ||
38 | )? | ||
39 | } | ||
40 | }; | ||
41 | Ok(res) | ||
42 | } | ||
43 | } | ||
44 | |||
45 | fn parse_request_as<R: ClientRequest>(raw: RawRequest) | ||
46 | -> Result<::std::result::Result<(R::Params, Responder<R>), RawRequest>> | ||
47 | { | ||
48 | if raw.method != R::METHOD { | ||
49 | return Ok(Err(raw)); | ||
50 | } | ||
51 | |||
52 | let params: R::Params = serde_json::from_value(raw.params)?; | ||
53 | let responder = Responder { | ||
54 | id: raw.id, | ||
55 | bomb: DropBomb::new("dropped request"), | ||
56 | ph: PhantomData, | ||
57 | }; | ||
58 | Ok(Ok((params, responder))) | ||
59 | } | ||
60 | |||
61 | pub fn handle_request<R, F>(req: RawRequest, f: F) -> Result<::std::result::Result<u64, RawRequest>> | ||
62 | where | ||
63 | R: ClientRequest, | ||
64 | F: FnOnce(R::Params, Responder<R>) -> Result<()> | ||
65 | { | ||
66 | let id = req.id; | ||
67 | match parse_request_as::<R>(req)? { | ||
68 | Ok((params, responder)) => { | ||
69 | let () = f(params, responder)?; | ||
70 | Ok(Ok(id)) | ||
71 | }, | ||
72 | Err(r) => Ok(Err(r)), | ||
73 | } | ||
74 | } | ||
75 | |||
76 | fn parse_notification_as<N>(raw: RawNotification) -> Result<::std::result::Result<N::Params, RawNotification>> | ||
77 | where | ||
78 | N: Notification, | ||
79 | N::Params: DeserializeOwned, | ||
80 | { | ||
81 | if raw.method != N::METHOD { | ||
82 | return Ok(Err(raw)); | ||
83 | } | ||
84 | let params: N::Params = serde_json::from_value(raw.params)?; | ||
85 | Ok(Ok(params)) | ||
86 | } | ||
87 | |||
88 | pub fn handle_notification<N, F>(not: &mut Option<RawNotification>, f: F) -> Result<()> | ||
89 | where | ||
90 | N: Notification, | ||
91 | N::Params: DeserializeOwned, | ||
92 | F: FnOnce(N::Params) -> Result<()> | ||
93 | { | ||
94 | match not.take() { | ||
95 | None => Ok(()), | ||
96 | Some(n) => match parse_notification_as::<N>(n)? { | ||
97 | Ok(params) => f(params), | ||
98 | Err(n) => { | ||
99 | *not = Some(n); | ||
100 | Ok(()) | ||
101 | } | ||
102 | } | ||
103 | } | ||
104 | } | ||
105 | |||
106 | pub fn send_notification<N>(params: N::Params) -> RawNotification | ||
107 | where | ||
108 | N: Notification, | ||
109 | N::Params: Serialize | ||
110 | { | ||
111 | RawNotification { | ||
112 | method: N::METHOD.to_string(), | ||
113 | params: serde_json::to_value(params) | ||
114 | .unwrap(), | ||
115 | } | ||
116 | } | ||
117 | |||
118 | pub fn unknown_method(id: u64) -> Result<RawResponse> { | ||
119 | error_response(id, ErrorCode::MethodNotFound, "unknown method") | ||
120 | } | ||
121 | |||
122 | fn error_response(id: u64, code: ErrorCode, message: impl Into<String>) -> Result<RawResponse> { | ||
123 | #[derive(Serialize)] | ||
124 | struct Error { | ||
125 | code: i32, | ||
126 | message: String, | ||
127 | } | ||
128 | let resp = RawResponse { | ||
129 | id, | ||
130 | result: serde_json::Value::Null, | ||
131 | error: serde_json::to_value(Error { | ||
132 | code: code as i32, | ||
133 | message: message.into(), | ||
134 | })?, | ||
135 | }; | ||
136 | Ok(resp) | ||
137 | } | ||
138 | |||
139 | #[allow(unused)] | ||
140 | enum ErrorCode { | ||
141 | ParseError = -32700, | ||
142 | InvalidRequest = -32600, | ||
143 | MethodNotFound = -32601, | ||
144 | InvalidParams = -32602, | ||
145 | InternalError = -32603, | ||
146 | ServerErrorStart = -32099, | ||
147 | ServerErrorEnd = -32000, | ||
148 | ServerNotInitialized = -32002, | ||
149 | UnknownErrorCode = -32001, | ||
150 | RequestCancelled = -32800, | ||
151 | } | ||
diff --git a/crates/server/src/io.rs b/crates/server/src/io.rs deleted file mode 100644 index f247327ab..000000000 --- a/crates/server/src/io.rs +++ /dev/null | |||
@@ -1,207 +0,0 @@ | |||
1 | use std::{ | ||
2 | thread, | ||
3 | io::{ | ||
4 | stdout, stdin, | ||
5 | BufRead, Write, | ||
6 | }, | ||
7 | }; | ||
8 | use serde_json::{Value, from_str, to_string}; | ||
9 | use crossbeam_channel::{Receiver, Sender, bounded}; | ||
10 | |||
11 | use Result; | ||
12 | |||
13 | |||
14 | #[derive(Debug, Serialize, Deserialize)] | ||
15 | #[serde(untagged)] | ||
16 | pub enum RawMsg { | ||
17 | Request(RawRequest), | ||
18 | Notification(RawNotification), | ||
19 | Response(RawResponse), | ||
20 | } | ||
21 | |||
22 | #[derive(Debug, Serialize, Deserialize)] | ||
23 | pub struct RawRequest { | ||
24 | pub id: u64, | ||
25 | pub method: String, | ||
26 | pub params: Value, | ||
27 | } | ||
28 | |||
29 | #[derive(Debug, Serialize, Deserialize)] | ||
30 | pub struct RawNotification { | ||
31 | pub method: String, | ||
32 | pub params: Value, | ||
33 | } | ||
34 | |||
35 | #[derive(Debug, Serialize, Deserialize)] | ||
36 | pub struct RawResponse { | ||
37 | // JSON RPC allows this to be null if it was impossible | ||
38 | // to decode the request's id. Ignore this special case | ||
39 | // and just die horribly. | ||
40 | pub id: u64, | ||
41 | #[serde(default)] | ||
42 | pub result: Value, | ||
43 | #[serde(default)] | ||
44 | pub error: Value, | ||
45 | } | ||
46 | |||
47 | struct MsgReceiver { | ||
48 | chan: Receiver<RawMsg>, | ||
49 | thread: Option<thread::JoinHandle<Result<()>>>, | ||
50 | } | ||
51 | |||
52 | impl MsgReceiver { | ||
53 | fn recv(&mut self) -> Result<RawMsg> { | ||
54 | match self.chan.recv() { | ||
55 | Some(msg) => Ok(msg), | ||
56 | None => { | ||
57 | self.cleanup()?; | ||
58 | unreachable!() | ||
59 | } | ||
60 | } | ||
61 | } | ||
62 | |||
63 | fn cleanup(&mut self) -> Result<()> { | ||
64 | self.thread | ||
65 | .take() | ||
66 | .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? | ||
67 | .join() | ||
68 | .map_err(|_| format_err!("MsgReceiver thread panicked"))??; | ||
69 | bail!("client disconnected") | ||
70 | } | ||
71 | |||
72 | fn stop(self) -> Result<()> { | ||
73 | // Can't really self.thread.join() here, b/c it might be | ||
74 | // blocking on read | ||
75 | Ok(()) | ||
76 | } | ||
77 | } | ||
78 | |||
79 | struct MsgSender { | ||
80 | chan: Sender<RawMsg>, | ||
81 | thread: thread::JoinHandle<Result<()>>, | ||
82 | } | ||
83 | |||
84 | impl MsgSender { | ||
85 | fn send(&mut self, msg: RawMsg) { | ||
86 | self.chan.send(msg) | ||
87 | } | ||
88 | |||
89 | fn stop(self) -> Result<()> { | ||
90 | drop(self.chan); | ||
91 | self.thread.join() | ||
92 | .map_err(|_| format_err!("MsgSender thread panicked"))??; | ||
93 | Ok(()) | ||
94 | } | ||
95 | } | ||
96 | |||
97 | pub struct Io { | ||
98 | receiver: MsgReceiver, | ||
99 | sender: MsgSender, | ||
100 | } | ||
101 | |||
102 | impl Io { | ||
103 | pub fn from_stdio() -> Io { | ||
104 | let sender = { | ||
105 | let (tx, rx) = bounded(16); | ||
106 | MsgSender { | ||
107 | chan: tx, | ||
108 | thread: thread::spawn(move || { | ||
109 | let stdout = stdout(); | ||
110 | let mut stdout = stdout.lock(); | ||
111 | for msg in rx { | ||
112 | #[derive(Serialize)] | ||
113 | struct JsonRpc { | ||
114 | jsonrpc: &'static str, | ||
115 | #[serde(flatten)] | ||
116 | msg: RawMsg, | ||
117 | } | ||
118 | let text = to_string(&JsonRpc { | ||
119 | jsonrpc: "2.0", | ||
120 | msg, | ||
121 | })?; | ||
122 | write_msg_text(&mut stdout, &text)?; | ||
123 | } | ||
124 | Ok(()) | ||
125 | }), | ||
126 | } | ||
127 | }; | ||
128 | let receiver = { | ||
129 | let (tx, rx) = bounded(16); | ||
130 | MsgReceiver { | ||
131 | chan: rx, | ||
132 | thread: Some(thread::spawn(move || { | ||
133 | let stdin = stdin(); | ||
134 | let mut stdin = stdin.lock(); | ||
135 | while let Some(text) = read_msg_text(&mut stdin)? { | ||
136 | let msg: RawMsg = from_str(&text)?; | ||
137 | tx.send(msg); | ||
138 | } | ||
139 | Ok(()) | ||
140 | })), | ||
141 | } | ||
142 | }; | ||
143 | Io { receiver, sender } | ||
144 | } | ||
145 | |||
146 | pub fn send(&mut self, msg: RawMsg) { | ||
147 | self.sender.send(msg) | ||
148 | } | ||
149 | |||
150 | pub fn recv(&mut self) -> Result<RawMsg> { | ||
151 | self.receiver.recv() | ||
152 | } | ||
153 | |||
154 | pub fn receiver(&mut self) -> &mut Receiver<RawMsg> { | ||
155 | &mut self.receiver.chan | ||
156 | } | ||
157 | |||
158 | pub fn cleanup_receiver(&mut self) -> Result<()> { | ||
159 | self.receiver.cleanup() | ||
160 | } | ||
161 | |||
162 | pub fn stop(self) -> Result<()> { | ||
163 | self.receiver.stop()?; | ||
164 | self.sender.stop()?; | ||
165 | Ok(()) | ||
166 | } | ||
167 | } | ||
168 | |||
169 | |||
170 | fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> { | ||
171 | let mut size = None; | ||
172 | let mut buf = String::new(); | ||
173 | loop { | ||
174 | buf.clear(); | ||
175 | if inp.read_line(&mut buf)? == 0 { | ||
176 | return Ok(None); | ||
177 | } | ||
178 | if !buf.ends_with("\r\n") { | ||
179 | bail!("malformed header: {:?}", buf); | ||
180 | } | ||
181 | let buf = &buf[..buf.len() - 2]; | ||
182 | if buf.is_empty() { | ||
183 | break; | ||
184 | } | ||
185 | let mut parts = buf.splitn(2, ": "); | ||
186 | let header_name = parts.next().unwrap(); | ||
187 | let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?; | ||
188 | if header_name == "Content-Length" { | ||
189 | size = Some(header_value.parse::<usize>()?); | ||
190 | } | ||
191 | } | ||
192 | let size = size.ok_or_else(|| format_err!("no Content-Length"))?; | ||
193 | let mut buf = buf.into_bytes(); | ||
194 | buf.resize(size, 0); | ||
195 | inp.read_exact(&mut buf)?; | ||
196 | let buf = String::from_utf8(buf)?; | ||
197 | debug!("< {}", buf); | ||
198 | Ok(Some(buf)) | ||
199 | } | ||
200 | |||
201 | fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { | ||
202 | debug!("> {}", msg); | ||
203 | write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; | ||
204 | out.write_all(msg.as_bytes())?; | ||
205 | out.flush()?; | ||
206 | Ok(()) | ||
207 | } | ||
diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs index eeb343b80..3e3bd44a1 100644 --- a/crates/server/src/main.rs +++ b/crates/server/src/main.rs | |||
@@ -17,26 +17,20 @@ extern crate walkdir; | |||
17 | extern crate libeditor; | 17 | extern crate libeditor; |
18 | extern crate libanalysis; | 18 | extern crate libanalysis; |
19 | extern crate libsyntax2; | 19 | extern crate libsyntax2; |
20 | extern crate gen_lsp_server; | ||
20 | extern crate im; | 21 | extern crate im; |
21 | extern crate relative_path; | 22 | extern crate relative_path; |
22 | 23 | ||
23 | mod io; | ||
24 | mod caps; | 24 | mod caps; |
25 | mod req; | 25 | mod req; |
26 | mod dispatch; | ||
27 | mod conv; | 26 | mod conv; |
28 | mod main_loop; | 27 | mod main_loop; |
29 | mod vfs; | 28 | mod vfs; |
30 | mod path_map; | 29 | mod path_map; |
31 | mod server_world; | 30 | mod server_world; |
32 | 31 | ||
33 | use threadpool::ThreadPool; | ||
34 | use crossbeam_channel::bounded; | ||
35 | use flexi_logger::{Logger, Duplicate}; | 32 | use flexi_logger::{Logger, Duplicate}; |
36 | 33 | use gen_lsp_server::{run_server, stdio_transport}; | |
37 | use ::{ | ||
38 | io::{Io, RawMsg, RawResponse, RawNotification}, | ||
39 | }; | ||
40 | 34 | ||
41 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; | 35 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; |
42 | 36 | ||
@@ -60,96 +54,10 @@ fn main() -> Result<()> { | |||
60 | } | 54 | } |
61 | 55 | ||
62 | fn main_inner() -> Result<()> { | 56 | fn main_inner() -> Result<()> { |
63 | let mut io = Io::from_stdio(); | 57 | let (receiver, sender, threads) = stdio_transport(); |
64 | let res = initialize(&mut io); | 58 | run_server(caps::server_capabilities(), main_loop::main_loop, receiver, sender)?; |
65 | info!("shutting down IO..."); | 59 | info!("shutting down IO..."); |
66 | let io_res = io.stop(); | 60 | threads.join()?; |
67 | info!("... IO is down"); | 61 | info!("... IO is down"); |
68 | match (res, io_res) { | 62 | Ok(()) |
69 | (Ok(()), Ok(())) => Ok(()), | ||
70 | (res, Ok(())) => res, | ||
71 | (Ok(()), io_res) => io_res, | ||
72 | (res, Err(io_err)) => { | ||
73 | error!("shutdown error: {:?}", io_err); | ||
74 | res | ||
75 | } | ||
76 | } | ||
77 | } | ||
78 | |||
79 | fn initialize(io: &mut Io) -> Result<()> { | ||
80 | match io.recv()? { | ||
81 | RawMsg::Notification(n) => | ||
82 | bail!("expected initialize request, got {:?}", n), | ||
83 | RawMsg::Response(res) => | ||
84 | bail!("expected initialize request, got {:?}", res), | ||
85 | |||
86 | RawMsg::Request(req) => { | ||
87 | let req = dispatch::handle_request::<req::Initialize, _>(req, |_params, resp| { | ||
88 | let res = req::InitializeResult { capabilities: caps::server_capabilities() }; | ||
89 | let resp = resp.into_response(Ok(res))?; | ||
90 | io.send(RawMsg::Response(resp)); | ||
91 | Ok(()) | ||
92 | })?; | ||
93 | if let Err(req) = req { | ||
94 | bail!("expected initialize request, got {:?}", req) | ||
95 | } | ||
96 | match io.recv()? { | ||
97 | RawMsg::Notification(n) => { | ||
98 | if n.method != "initialized" { | ||
99 | bail!("expected initialized notification"); | ||
100 | } | ||
101 | } | ||
102 | _ => bail!("expected initialized notification"), | ||
103 | } | ||
104 | } | ||
105 | } | ||
106 | initialized(io) | ||
107 | } | ||
108 | |||
109 | enum Task { | ||
110 | Respond(RawResponse), | ||
111 | Notify(RawNotification), | ||
112 | Die(::failure::Error), | ||
113 | } | ||
114 | |||
115 | fn initialized(io: &mut Io) -> Result<()> { | ||
116 | { | ||
117 | let mut pool = ThreadPool::new(4); | ||
118 | let (task_sender, task_receiver) = bounded::<Task>(16); | ||
119 | let (fs_events_receiver, watcher) = vfs::watch(vec![ | ||
120 | ::std::env::current_dir()?, | ||
121 | ]); | ||
122 | info!("lifecycle: handshake finished, server ready to serve requests"); | ||
123 | let res = main_loop::main_loop( | ||
124 | io, | ||
125 | &mut pool, | ||
126 | task_sender, | ||
127 | task_receiver.clone(), | ||
128 | fs_events_receiver, | ||
129 | ); | ||
130 | |||
131 | info!("waiting for background jobs to finish..."); | ||
132 | task_receiver.for_each(drop); | ||
133 | pool.join(); | ||
134 | info!("...background jobs have finished"); | ||
135 | |||
136 | info!("waiting for file watcher to finish..."); | ||
137 | watcher.stop()?; | ||
138 | info!("...file watcher has finished"); | ||
139 | |||
140 | res | ||
141 | }?; | ||
142 | |||
143 | match io.recv()? { | ||
144 | RawMsg::Notification(n) => { | ||
145 | if n.method == "exit" { | ||
146 | info!("lifecycle: shutdown complete"); | ||
147 | return Ok(()); | ||
148 | } | ||
149 | bail!("unexpected notification during shutdown: {:?}", n) | ||
150 | } | ||
151 | m => { | ||
152 | bail!("unexpected message during shutdown: {:?}", m) | ||
153 | } | ||
154 | } | ||
155 | } | 63 | } |
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs index db7d5ae34..34d077805 100644 --- a/crates/server/src/main_loop/mod.rs +++ b/crates/server/src/main_loop/mod.rs | |||
@@ -6,59 +6,97 @@ use std::{ | |||
6 | }; | 6 | }; |
7 | 7 | ||
8 | use threadpool::ThreadPool; | 8 | use threadpool::ThreadPool; |
9 | use crossbeam_channel::{Sender, Receiver}; | 9 | use serde::{Serialize, de::DeserializeOwned}; |
10 | use crossbeam_channel::{bounded, Sender, Receiver}; | ||
10 | use languageserver_types::{NumberOrString}; | 11 | use languageserver_types::{NumberOrString}; |
11 | use libanalysis::{FileId, JobHandle, JobToken}; | 12 | use libanalysis::{FileId, JobHandle, JobToken}; |
13 | use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode}; | ||
12 | 14 | ||
13 | use { | 15 | use { |
14 | req, dispatch, | 16 | req, |
15 | Task, Result, | 17 | Result, |
16 | io::{Io, RawMsg, RawRequest, RawNotification}, | 18 | vfs::{self, FileEvent}, |
17 | vfs::FileEvent, | ||
18 | server_world::{ServerWorldState, ServerWorld}, | 19 | server_world::{ServerWorldState, ServerWorld}, |
19 | main_loop::subscriptions::{Subscriptions}, | 20 | main_loop::subscriptions::{Subscriptions}, |
20 | }; | 21 | }; |
21 | 22 | ||
23 | enum Task { | ||
24 | Respond(RawResponse), | ||
25 | Notify(RawNotification), | ||
26 | } | ||
27 | |||
22 | pub(super) fn main_loop( | 28 | pub(super) fn main_loop( |
23 | io: &mut Io, | 29 | receriver: &mut Receiver<RawMessage>, |
24 | pool: &mut ThreadPool, | 30 | sender: &mut Sender<RawMessage>, |
25 | task_sender: Sender<Task>, | ||
26 | task_receiver: Receiver<Task>, | ||
27 | fs_events_receiver: Receiver<Vec<FileEvent>>, | ||
28 | ) -> Result<()> { | 31 | ) -> Result<()> { |
32 | let pool = ThreadPool::new(4); | ||
33 | let (task_sender, task_receiver) = bounded::<Task>(16); | ||
34 | let (fs_events_receiver, watcher) = vfs::watch(vec![ | ||
35 | ::std::env::current_dir()?, | ||
36 | ]); | ||
37 | |||
29 | info!("server initialized, serving requests"); | 38 | info!("server initialized, serving requests"); |
30 | let mut state = ServerWorldState::new(); | 39 | let mut state = ServerWorldState::new(); |
31 | 40 | ||
32 | let mut pending_requests: HashMap<u64, JobHandle> = HashMap::new(); | 41 | let mut pending_requests = HashMap::new(); |
33 | let mut fs_events_receiver = Some(&fs_events_receiver); | ||
34 | let mut subs = Subscriptions::new(); | 42 | let mut subs = Subscriptions::new(); |
43 | main_loop_inner( | ||
44 | &pool, | ||
45 | receriver, | ||
46 | sender, | ||
47 | task_receiver.clone(), | ||
48 | task_sender, | ||
49 | fs_events_receiver, | ||
50 | &mut state, | ||
51 | &mut pending_requests, | ||
52 | &mut subs, | ||
53 | )?; | ||
54 | |||
55 | info!("waiting for background jobs to finish..."); | ||
56 | task_receiver.for_each(drop); | ||
57 | pool.join(); | ||
58 | info!("...background jobs have finished"); | ||
59 | |||
60 | info!("waiting for file watcher to finish..."); | ||
61 | watcher.stop()?; | ||
62 | info!("...file watcher has finished"); | ||
63 | Ok(()) | ||
64 | } | ||
65 | |||
66 | fn main_loop_inner( | ||
67 | pool: &ThreadPool, | ||
68 | msg_receiver: &mut Receiver<RawMessage>, | ||
69 | msg_sender: &mut Sender<RawMessage>, | ||
70 | task_receiver: Receiver<Task>, | ||
71 | task_sender: Sender<Task>, | ||
72 | fs_receiver: Receiver<Vec<FileEvent>>, | ||
73 | state: &mut ServerWorldState, | ||
74 | pending_requests: &mut HashMap<u64, JobHandle>, | ||
75 | subs: &mut Subscriptions, | ||
76 | ) -> Result<u64> { | ||
77 | let mut fs_receiver = Some(fs_receiver); | ||
35 | loop { | 78 | loop { |
36 | enum Event { | 79 | enum Event { |
37 | Msg(RawMsg), | 80 | Msg(RawMessage), |
38 | Task(Task), | 81 | Task(Task), |
39 | Fs(Vec<FileEvent>), | 82 | Fs(Vec<FileEvent>), |
40 | ReceiverDead, | ||
41 | FsWatcherDead, | 83 | FsWatcherDead, |
42 | } | 84 | } |
43 | let event = select! { | 85 | let event = select! { |
44 | recv(io.receiver(), msg) => match msg { | 86 | recv(msg_receiver, msg) => match msg { |
45 | Some(msg) => Event::Msg(msg), | 87 | Some(msg) => Event::Msg(msg), |
46 | None => Event::ReceiverDead, | 88 | None => bail!("client exited without shutdown"), |
47 | }, | 89 | }, |
48 | recv(task_receiver, task) => Event::Task(task.unwrap()), | 90 | recv(task_receiver, task) => Event::Task(task.unwrap()), |
49 | recv(fs_events_receiver, events) => match events { | 91 | recv(fs_receiver, events) => match events { |
50 | Some(events) => Event::Fs(events), | 92 | Some(events) => Event::Fs(events), |
51 | None => Event::FsWatcherDead, | 93 | None => Event::FsWatcherDead, |
52 | } | 94 | } |
53 | }; | 95 | }; |
54 | let mut state_changed = false; | 96 | let mut state_changed = false; |
55 | match event { | 97 | match event { |
56 | Event::ReceiverDead => { | ||
57 | io.cleanup_receiver()?; | ||
58 | unreachable!(); | ||
59 | } | ||
60 | Event::FsWatcherDead => { | 98 | Event::FsWatcherDead => { |
61 | fs_events_receiver = None; | 99 | fs_receiver = None; |
62 | } | 100 | } |
63 | Event::Task(task) => { | 101 | Event::Task(task) => { |
64 | match task { | 102 | match task { |
@@ -66,12 +104,10 @@ pub(super) fn main_loop( | |||
66 | if let Some(handle) = pending_requests.remove(&response.id) { | 104 | if let Some(handle) = pending_requests.remove(&response.id) { |
67 | assert!(handle.has_completed()); | 105 | assert!(handle.has_completed()); |
68 | } | 106 | } |
69 | io.send(RawMsg::Response(response)) | 107 | msg_sender.send(RawMessage::Response(response)) |
70 | } | 108 | } |
71 | Task::Notify(n) => | 109 | Task::Notify(n) => |
72 | io.send(RawMsg::Notification(n)), | 110 | msg_sender.send(RawMessage::Notification(n)), |
73 | Task::Die(error) => | ||
74 | return Err(error), | ||
75 | } | 111 | } |
76 | continue; | 112 | continue; |
77 | } | 113 | } |
@@ -82,16 +118,29 @@ pub(super) fn main_loop( | |||
82 | } | 118 | } |
83 | Event::Msg(msg) => { | 119 | Event::Msg(msg) => { |
84 | match msg { | 120 | match msg { |
85 | RawMsg::Request(req) => { | 121 | RawMessage::Request(req) => { |
86 | if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { | 122 | let req = match req.cast::<req::Shutdown>() { |
87 | return Ok(()); | 123 | Ok((id, _params)) => return Ok(id), |
124 | Err(req) => req, | ||
125 | }; | ||
126 | match on_request(state, pending_requests, pool, &task_sender, req)? { | ||
127 | None => (), | ||
128 | Some(req) => { | ||
129 | error!("unknown request: {:?}", req); | ||
130 | let resp = RawResponse::err( | ||
131 | req.id, | ||
132 | ErrorCode::MethodNotFound as i32, | ||
133 | "unknown request".to_string(), | ||
134 | ); | ||
135 | msg_sender.send(RawMessage::Response(resp)) | ||
136 | } | ||
88 | } | 137 | } |
89 | } | 138 | } |
90 | RawMsg::Notification(not) => { | 139 | RawMessage::Notification(not) => { |
91 | on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; | 140 | on_notification(msg_sender, state, pending_requests, subs, not)?; |
92 | state_changed = true; | 141 | state_changed = true; |
93 | } | 142 | } |
94 | RawMsg::Response(resp) => { | 143 | RawMessage::Response(resp) => { |
95 | error!("unexpected response: {:?}", resp) | 144 | error!("unexpected response: {:?}", resp) |
96 | } | 145 | } |
97 | } | 146 | } |
@@ -110,13 +159,12 @@ pub(super) fn main_loop( | |||
110 | } | 159 | } |
111 | 160 | ||
112 | fn on_request( | 161 | fn on_request( |
113 | io: &mut Io, | ||
114 | world: &mut ServerWorldState, | 162 | world: &mut ServerWorldState, |
115 | pending_requests: &mut HashMap<u64, JobHandle>, | 163 | pending_requests: &mut HashMap<u64, JobHandle>, |
116 | pool: &ThreadPool, | 164 | pool: &ThreadPool, |
117 | sender: &Sender<Task>, | 165 | sender: &Sender<Task>, |
118 | req: RawRequest, | 166 | req: RawRequest, |
119 | ) -> Result<bool> { | 167 | ) -> Result<Option<RawRequest>> { |
120 | let mut pool_dispatcher = PoolDispatcher { | 168 | let mut pool_dispatcher = PoolDispatcher { |
121 | req: Some(req), | 169 | req: Some(req), |
122 | res: None, | 170 | res: None, |
@@ -141,81 +189,73 @@ fn on_request( | |||
141 | Ok((id, handle)) => { | 189 | Ok((id, handle)) => { |
142 | let inserted = pending_requests.insert(id, handle).is_none(); | 190 | let inserted = pending_requests.insert(id, handle).is_none(); |
143 | assert!(inserted, "duplicate request: {}", id); | 191 | assert!(inserted, "duplicate request: {}", id); |
192 | Ok(None) | ||
144 | }, | 193 | }, |
145 | Err(req) => { | 194 | Err(req) => Ok(Some(req)), |
146 | let req = dispatch::handle_request::<req::Shutdown, _>(req, |(), resp| { | ||
147 | let resp = resp.into_response(Ok(()))?; | ||
148 | io.send(RawMsg::Response(resp)); | ||
149 | Ok(()) | ||
150 | })?; | ||
151 | match req { | ||
152 | Ok(_id) => { | ||
153 | info!("lifecycle: initiating shutdown"); | ||
154 | return Ok(false); | ||
155 | } | ||
156 | Err(req) => { | ||
157 | error!("unknown method: {:?}", req); | ||
158 | io.send(RawMsg::Response(dispatch::unknown_method(req.id)?)); | ||
159 | } | ||
160 | } | ||
161 | } | ||
162 | } | 195 | } |
163 | Ok(true) | ||
164 | } | 196 | } |
165 | 197 | ||
166 | fn on_notification( | 198 | fn on_notification( |
167 | io: &mut Io, | 199 | msg_sender: &mut Sender<RawMessage>, |
168 | state: &mut ServerWorldState, | 200 | state: &mut ServerWorldState, |
169 | pending_requests: &mut HashMap<u64, JobHandle>, | 201 | pending_requests: &mut HashMap<u64, JobHandle>, |
170 | subs: &mut Subscriptions, | 202 | subs: &mut Subscriptions, |
171 | not: RawNotification, | 203 | not: RawNotification, |
172 | ) -> Result<()> { | 204 | ) -> Result<()> { |
173 | let mut not = Some(not); | 205 | let not = match not.cast::<req::Cancel>() { |
174 | dispatch::handle_notification::<req::Cancel, _>(&mut not, |params| { | 206 | Ok(params) => { |
175 | let id = match params.id { | 207 | let id = match params.id { |
176 | NumberOrString::Number(id) => id, | 208 | NumberOrString::Number(id) => id, |
177 | NumberOrString::String(id) => { | 209 | NumberOrString::String(id) => { |
178 | panic!("string id's not supported: {:?}", id); | 210 | panic!("string id's not supported: {:?}", id); |
211 | } | ||
212 | }; | ||
213 | if let Some(handle) = pending_requests.remove(&id) { | ||
214 | handle.cancel(); | ||
179 | } | 215 | } |
180 | }; | 216 | return Ok(()) |
181 | if let Some(handle) = pending_requests.remove(&id) { | ||
182 | handle.cancel(); | ||
183 | } | 217 | } |
184 | Ok(()) | 218 | Err(not) => not, |
185 | })?; | 219 | }; |
186 | dispatch::handle_notification::<req::DidOpenTextDocument, _>(&mut not, |params| { | 220 | let not = match not.cast::<req::DidOpenTextDocument>() { |
187 | let uri = params.text_document.uri; | 221 | Ok(params) => { |
188 | let path = uri.to_file_path() | 222 | let uri = params.text_document.uri; |
189 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 223 | let path = uri.to_file_path() |
190 | let file_id = state.add_mem_file(path, params.text_document.text); | 224 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
191 | subs.add_sub(file_id); | 225 | let file_id = state.add_mem_file(path, params.text_document.text); |
192 | Ok(()) | 226 | subs.add_sub(file_id); |
193 | })?; | 227 | return Ok(()) |
194 | dispatch::handle_notification::<req::DidChangeTextDocument, _>(&mut not, |mut params| { | 228 | } |
195 | let uri = params.text_document.uri; | 229 | Err(not) => not, |
196 | let path = uri.to_file_path() | 230 | }; |
197 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 231 | let not = match not.cast::<req::DidChangeTextDocument>() { |
198 | let text = params.content_changes.pop() | 232 | Ok(mut params) => { |
199 | .ok_or_else(|| format_err!("empty changes"))? | 233 | let uri = params.text_document.uri; |
200 | .text; | 234 | let path = uri.to_file_path() |
201 | state.change_mem_file(path.as_path(), text)?; | 235 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
202 | Ok(()) | 236 | let text = params.content_changes.pop() |
203 | })?; | 237 | .ok_or_else(|| format_err!("empty changes"))? |
204 | dispatch::handle_notification::<req::DidCloseTextDocument, _>(&mut not, |params| { | 238 | .text; |
205 | let uri = params.text_document.uri; | 239 | state.change_mem_file(path.as_path(), text)?; |
206 | let path = uri.to_file_path() | 240 | return Ok(()) |
207 | .map_err(|()| format_err!("invalid uri: {}", uri))?; | 241 | } |
208 | let file_id = state.remove_mem_file(path.as_path())?; | 242 | Err(not) => not, |
209 | subs.remove_sub(file_id); | 243 | }; |
210 | let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; | 244 | let not = match not.cast::<req::DidCloseTextDocument>() { |
211 | let not = dispatch::send_notification::<req::PublishDiagnostics>(not); | 245 | Ok(params) => { |
212 | io.send(RawMsg::Notification(not)); | 246 | let uri = params.text_document.uri; |
213 | Ok(()) | 247 | let path = uri.to_file_path() |
214 | })?; | 248 | .map_err(|()| format_err!("invalid uri: {}", uri))?; |
215 | 249 | let file_id = state.remove_mem_file(path.as_path())?; | |
216 | if let Some(not) = not { | 250 | subs.remove_sub(file_id); |
217 | error!("unhandled notification: {:?}", not); | 251 | let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; |
218 | } | 252 | let not = RawNotification::new::<req::PublishDiagnostics>(params); |
253 | msg_sender.send(RawMessage::Notification(not)); | ||
254 | return Ok(()) | ||
255 | } | ||
256 | Err(not) => not, | ||
257 | }; | ||
258 | error!("unhandled notification: {:?}", not); | ||
219 | Ok(()) | 259 | Ok(()) |
220 | } | 260 | } |
221 | 261 | ||
@@ -228,10 +268,14 @@ struct PoolDispatcher<'a> { | |||
228 | } | 268 | } |
229 | 269 | ||
230 | impl<'a> PoolDispatcher<'a> { | 270 | impl<'a> PoolDispatcher<'a> { |
231 | fn on<'b, R: req::ClientRequest>( | 271 | fn on<'b, R>( |
232 | &'b mut self, | 272 | &'b mut self, |
233 | f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> | 273 | f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> |
234 | ) -> Result<&'b mut Self> { | 274 | ) -> Result<&'b mut Self> |
275 | where R: req::Request, | ||
276 | R::Params: DeserializeOwned + Send + 'static, | ||
277 | R::Result: Serialize + 'static, | ||
278 | { | ||
235 | let req = match self.req.take() { | 279 | let req = match self.req.take() { |
236 | None => return Ok(self), | 280 | None => return Ok(self), |
237 | Some(req) => req, | 281 | Some(req) => req, |
@@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> { | |||
239 | let world = self.world; | 283 | let world = self.world; |
240 | let sender = self.sender; | 284 | let sender = self.sender; |
241 | let pool = self.pool; | 285 | let pool = self.pool; |
242 | let (handle, token) = JobHandle::new(); | 286 | match req.cast::<R>() { |
243 | let req = dispatch::handle_request::<R, _>(req, |params, resp| { | 287 | Ok((id, params)) => { |
244 | let world = world.snapshot(); | 288 | let (handle, token) = JobHandle::new(); |
245 | let sender = sender.clone(); | 289 | let world = world.snapshot(); |
246 | pool.execute(move || { | 290 | let sender = sender.clone(); |
247 | let res = f(world, params, token); | 291 | pool.execute(move || { |
248 | let task = match resp.into_response(res) { | 292 | let resp = match f(world, params, token) { |
249 | Ok(resp) => Task::Respond(resp), | 293 | Ok(resp) => RawResponse::ok(id, resp), |
250 | Err(e) => Task::Die(e), | 294 | Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()), |
251 | }; | 295 | }; |
252 | sender.send(task); | 296 | let task = Task::Respond(resp); |
253 | }); | 297 | sender.send(task); |
254 | Ok(()) | 298 | }); |
255 | })?; | 299 | self.res = Some((id, handle)); |
256 | match req { | 300 | } |
257 | Ok(id) => self.res = Some((id, handle)), | 301 | Err(req) => { |
258 | Err(req) => self.req = Some(req), | 302 | self.req = Some(req) |
303 | } | ||
259 | } | 304 | } |
260 | Ok(self) | 305 | Ok(self) |
261 | } | 306 | } |
@@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool( | |||
282 | error!("failed to compute diagnostics: {:?}", e) | 327 | error!("failed to compute diagnostics: {:?}", e) |
283 | } | 328 | } |
284 | Ok(params) => { | 329 | Ok(params) => { |
285 | let not = dispatch::send_notification::<req::PublishDiagnostics>(params); | 330 | let not = RawNotification::new::<req::PublishDiagnostics>(params); |
286 | sender.send(Task::Notify(not)); | 331 | sender.send(Task::Notify(not)); |
287 | } | 332 | } |
288 | } | 333 | } |
@@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool( | |||
291 | error!("failed to compute decorations: {:?}", e) | 336 | error!("failed to compute decorations: {:?}", e) |
292 | } | 337 | } |
293 | Ok(params) => { | 338 | Ok(params) => { |
294 | let not = dispatch::send_notification::<req::PublishDecorations>(params); | 339 | let not = RawNotification::new::<req::PublishDecorations>(params); |
295 | sender.send(Task::Notify(not)) | 340 | sender.send(Task::Notify(not)) |
296 | } | 341 | } |
297 | } | 342 | } |