aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksey Kladov <[email protected]>2018-09-01 15:40:45 +0100
committerAleksey Kladov <[email protected]>2018-09-01 15:40:45 +0100
commit8f1ce8275347e915b1cc824567e96369875cefd4 (patch)
tree1d7b56d7947a5c6a20a6547b5342e3363e6c0e0f
parent3588d6b2da6e63730cc560c9986ba7fda5de816e (diff)
move to gen-server impl
-rw-r--r--crates/gen_lsp_server/src/lib.rs2
-rw-r--r--crates/gen_lsp_server/src/msg.rs10
-rw-r--r--crates/server/Cargo.toml1
-rw-r--r--crates/server/src/dispatch.rs151
-rw-r--r--crates/server/src/io.rs207
-rw-r--r--crates/server/src/main.rs104
-rw-r--r--crates/server/src/main_loop/mod.rs281
7 files changed, 181 insertions, 575 deletions
diff --git a/crates/gen_lsp_server/src/lib.rs b/crates/gen_lsp_server/src/lib.rs
index a31e90f35..b47db0df4 100644
--- a/crates/gen_lsp_server/src/lib.rs
+++ b/crates/gen_lsp_server/src/lib.rs
@@ -21,7 +21,7 @@ use languageserver_types::{
21 21
22pub type Result<T> = ::std::result::Result<T, failure::Error>; 22pub type Result<T> = ::std::result::Result<T, failure::Error>;
23pub use { 23pub use {
24 msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification}, 24 msg::{RawMessage, RawRequest, RawResponse, RawResponseError, RawNotification, ErrorCode},
25 stdio::{stdio_transport, Threads}, 25 stdio::{stdio_transport, Threads},
26}; 26};
27 27
diff --git a/crates/gen_lsp_server/src/msg.rs b/crates/gen_lsp_server/src/msg.rs
index 9426e98ec..533d7da3e 100644
--- a/crates/gen_lsp_server/src/msg.rs
+++ b/crates/gen_lsp_server/src/msg.rs
@@ -120,6 +120,16 @@ impl RawResponse {
120} 120}
121 121
122impl RawNotification { 122impl RawNotification {
123 pub fn new<N>(params: N::Params) -> RawNotification
124 where
125 N: Notification,
126 N::Params: Serialize,
127 {
128 RawNotification {
129 method: N::METHOD.to_string(),
130 params: to_value(&params).unwrap(),
131 }
132 }
123 pub fn cast<N>(self) -> ::std::result::Result<N::Params, RawNotification> 133 pub fn cast<N>(self) -> ::std::result::Result<N::Params, RawNotification>
124 where 134 where
125 N: Notification, 135 N: Notification,
diff --git a/crates/server/Cargo.toml b/crates/server/Cargo.toml
index 35ced91ac..32c1219e1 100644
--- a/crates/server/Cargo.toml
+++ b/crates/server/Cargo.toml
@@ -23,3 +23,4 @@ text_unit = { version = "0.1.2", features = ["serde"] }
23libsyntax2 = { path = "../libsyntax2" } 23libsyntax2 = { path = "../libsyntax2" }
24libeditor = { path = "../libeditor" } 24libeditor = { path = "../libeditor" }
25libanalysis = { path = "../libanalysis" } 25libanalysis = { path = "../libanalysis" }
26gen_lsp_server = { path = "../gen_lsp_server" }
diff --git a/crates/server/src/dispatch.rs b/crates/server/src/dispatch.rs
deleted file mode 100644
index 806534944..000000000
--- a/crates/server/src/dispatch.rs
+++ /dev/null
@@ -1,151 +0,0 @@
1use std::marker::PhantomData;
2
3use serde::{
4 ser::Serialize,
5 de::DeserializeOwned,
6};
7use serde_json;
8use drop_bomb::DropBomb;
9
10use ::{
11 Result,
12 req::{ClientRequest, Notification},
13 io::{RawResponse, RawRequest, RawNotification},
14};
15
16pub struct Responder<R: ClientRequest> {
17 id: u64,
18 bomb: DropBomb,
19 ph: PhantomData<fn(R)>,
20}
21
22impl<R: ClientRequest> Responder<R> {
23 pub fn into_response(mut self, result: Result<R::Result>) -> Result<RawResponse> {
24 self.bomb.defuse();
25 let res = match result {
26 Ok(result) => {
27 RawResponse {
28 id: self.id,
29 result: serde_json::to_value(result)?,
30 error: serde_json::Value::Null,
31 }
32 }
33 Err(e) => {
34 error_response(
35 self.id,
36 ErrorCode::InternalError,
37 format!("internal error: {}", e),
38 )?
39 }
40 };
41 Ok(res)
42 }
43}
44
45fn parse_request_as<R: ClientRequest>(raw: RawRequest)
46 -> Result<::std::result::Result<(R::Params, Responder<R>), RawRequest>>
47{
48 if raw.method != R::METHOD {
49 return Ok(Err(raw));
50 }
51
52 let params: R::Params = serde_json::from_value(raw.params)?;
53 let responder = Responder {
54 id: raw.id,
55 bomb: DropBomb::new("dropped request"),
56 ph: PhantomData,
57 };
58 Ok(Ok((params, responder)))
59}
60
61pub fn handle_request<R, F>(req: RawRequest, f: F) -> Result<::std::result::Result<u64, RawRequest>>
62 where
63 R: ClientRequest,
64 F: FnOnce(R::Params, Responder<R>) -> Result<()>
65{
66 let id = req.id;
67 match parse_request_as::<R>(req)? {
68 Ok((params, responder)) => {
69 let () = f(params, responder)?;
70 Ok(Ok(id))
71 },
72 Err(r) => Ok(Err(r)),
73 }
74}
75
76fn parse_notification_as<N>(raw: RawNotification) -> Result<::std::result::Result<N::Params, RawNotification>>
77 where
78 N: Notification,
79 N::Params: DeserializeOwned,
80{
81 if raw.method != N::METHOD {
82 return Ok(Err(raw));
83 }
84 let params: N::Params = serde_json::from_value(raw.params)?;
85 Ok(Ok(params))
86}
87
88pub fn handle_notification<N, F>(not: &mut Option<RawNotification>, f: F) -> Result<()>
89 where
90 N: Notification,
91 N::Params: DeserializeOwned,
92 F: FnOnce(N::Params) -> Result<()>
93{
94 match not.take() {
95 None => Ok(()),
96 Some(n) => match parse_notification_as::<N>(n)? {
97 Ok(params) => f(params),
98 Err(n) => {
99 *not = Some(n);
100 Ok(())
101 }
102 }
103 }
104}
105
106pub fn send_notification<N>(params: N::Params) -> RawNotification
107 where
108 N: Notification,
109 N::Params: Serialize
110{
111 RawNotification {
112 method: N::METHOD.to_string(),
113 params: serde_json::to_value(params)
114 .unwrap(),
115 }
116}
117
118pub fn unknown_method(id: u64) -> Result<RawResponse> {
119 error_response(id, ErrorCode::MethodNotFound, "unknown method")
120}
121
122fn error_response(id: u64, code: ErrorCode, message: impl Into<String>) -> Result<RawResponse> {
123 #[derive(Serialize)]
124 struct Error {
125 code: i32,
126 message: String,
127 }
128 let resp = RawResponse {
129 id,
130 result: serde_json::Value::Null,
131 error: serde_json::to_value(Error {
132 code: code as i32,
133 message: message.into(),
134 })?,
135 };
136 Ok(resp)
137}
138
139#[allow(unused)]
140enum ErrorCode {
141 ParseError = -32700,
142 InvalidRequest = -32600,
143 MethodNotFound = -32601,
144 InvalidParams = -32602,
145 InternalError = -32603,
146 ServerErrorStart = -32099,
147 ServerErrorEnd = -32000,
148 ServerNotInitialized = -32002,
149 UnknownErrorCode = -32001,
150 RequestCancelled = -32800,
151}
diff --git a/crates/server/src/io.rs b/crates/server/src/io.rs
deleted file mode 100644
index f247327ab..000000000
--- a/crates/server/src/io.rs
+++ /dev/null
@@ -1,207 +0,0 @@
1use std::{
2 thread,
3 io::{
4 stdout, stdin,
5 BufRead, Write,
6 },
7};
8use serde_json::{Value, from_str, to_string};
9use crossbeam_channel::{Receiver, Sender, bounded};
10
11use Result;
12
13
14#[derive(Debug, Serialize, Deserialize)]
15#[serde(untagged)]
16pub enum RawMsg {
17 Request(RawRequest),
18 Notification(RawNotification),
19 Response(RawResponse),
20}
21
22#[derive(Debug, Serialize, Deserialize)]
23pub struct RawRequest {
24 pub id: u64,
25 pub method: String,
26 pub params: Value,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub struct RawNotification {
31 pub method: String,
32 pub params: Value,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RawResponse {
37 // JSON RPC allows this to be null if it was impossible
38 // to decode the request's id. Ignore this special case
39 // and just die horribly.
40 pub id: u64,
41 #[serde(default)]
42 pub result: Value,
43 #[serde(default)]
44 pub error: Value,
45}
46
47struct MsgReceiver {
48 chan: Receiver<RawMsg>,
49 thread: Option<thread::JoinHandle<Result<()>>>,
50}
51
52impl MsgReceiver {
53 fn recv(&mut self) -> Result<RawMsg> {
54 match self.chan.recv() {
55 Some(msg) => Ok(msg),
56 None => {
57 self.cleanup()?;
58 unreachable!()
59 }
60 }
61 }
62
63 fn cleanup(&mut self) -> Result<()> {
64 self.thread
65 .take()
66 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
67 .join()
68 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
69 bail!("client disconnected")
70 }
71
72 fn stop(self) -> Result<()> {
73 // Can't really self.thread.join() here, b/c it might be
74 // blocking on read
75 Ok(())
76 }
77}
78
79struct MsgSender {
80 chan: Sender<RawMsg>,
81 thread: thread::JoinHandle<Result<()>>,
82}
83
84impl MsgSender {
85 fn send(&mut self, msg: RawMsg) {
86 self.chan.send(msg)
87 }
88
89 fn stop(self) -> Result<()> {
90 drop(self.chan);
91 self.thread.join()
92 .map_err(|_| format_err!("MsgSender thread panicked"))??;
93 Ok(())
94 }
95}
96
97pub struct Io {
98 receiver: MsgReceiver,
99 sender: MsgSender,
100}
101
102impl Io {
103 pub fn from_stdio() -> Io {
104 let sender = {
105 let (tx, rx) = bounded(16);
106 MsgSender {
107 chan: tx,
108 thread: thread::spawn(move || {
109 let stdout = stdout();
110 let mut stdout = stdout.lock();
111 for msg in rx {
112 #[derive(Serialize)]
113 struct JsonRpc {
114 jsonrpc: &'static str,
115 #[serde(flatten)]
116 msg: RawMsg,
117 }
118 let text = to_string(&JsonRpc {
119 jsonrpc: "2.0",
120 msg,
121 })?;
122 write_msg_text(&mut stdout, &text)?;
123 }
124 Ok(())
125 }),
126 }
127 };
128 let receiver = {
129 let (tx, rx) = bounded(16);
130 MsgReceiver {
131 chan: rx,
132 thread: Some(thread::spawn(move || {
133 let stdin = stdin();
134 let mut stdin = stdin.lock();
135 while let Some(text) = read_msg_text(&mut stdin)? {
136 let msg: RawMsg = from_str(&text)?;
137 tx.send(msg);
138 }
139 Ok(())
140 })),
141 }
142 };
143 Io { receiver, sender }
144 }
145
146 pub fn send(&mut self, msg: RawMsg) {
147 self.sender.send(msg)
148 }
149
150 pub fn recv(&mut self) -> Result<RawMsg> {
151 self.receiver.recv()
152 }
153
154 pub fn receiver(&mut self) -> &mut Receiver<RawMsg> {
155 &mut self.receiver.chan
156 }
157
158 pub fn cleanup_receiver(&mut self) -> Result<()> {
159 self.receiver.cleanup()
160 }
161
162 pub fn stop(self) -> Result<()> {
163 self.receiver.stop()?;
164 self.sender.stop()?;
165 Ok(())
166 }
167}
168
169
170fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
171 let mut size = None;
172 let mut buf = String::new();
173 loop {
174 buf.clear();
175 if inp.read_line(&mut buf)? == 0 {
176 return Ok(None);
177 }
178 if !buf.ends_with("\r\n") {
179 bail!("malformed header: {:?}", buf);
180 }
181 let buf = &buf[..buf.len() - 2];
182 if buf.is_empty() {
183 break;
184 }
185 let mut parts = buf.splitn(2, ": ");
186 let header_name = parts.next().unwrap();
187 let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?;
188 if header_name == "Content-Length" {
189 size = Some(header_value.parse::<usize>()?);
190 }
191 }
192 let size = size.ok_or_else(|| format_err!("no Content-Length"))?;
193 let mut buf = buf.into_bytes();
194 buf.resize(size, 0);
195 inp.read_exact(&mut buf)?;
196 let buf = String::from_utf8(buf)?;
197 debug!("< {}", buf);
198 Ok(Some(buf))
199}
200
201fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
202 debug!("> {}", msg);
203 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
204 out.write_all(msg.as_bytes())?;
205 out.flush()?;
206 Ok(())
207}
diff --git a/crates/server/src/main.rs b/crates/server/src/main.rs
index eeb343b80..3e3bd44a1 100644
--- a/crates/server/src/main.rs
+++ b/crates/server/src/main.rs
@@ -17,26 +17,20 @@ extern crate walkdir;
17extern crate libeditor; 17extern crate libeditor;
18extern crate libanalysis; 18extern crate libanalysis;
19extern crate libsyntax2; 19extern crate libsyntax2;
20extern crate gen_lsp_server;
20extern crate im; 21extern crate im;
21extern crate relative_path; 22extern crate relative_path;
22 23
23mod io;
24mod caps; 24mod caps;
25mod req; 25mod req;
26mod dispatch;
27mod conv; 26mod conv;
28mod main_loop; 27mod main_loop;
29mod vfs; 28mod vfs;
30mod path_map; 29mod path_map;
31mod server_world; 30mod server_world;
32 31
33use threadpool::ThreadPool;
34use crossbeam_channel::bounded;
35use flexi_logger::{Logger, Duplicate}; 32use flexi_logger::{Logger, Duplicate};
36 33use gen_lsp_server::{run_server, stdio_transport};
37use ::{
38 io::{Io, RawMsg, RawResponse, RawNotification},
39};
40 34
41pub type Result<T> = ::std::result::Result<T, ::failure::Error>; 35pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
42 36
@@ -60,96 +54,10 @@ fn main() -> Result<()> {
60} 54}
61 55
62fn main_inner() -> Result<()> { 56fn main_inner() -> Result<()> {
63 let mut io = Io::from_stdio(); 57 let (receiver, sender, threads) = stdio_transport();
64 let res = initialize(&mut io); 58 run_server(caps::server_capabilities(), main_loop::main_loop, receiver, sender)?;
65 info!("shutting down IO..."); 59 info!("shutting down IO...");
66 let io_res = io.stop(); 60 threads.join()?;
67 info!("... IO is down"); 61 info!("... IO is down");
68 match (res, io_res) { 62 Ok(())
69 (Ok(()), Ok(())) => Ok(()),
70 (res, Ok(())) => res,
71 (Ok(()), io_res) => io_res,
72 (res, Err(io_err)) => {
73 error!("shutdown error: {:?}", io_err);
74 res
75 }
76 }
77}
78
79fn initialize(io: &mut Io) -> Result<()> {
80 match io.recv()? {
81 RawMsg::Notification(n) =>
82 bail!("expected initialize request, got {:?}", n),
83 RawMsg::Response(res) =>
84 bail!("expected initialize request, got {:?}", res),
85
86 RawMsg::Request(req) => {
87 let req = dispatch::handle_request::<req::Initialize, _>(req, |_params, resp| {
88 let res = req::InitializeResult { capabilities: caps::server_capabilities() };
89 let resp = resp.into_response(Ok(res))?;
90 io.send(RawMsg::Response(resp));
91 Ok(())
92 })?;
93 if let Err(req) = req {
94 bail!("expected initialize request, got {:?}", req)
95 }
96 match io.recv()? {
97 RawMsg::Notification(n) => {
98 if n.method != "initialized" {
99 bail!("expected initialized notification");
100 }
101 }
102 _ => bail!("expected initialized notification"),
103 }
104 }
105 }
106 initialized(io)
107}
108
109enum Task {
110 Respond(RawResponse),
111 Notify(RawNotification),
112 Die(::failure::Error),
113}
114
115fn initialized(io: &mut Io) -> Result<()> {
116 {
117 let mut pool = ThreadPool::new(4);
118 let (task_sender, task_receiver) = bounded::<Task>(16);
119 let (fs_events_receiver, watcher) = vfs::watch(vec![
120 ::std::env::current_dir()?,
121 ]);
122 info!("lifecycle: handshake finished, server ready to serve requests");
123 let res = main_loop::main_loop(
124 io,
125 &mut pool,
126 task_sender,
127 task_receiver.clone(),
128 fs_events_receiver,
129 );
130
131 info!("waiting for background jobs to finish...");
132 task_receiver.for_each(drop);
133 pool.join();
134 info!("...background jobs have finished");
135
136 info!("waiting for file watcher to finish...");
137 watcher.stop()?;
138 info!("...file watcher has finished");
139
140 res
141 }?;
142
143 match io.recv()? {
144 RawMsg::Notification(n) => {
145 if n.method == "exit" {
146 info!("lifecycle: shutdown complete");
147 return Ok(());
148 }
149 bail!("unexpected notification during shutdown: {:?}", n)
150 }
151 m => {
152 bail!("unexpected message during shutdown: {:?}", m)
153 }
154 }
155} 63}
diff --git a/crates/server/src/main_loop/mod.rs b/crates/server/src/main_loop/mod.rs
index db7d5ae34..34d077805 100644
--- a/crates/server/src/main_loop/mod.rs
+++ b/crates/server/src/main_loop/mod.rs
@@ -6,59 +6,97 @@ use std::{
6}; 6};
7 7
8use threadpool::ThreadPool; 8use threadpool::ThreadPool;
9use crossbeam_channel::{Sender, Receiver}; 9use serde::{Serialize, de::DeserializeOwned};
10use crossbeam_channel::{bounded, Sender, Receiver};
10use languageserver_types::{NumberOrString}; 11use languageserver_types::{NumberOrString};
11use libanalysis::{FileId, JobHandle, JobToken}; 12use libanalysis::{FileId, JobHandle, JobToken};
13use gen_lsp_server::{RawRequest, RawNotification, RawMessage, RawResponse, ErrorCode};
12 14
13use { 15use {
14 req, dispatch, 16 req,
15 Task, Result, 17 Result,
16 io::{Io, RawMsg, RawRequest, RawNotification}, 18 vfs::{self, FileEvent},
17 vfs::FileEvent,
18 server_world::{ServerWorldState, ServerWorld}, 19 server_world::{ServerWorldState, ServerWorld},
19 main_loop::subscriptions::{Subscriptions}, 20 main_loop::subscriptions::{Subscriptions},
20}; 21};
21 22
23enum Task {
24 Respond(RawResponse),
25 Notify(RawNotification),
26}
27
22pub(super) fn main_loop( 28pub(super) fn main_loop(
23 io: &mut Io, 29 receriver: &mut Receiver<RawMessage>,
24 pool: &mut ThreadPool, 30 sender: &mut Sender<RawMessage>,
25 task_sender: Sender<Task>,
26 task_receiver: Receiver<Task>,
27 fs_events_receiver: Receiver<Vec<FileEvent>>,
28) -> Result<()> { 31) -> Result<()> {
32 let pool = ThreadPool::new(4);
33 let (task_sender, task_receiver) = bounded::<Task>(16);
34 let (fs_events_receiver, watcher) = vfs::watch(vec![
35 ::std::env::current_dir()?,
36 ]);
37
29 info!("server initialized, serving requests"); 38 info!("server initialized, serving requests");
30 let mut state = ServerWorldState::new(); 39 let mut state = ServerWorldState::new();
31 40
32 let mut pending_requests: HashMap<u64, JobHandle> = HashMap::new(); 41 let mut pending_requests = HashMap::new();
33 let mut fs_events_receiver = Some(&fs_events_receiver);
34 let mut subs = Subscriptions::new(); 42 let mut subs = Subscriptions::new();
43 main_loop_inner(
44 &pool,
45 receriver,
46 sender,
47 task_receiver.clone(),
48 task_sender,
49 fs_events_receiver,
50 &mut state,
51 &mut pending_requests,
52 &mut subs,
53 )?;
54
55 info!("waiting for background jobs to finish...");
56 task_receiver.for_each(drop);
57 pool.join();
58 info!("...background jobs have finished");
59
60 info!("waiting for file watcher to finish...");
61 watcher.stop()?;
62 info!("...file watcher has finished");
63 Ok(())
64}
65
66fn main_loop_inner(
67 pool: &ThreadPool,
68 msg_receiver: &mut Receiver<RawMessage>,
69 msg_sender: &mut Sender<RawMessage>,
70 task_receiver: Receiver<Task>,
71 task_sender: Sender<Task>,
72 fs_receiver: Receiver<Vec<FileEvent>>,
73 state: &mut ServerWorldState,
74 pending_requests: &mut HashMap<u64, JobHandle>,
75 subs: &mut Subscriptions,
76) -> Result<u64> {
77 let mut fs_receiver = Some(fs_receiver);
35 loop { 78 loop {
36 enum Event { 79 enum Event {
37 Msg(RawMsg), 80 Msg(RawMessage),
38 Task(Task), 81 Task(Task),
39 Fs(Vec<FileEvent>), 82 Fs(Vec<FileEvent>),
40 ReceiverDead,
41 FsWatcherDead, 83 FsWatcherDead,
42 } 84 }
43 let event = select! { 85 let event = select! {
44 recv(io.receiver(), msg) => match msg { 86 recv(msg_receiver, msg) => match msg {
45 Some(msg) => Event::Msg(msg), 87 Some(msg) => Event::Msg(msg),
46 None => Event::ReceiverDead, 88 None => bail!("client exited without shutdown"),
47 }, 89 },
48 recv(task_receiver, task) => Event::Task(task.unwrap()), 90 recv(task_receiver, task) => Event::Task(task.unwrap()),
49 recv(fs_events_receiver, events) => match events { 91 recv(fs_receiver, events) => match events {
50 Some(events) => Event::Fs(events), 92 Some(events) => Event::Fs(events),
51 None => Event::FsWatcherDead, 93 None => Event::FsWatcherDead,
52 } 94 }
53 }; 95 };
54 let mut state_changed = false; 96 let mut state_changed = false;
55 match event { 97 match event {
56 Event::ReceiverDead => {
57 io.cleanup_receiver()?;
58 unreachable!();
59 }
60 Event::FsWatcherDead => { 98 Event::FsWatcherDead => {
61 fs_events_receiver = None; 99 fs_receiver = None;
62 } 100 }
63 Event::Task(task) => { 101 Event::Task(task) => {
64 match task { 102 match task {
@@ -66,12 +104,10 @@ pub(super) fn main_loop(
66 if let Some(handle) = pending_requests.remove(&response.id) { 104 if let Some(handle) = pending_requests.remove(&response.id) {
67 assert!(handle.has_completed()); 105 assert!(handle.has_completed());
68 } 106 }
69 io.send(RawMsg::Response(response)) 107 msg_sender.send(RawMessage::Response(response))
70 } 108 }
71 Task::Notify(n) => 109 Task::Notify(n) =>
72 io.send(RawMsg::Notification(n)), 110 msg_sender.send(RawMessage::Notification(n)),
73 Task::Die(error) =>
74 return Err(error),
75 } 111 }
76 continue; 112 continue;
77 } 113 }
@@ -82,16 +118,29 @@ pub(super) fn main_loop(
82 } 118 }
83 Event::Msg(msg) => { 119 Event::Msg(msg) => {
84 match msg { 120 match msg {
85 RawMsg::Request(req) => { 121 RawMessage::Request(req) => {
86 if !on_request(io, &mut state, &mut pending_requests, pool, &task_sender, req)? { 122 let req = match req.cast::<req::Shutdown>() {
87 return Ok(()); 123 Ok((id, _params)) => return Ok(id),
124 Err(req) => req,
125 };
126 match on_request(state, pending_requests, pool, &task_sender, req)? {
127 None => (),
128 Some(req) => {
129 error!("unknown request: {:?}", req);
130 let resp = RawResponse::err(
131 req.id,
132 ErrorCode::MethodNotFound as i32,
133 "unknown request".to_string(),
134 );
135 msg_sender.send(RawMessage::Response(resp))
136 }
88 } 137 }
89 } 138 }
90 RawMsg::Notification(not) => { 139 RawMessage::Notification(not) => {
91 on_notification(io, &mut state, &mut pending_requests, &mut subs, not)?; 140 on_notification(msg_sender, state, pending_requests, subs, not)?;
92 state_changed = true; 141 state_changed = true;
93 } 142 }
94 RawMsg::Response(resp) => { 143 RawMessage::Response(resp) => {
95 error!("unexpected response: {:?}", resp) 144 error!("unexpected response: {:?}", resp)
96 } 145 }
97 } 146 }
@@ -110,13 +159,12 @@ pub(super) fn main_loop(
110} 159}
111 160
112fn on_request( 161fn on_request(
113 io: &mut Io,
114 world: &mut ServerWorldState, 162 world: &mut ServerWorldState,
115 pending_requests: &mut HashMap<u64, JobHandle>, 163 pending_requests: &mut HashMap<u64, JobHandle>,
116 pool: &ThreadPool, 164 pool: &ThreadPool,
117 sender: &Sender<Task>, 165 sender: &Sender<Task>,
118 req: RawRequest, 166 req: RawRequest,
119) -> Result<bool> { 167) -> Result<Option<RawRequest>> {
120 let mut pool_dispatcher = PoolDispatcher { 168 let mut pool_dispatcher = PoolDispatcher {
121 req: Some(req), 169 req: Some(req),
122 res: None, 170 res: None,
@@ -141,81 +189,73 @@ fn on_request(
141 Ok((id, handle)) => { 189 Ok((id, handle)) => {
142 let inserted = pending_requests.insert(id, handle).is_none(); 190 let inserted = pending_requests.insert(id, handle).is_none();
143 assert!(inserted, "duplicate request: {}", id); 191 assert!(inserted, "duplicate request: {}", id);
192 Ok(None)
144 }, 193 },
145 Err(req) => { 194 Err(req) => Ok(Some(req)),
146 let req = dispatch::handle_request::<req::Shutdown, _>(req, |(), resp| {
147 let resp = resp.into_response(Ok(()))?;
148 io.send(RawMsg::Response(resp));
149 Ok(())
150 })?;
151 match req {
152 Ok(_id) => {
153 info!("lifecycle: initiating shutdown");
154 return Ok(false);
155 }
156 Err(req) => {
157 error!("unknown method: {:?}", req);
158 io.send(RawMsg::Response(dispatch::unknown_method(req.id)?));
159 }
160 }
161 }
162 } 195 }
163 Ok(true)
164} 196}
165 197
166fn on_notification( 198fn on_notification(
167 io: &mut Io, 199 msg_sender: &mut Sender<RawMessage>,
168 state: &mut ServerWorldState, 200 state: &mut ServerWorldState,
169 pending_requests: &mut HashMap<u64, JobHandle>, 201 pending_requests: &mut HashMap<u64, JobHandle>,
170 subs: &mut Subscriptions, 202 subs: &mut Subscriptions,
171 not: RawNotification, 203 not: RawNotification,
172) -> Result<()> { 204) -> Result<()> {
173 let mut not = Some(not); 205 let not = match not.cast::<req::Cancel>() {
174 dispatch::handle_notification::<req::Cancel, _>(&mut not, |params| { 206 Ok(params) => {
175 let id = match params.id { 207 let id = match params.id {
176 NumberOrString::Number(id) => id, 208 NumberOrString::Number(id) => id,
177 NumberOrString::String(id) => { 209 NumberOrString::String(id) => {
178 panic!("string id's not supported: {:?}", id); 210 panic!("string id's not supported: {:?}", id);
211 }
212 };
213 if let Some(handle) = pending_requests.remove(&id) {
214 handle.cancel();
179 } 215 }
180 }; 216 return Ok(())
181 if let Some(handle) = pending_requests.remove(&id) {
182 handle.cancel();
183 } 217 }
184 Ok(()) 218 Err(not) => not,
185 })?; 219 };
186 dispatch::handle_notification::<req::DidOpenTextDocument, _>(&mut not, |params| { 220 let not = match not.cast::<req::DidOpenTextDocument>() {
187 let uri = params.text_document.uri; 221 Ok(params) => {
188 let path = uri.to_file_path() 222 let uri = params.text_document.uri;
189 .map_err(|()| format_err!("invalid uri: {}", uri))?; 223 let path = uri.to_file_path()
190 let file_id = state.add_mem_file(path, params.text_document.text); 224 .map_err(|()| format_err!("invalid uri: {}", uri))?;
191 subs.add_sub(file_id); 225 let file_id = state.add_mem_file(path, params.text_document.text);
192 Ok(()) 226 subs.add_sub(file_id);
193 })?; 227 return Ok(())
194 dispatch::handle_notification::<req::DidChangeTextDocument, _>(&mut not, |mut params| { 228 }
195 let uri = params.text_document.uri; 229 Err(not) => not,
196 let path = uri.to_file_path() 230 };
197 .map_err(|()| format_err!("invalid uri: {}", uri))?; 231 let not = match not.cast::<req::DidChangeTextDocument>() {
198 let text = params.content_changes.pop() 232 Ok(mut params) => {
199 .ok_or_else(|| format_err!("empty changes"))? 233 let uri = params.text_document.uri;
200 .text; 234 let path = uri.to_file_path()
201 state.change_mem_file(path.as_path(), text)?; 235 .map_err(|()| format_err!("invalid uri: {}", uri))?;
202 Ok(()) 236 let text = params.content_changes.pop()
203 })?; 237 .ok_or_else(|| format_err!("empty changes"))?
204 dispatch::handle_notification::<req::DidCloseTextDocument, _>(&mut not, |params| { 238 .text;
205 let uri = params.text_document.uri; 239 state.change_mem_file(path.as_path(), text)?;
206 let path = uri.to_file_path() 240 return Ok(())
207 .map_err(|()| format_err!("invalid uri: {}", uri))?; 241 }
208 let file_id = state.remove_mem_file(path.as_path())?; 242 Err(not) => not,
209 subs.remove_sub(file_id); 243 };
210 let not = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() }; 244 let not = match not.cast::<req::DidCloseTextDocument>() {
211 let not = dispatch::send_notification::<req::PublishDiagnostics>(not); 245 Ok(params) => {
212 io.send(RawMsg::Notification(not)); 246 let uri = params.text_document.uri;
213 Ok(()) 247 let path = uri.to_file_path()
214 })?; 248 .map_err(|()| format_err!("invalid uri: {}", uri))?;
215 249 let file_id = state.remove_mem_file(path.as_path())?;
216 if let Some(not) = not { 250 subs.remove_sub(file_id);
217 error!("unhandled notification: {:?}", not); 251 let params = req::PublishDiagnosticsParams { uri, diagnostics: Vec::new() };
218 } 252 let not = RawNotification::new::<req::PublishDiagnostics>(params);
253 msg_sender.send(RawMessage::Notification(not));
254 return Ok(())
255 }
256 Err(not) => not,
257 };
258 error!("unhandled notification: {:?}", not);
219 Ok(()) 259 Ok(())
220} 260}
221 261
@@ -228,10 +268,14 @@ struct PoolDispatcher<'a> {
228} 268}
229 269
230impl<'a> PoolDispatcher<'a> { 270impl<'a> PoolDispatcher<'a> {
231 fn on<'b, R: req::ClientRequest>( 271 fn on<'b, R>(
232 &'b mut self, 272 &'b mut self,
233 f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result> 273 f: fn(ServerWorld, R::Params, JobToken) -> Result<R::Result>
234 ) -> Result<&'b mut Self> { 274 ) -> Result<&'b mut Self>
275 where R: req::Request,
276 R::Params: DeserializeOwned + Send + 'static,
277 R::Result: Serialize + 'static,
278 {
235 let req = match self.req.take() { 279 let req = match self.req.take() {
236 None => return Ok(self), 280 None => return Ok(self),
237 Some(req) => req, 281 Some(req) => req,
@@ -239,23 +283,24 @@ impl<'a> PoolDispatcher<'a> {
239 let world = self.world; 283 let world = self.world;
240 let sender = self.sender; 284 let sender = self.sender;
241 let pool = self.pool; 285 let pool = self.pool;
242 let (handle, token) = JobHandle::new(); 286 match req.cast::<R>() {
243 let req = dispatch::handle_request::<R, _>(req, |params, resp| { 287 Ok((id, params)) => {
244 let world = world.snapshot(); 288 let (handle, token) = JobHandle::new();
245 let sender = sender.clone(); 289 let world = world.snapshot();
246 pool.execute(move || { 290 let sender = sender.clone();
247 let res = f(world, params, token); 291 pool.execute(move || {
248 let task = match resp.into_response(res) { 292 let resp = match f(world, params, token) {
249 Ok(resp) => Task::Respond(resp), 293 Ok(resp) => RawResponse::ok(id, resp),
250 Err(e) => Task::Die(e), 294 Err(e) => RawResponse::err(id, ErrorCode::InternalError as i32, e.to_string()),
251 }; 295 };
252 sender.send(task); 296 let task = Task::Respond(resp);
253 }); 297 sender.send(task);
254 Ok(()) 298 });
255 })?; 299 self.res = Some((id, handle));
256 match req { 300 }
257 Ok(id) => self.res = Some((id, handle)), 301 Err(req) => {
258 Err(req) => self.req = Some(req), 302 self.req = Some(req)
303 }
259 } 304 }
260 Ok(self) 305 Ok(self)
261 } 306 }
@@ -282,7 +327,7 @@ fn update_file_notifications_on_threadpool(
282 error!("failed to compute diagnostics: {:?}", e) 327 error!("failed to compute diagnostics: {:?}", e)
283 } 328 }
284 Ok(params) => { 329 Ok(params) => {
285 let not = dispatch::send_notification::<req::PublishDiagnostics>(params); 330 let not = RawNotification::new::<req::PublishDiagnostics>(params);
286 sender.send(Task::Notify(not)); 331 sender.send(Task::Notify(not));
287 } 332 }
288 } 333 }
@@ -291,7 +336,7 @@ fn update_file_notifications_on_threadpool(
291 error!("failed to compute decorations: {:?}", e) 336 error!("failed to compute decorations: {:?}", e)
292 } 337 }
293 Ok(params) => { 338 Ok(params) => {
294 let not = dispatch::send_notification::<req::PublishDecorations>(params); 339 let not = RawNotification::new::<req::PublishDecorations>(params);
295 sender.send(Task::Notify(not)) 340 sender.send(Task::Notify(not))
296 } 341 }
297 } 342 }