diff options
author | Aleksey Kladov <[email protected]> | 2018-08-10 15:49:45 +0100 |
---|---|---|
committer | Aleksey Kladov <[email protected]> | 2018-08-10 15:49:45 +0100 |
commit | 2e165ae82eed1dc62f1f4c68e45440c143c7c8ef (patch) | |
tree | 4148d68878bbd05a0c7b7f4ace803083f23293fd /codeless/server/src | |
parent | d7c5a6f3081c2e7266620779d3c32067f947b959 (diff) |
logging
Diffstat (limited to 'codeless/server/src')
-rw-r--r-- | codeless/server/src/dispatch.rs | 4 | ||||
-rw-r--r-- | codeless/server/src/io.rs | 55 | ||||
-rw-r--r-- | codeless/server/src/main.rs | 126 |
3 files changed, 142 insertions, 43 deletions
diff --git a/codeless/server/src/dispatch.rs b/codeless/server/src/dispatch.rs index a9476acde..ee87fa6c3 100644 --- a/codeless/server/src/dispatch.rs +++ b/codeless/server/src/dispatch.rs | |||
@@ -24,8 +24,8 @@ impl<R: Request> Responder<R> | |||
24 | R::Params: DeserializeOwned, | 24 | R::Params: DeserializeOwned, |
25 | R::Result: Serialize, | 25 | R::Result: Serialize, |
26 | { | 26 | { |
27 | pub fn respond_with(self, io: &mut Io, f: impl FnOnce() -> Result<R::Result>) -> Result<()> { | 27 | pub fn response(self, io: &mut Io, resp: Result<R::Result>) -> Result<()> { |
28 | match f() { | 28 | match resp { |
29 | Ok(res) => self.result(io, res)?, | 29 | Ok(res) => self.result(io, res)?, |
30 | Err(e) => { | 30 | Err(e) => { |
31 | self.error(io)?; | 31 | self.error(io)?; |
diff --git a/codeless/server/src/io.rs b/codeless/server/src/io.rs index b84103d65..5eafc6942 100644 --- a/codeless/server/src/io.rs +++ b/codeless/server/src/io.rs | |||
@@ -49,16 +49,21 @@ impl MsgReceiver { | |||
49 | match self.chan.recv() { | 49 | match self.chan.recv() { |
50 | Some(msg) => Ok(msg), | 50 | Some(msg) => Ok(msg), |
51 | None => { | 51 | None => { |
52 | self.thread | 52 | self.cleanup()?; |
53 | .take() | 53 | unreachable!() |
54 | .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? | ||
55 | .join() | ||
56 | .map_err(|_| format_err!("MsgReceiver thread panicked"))??; | ||
57 | bail!("client disconnected") | ||
58 | } | 54 | } |
59 | } | 55 | } |
60 | } | 56 | } |
61 | 57 | ||
58 | fn cleanup(&mut self) -> Result<()> { | ||
59 | self.thread | ||
60 | .take() | ||
61 | .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? | ||
62 | .join() | ||
63 | .map_err(|_| format_err!("MsgReceiver thread panicked"))??; | ||
64 | bail!("client disconnected") | ||
65 | } | ||
66 | |||
62 | fn stop(self) -> Result<()> { | 67 | fn stop(self) -> Result<()> { |
63 | // Can't really self.thread.join() here, b/c it might be | 68 | // Can't really self.thread.join() here, b/c it might be |
64 | // blocking on read | 69 | // blocking on read |
@@ -68,7 +73,7 @@ impl MsgReceiver { | |||
68 | 73 | ||
69 | struct MsgSender { | 74 | struct MsgSender { |
70 | chan: Sender<RawMsg>, | 75 | chan: Sender<RawMsg>, |
71 | thread: Option<thread::JoinHandle<Result<()>>>, | 76 | thread: thread::JoinHandle<Result<()>>, |
72 | } | 77 | } |
73 | 78 | ||
74 | impl MsgSender { | 79 | impl MsgSender { |
@@ -76,28 +81,14 @@ impl MsgSender { | |||
76 | self.chan.send(msg) | 81 | self.chan.send(msg) |
77 | } | 82 | } |
78 | 83 | ||
79 | fn stop(mut self) -> Result<()> { | 84 | fn stop(self) -> Result<()> { |
80 | if let Some(thread) = self.thread.take() { | 85 | drop(self.chan); |
81 | thread.join() | 86 | self.thread.join() |
82 | .map_err(|_| format_err!("MsgSender thread panicked"))?? | 87 | .map_err(|_| format_err!("MsgSender thread panicked"))??; |
83 | } | ||
84 | Ok(()) | 88 | Ok(()) |
85 | } | 89 | } |
86 | } | 90 | } |
87 | 91 | ||
88 | impl Drop for MsgSender { | ||
89 | fn drop(&mut self) { | ||
90 | if let Some(thread) = self.thread.take() { | ||
91 | let res = thread.join(); | ||
92 | if thread::panicking() { | ||
93 | drop(res) | ||
94 | } else { | ||
95 | res.unwrap().unwrap() | ||
96 | } | ||
97 | } | ||
98 | } | ||
99 | } | ||
100 | |||
101 | pub struct Io { | 92 | pub struct Io { |
102 | receiver: MsgReceiver, | 93 | receiver: MsgReceiver, |
103 | sender: MsgSender, | 94 | sender: MsgSender, |
@@ -109,7 +100,7 @@ impl Io { | |||
109 | let (tx, rx) = bounded(16); | 100 | let (tx, rx) = bounded(16); |
110 | MsgSender { | 101 | MsgSender { |
111 | chan: tx, | 102 | chan: tx, |
112 | thread: Some(thread::spawn(move || { | 103 | thread: thread::spawn(move || { |
113 | let stdout = stdout(); | 104 | let stdout = stdout(); |
114 | let mut stdout = stdout.lock(); | 105 | let mut stdout = stdout.lock(); |
115 | for msg in rx { | 106 | for msg in rx { |
@@ -126,7 +117,7 @@ impl Io { | |||
126 | write_msg_text(&mut stdout, &text)?; | 117 | write_msg_text(&mut stdout, &text)?; |
127 | } | 118 | } |
128 | Ok(()) | 119 | Ok(()) |
129 | })), | 120 | }), |
130 | } | 121 | } |
131 | }; | 122 | }; |
132 | let receiver = { | 123 | let receiver = { |
@@ -155,6 +146,14 @@ impl Io { | |||
155 | self.receiver.recv() | 146 | self.receiver.recv() |
156 | } | 147 | } |
157 | 148 | ||
149 | pub fn receiver(&mut self) -> &mut Receiver<RawMsg> { | ||
150 | &mut self.receiver.chan | ||
151 | } | ||
152 | |||
153 | pub fn cleanup_receiver(&mut self) -> Result<()> { | ||
154 | self.receiver.cleanup() | ||
155 | } | ||
156 | |||
158 | pub fn stop(self) -> Result<()> { | 157 | pub fn stop(self) -> Result<()> { |
159 | self.receiver.stop()?; | 158 | self.receiver.stop()?; |
160 | self.sender.stop()?; | 159 | self.sender.stop()?; |
@@ -190,10 +189,12 @@ fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> { | |||
190 | buf.resize(size, 0); | 189 | buf.resize(size, 0); |
191 | inp.read_exact(&mut buf)?; | 190 | inp.read_exact(&mut buf)?; |
192 | let buf = String::from_utf8(buf)?; | 191 | let buf = String::from_utf8(buf)?; |
192 | debug!("< {}", buf); | ||
193 | Ok(Some(buf)) | 193 | Ok(Some(buf)) |
194 | } | 194 | } |
195 | 195 | ||
196 | fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { | 196 | fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { |
197 | debug!("> {}", msg); | ||
197 | write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; | 198 | write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; |
198 | out.write_all(msg.as_bytes())?; | 199 | out.write_all(msg.as_bytes())?; |
199 | out.flush()?; | 200 | out.flush()?; |
diff --git a/codeless/server/src/main.rs b/codeless/server/src/main.rs index 11b6b7067..92f6a400c 100644 --- a/codeless/server/src/main.rs +++ b/codeless/server/src/main.rs | |||
@@ -6,7 +6,12 @@ extern crate serde; | |||
6 | extern crate serde_json; | 6 | extern crate serde_json; |
7 | extern crate languageserver_types; | 7 | extern crate languageserver_types; |
8 | extern crate drop_bomb; | 8 | extern crate drop_bomb; |
9 | #[macro_use] | ||
9 | extern crate crossbeam_channel; | 10 | extern crate crossbeam_channel; |
11 | extern crate threadpool; | ||
12 | #[macro_use] | ||
13 | extern crate log; | ||
14 | extern crate flexi_logger; | ||
10 | extern crate libeditor; | 15 | extern crate libeditor; |
11 | extern crate libanalysis; | 16 | extern crate libanalysis; |
12 | 17 | ||
@@ -16,16 +21,50 @@ mod req; | |||
16 | mod dispatch; | 21 | mod dispatch; |
17 | 22 | ||
18 | use languageserver_types::InitializeResult; | 23 | use languageserver_types::InitializeResult; |
24 | use threadpool::ThreadPool; | ||
25 | use crossbeam_channel::{bounded, Sender, Receiver}; | ||
26 | use flexi_logger::Logger; | ||
19 | use libanalysis::WorldState; | 27 | use libanalysis::WorldState; |
20 | use self::io::{Io, RawMsg}; | 28 | |
29 | use ::{ | ||
30 | io::{Io, RawMsg}, | ||
31 | }; | ||
21 | 32 | ||
22 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; | 33 | pub type Result<T> = ::std::result::Result<T, ::failure::Error>; |
23 | 34 | ||
24 | fn main() -> Result<()> { | 35 | fn main() -> Result<()> { |
36 | Logger::with_env_or_str("m=trace") | ||
37 | .log_to_file() | ||
38 | .directory("log") | ||
39 | .start()?; | ||
40 | info!("starting server"); | ||
41 | match ::std::panic::catch_unwind(|| main_inner()) { | ||
42 | Ok(res) => { | ||
43 | info!("shutting down: {:?}", res); | ||
44 | res | ||
45 | } | ||
46 | Err(_) => { | ||
47 | error!("server panicked"); | ||
48 | bail!("server panicked") | ||
49 | }, | ||
50 | } | ||
51 | } | ||
52 | |||
53 | fn main_inner() -> Result<()> { | ||
25 | let mut io = Io::from_stdio(); | 54 | let mut io = Io::from_stdio(); |
26 | initialize(&mut io)?; | 55 | let res = initialize(&mut io); |
27 | io.stop()?; | 56 | info!("shutting down IO..."); |
28 | Ok(()) | 57 | let io_res = io.stop(); |
58 | info!("... IO is down"); | ||
59 | match (res, io_res) { | ||
60 | (Ok(()), Ok(())) => Ok(()), | ||
61 | (res, Ok(())) => res, | ||
62 | (Ok(()), io_res) => io_res, | ||
63 | (res, Err(io_err)) => { | ||
64 | error!("shutdown error: {:?}", io_err); | ||
65 | res | ||
66 | } | ||
67 | } | ||
29 | } | 68 | } |
30 | 69 | ||
31 | fn initialize(io: &mut Io) -> Result<()> { | 70 | fn initialize(io: &mut Io) -> Result<()> { |
@@ -59,20 +98,69 @@ fn initialize(io: &mut Io) -> Result<()> { | |||
59 | } | 98 | } |
60 | } | 99 | } |
61 | 100 | ||
101 | type Thunk = Box<for<'a> FnBox<&'a mut Io, Result<()>>>; | ||
102 | |||
62 | fn initialized(io: &mut Io) -> Result<()> { | 103 | fn initialized(io: &mut Io) -> Result<()> { |
63 | eprintln!("initialized"); | 104 | let mut world = WorldState::new(); |
64 | let world = WorldState::new(); | 105 | let mut pool = ThreadPool::new(4); |
106 | let (sender, receiver) = bounded::<Thunk>(16); | ||
107 | let res = main_loop(io, &mut world, &mut pool, sender, receiver.clone()); | ||
108 | info!("waiting for background jobs to finish..."); | ||
109 | receiver.for_each(drop); | ||
110 | info!("...background jobs have finished"); | ||
111 | res | ||
112 | } | ||
113 | |||
114 | fn main_loop( | ||
115 | io: &mut Io, | ||
116 | world: &mut WorldState, | ||
117 | pool: &mut ThreadPool, | ||
118 | sender: Sender<Thunk>, | ||
119 | receiver: Receiver<Thunk>, | ||
120 | ) -> Result<()> { | ||
121 | info!("server initialized, serving requests"); | ||
65 | loop { | 122 | loop { |
66 | match io.recv()? { | 123 | enum Event { |
124 | Msg(RawMsg), | ||
125 | Thunk(Thunk), | ||
126 | ReceiverDead, | ||
127 | } | ||
128 | |||
129 | let event = select! { | ||
130 | recv(io.receiver(), msg) => match msg { | ||
131 | Some(msg) => Event::Msg(msg), | ||
132 | None => Event::ReceiverDead, | ||
133 | }, | ||
134 | recv(receiver, thunk) => Event::Thunk(thunk.unwrap()), | ||
135 | }; | ||
136 | |||
137 | let msg = match event { | ||
138 | Event::ReceiverDead => { | ||
139 | io.cleanup_receiver()?; | ||
140 | unreachable!(); | ||
141 | } | ||
142 | Event::Thunk(thunk) => { | ||
143 | thunk.call_box(io)?; | ||
144 | continue; | ||
145 | } | ||
146 | Event::Msg(msg) => msg, | ||
147 | }; | ||
148 | |||
149 | match msg { | ||
67 | RawMsg::Request(req) => { | 150 | RawMsg::Request(req) => { |
68 | let world = world.snapshot(); | ||
69 | if let Some((params, resp)) = dispatch::expect::<req::SyntaxTree>(io, req)? { | 151 | if let Some((params, resp)) = dispatch::expect::<req::SyntaxTree>(io, req)? { |
70 | resp.respond_with(io, || { | 152 | let world = world.snapshot(); |
71 | let path = params.text_document.uri.to_file_path() | 153 | let sender = sender.clone(); |
72 | .map_err(|()| format_err!("invalid path"))?; | 154 | pool.execute(move || { |
73 | let file = world.file_syntax(&path)?; | 155 | let res: Result<String> = (|| { |
74 | Ok(libeditor::syntax_tree(&file)) | 156 | let path = params.text_document.uri.to_file_path() |
75 | })? | 157 | .map_err(|()| format_err!("invalid path"))?; |
158 | let file = world.file_syntax(&path)?; | ||
159 | Ok(libeditor::syntax_tree(&file)) | ||
160 | })(); | ||
161 | |||
162 | sender.send(Box::new(|io: &mut Io| resp.response(io, res))) | ||
163 | }); | ||
76 | } | 164 | } |
77 | } | 165 | } |
78 | msg => { | 166 | msg => { |
@@ -82,3 +170,13 @@ fn initialized(io: &mut Io) -> Result<()> { | |||
82 | } | 170 | } |
83 | } | 171 | } |
84 | 172 | ||
173 | |||
174 | trait FnBox<A, R>: Send { | ||
175 | fn call_box(self: Box<Self>, a: A) -> R; | ||
176 | } | ||
177 | |||
178 | impl<A, R, F: FnOnce(A) -> R + Send> FnBox<A, R> for F { | ||
179 | fn call_box(self: Box<F>, a: A) -> R { | ||
180 | (*self)(a) | ||
181 | } | ||
182 | } | ||