aboutsummaryrefslogtreecommitdiff
path: root/codeless/server/src
diff options
context:
space:
mode:
Diffstat (limited to 'codeless/server/src')
-rw-r--r--codeless/server/src/dispatch.rs4
-rw-r--r--codeless/server/src/io.rs55
-rw-r--r--codeless/server/src/main.rs126
3 files changed, 142 insertions, 43 deletions
diff --git a/codeless/server/src/dispatch.rs b/codeless/server/src/dispatch.rs
index a9476acde..ee87fa6c3 100644
--- a/codeless/server/src/dispatch.rs
+++ b/codeless/server/src/dispatch.rs
@@ -24,8 +24,8 @@ impl<R: Request> Responder<R>
24 R::Params: DeserializeOwned, 24 R::Params: DeserializeOwned,
25 R::Result: Serialize, 25 R::Result: Serialize,
26{ 26{
27 pub fn respond_with(self, io: &mut Io, f: impl FnOnce() -> Result<R::Result>) -> Result<()> { 27 pub fn response(self, io: &mut Io, resp: Result<R::Result>) -> Result<()> {
28 match f() { 28 match resp {
29 Ok(res) => self.result(io, res)?, 29 Ok(res) => self.result(io, res)?,
30 Err(e) => { 30 Err(e) => {
31 self.error(io)?; 31 self.error(io)?;
diff --git a/codeless/server/src/io.rs b/codeless/server/src/io.rs
index b84103d65..5eafc6942 100644
--- a/codeless/server/src/io.rs
+++ b/codeless/server/src/io.rs
@@ -49,16 +49,21 @@ impl MsgReceiver {
49 match self.chan.recv() { 49 match self.chan.recv() {
50 Some(msg) => Ok(msg), 50 Some(msg) => Ok(msg),
51 None => { 51 None => {
52 self.thread 52 self.cleanup()?;
53 .take() 53 unreachable!()
54 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
55 .join()
56 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
57 bail!("client disconnected")
58 } 54 }
59 } 55 }
60 } 56 }
61 57
58 fn cleanup(&mut self) -> Result<()> {
59 self.thread
60 .take()
61 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
62 .join()
63 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
64 bail!("client disconnected")
65 }
66
62 fn stop(self) -> Result<()> { 67 fn stop(self) -> Result<()> {
63 // Can't really self.thread.join() here, b/c it might be 68 // Can't really self.thread.join() here, b/c it might be
64 // blocking on read 69 // blocking on read
@@ -68,7 +73,7 @@ impl MsgReceiver {
68 73
69struct MsgSender { 74struct MsgSender {
70 chan: Sender<RawMsg>, 75 chan: Sender<RawMsg>,
71 thread: Option<thread::JoinHandle<Result<()>>>, 76 thread: thread::JoinHandle<Result<()>>,
72} 77}
73 78
74impl MsgSender { 79impl MsgSender {
@@ -76,28 +81,14 @@ impl MsgSender {
76 self.chan.send(msg) 81 self.chan.send(msg)
77 } 82 }
78 83
79 fn stop(mut self) -> Result<()> { 84 fn stop(self) -> Result<()> {
80 if let Some(thread) = self.thread.take() { 85 drop(self.chan);
81 thread.join() 86 self.thread.join()
82 .map_err(|_| format_err!("MsgSender thread panicked"))?? 87 .map_err(|_| format_err!("MsgSender thread panicked"))??;
83 }
84 Ok(()) 88 Ok(())
85 } 89 }
86} 90}
87 91
88impl Drop for MsgSender {
89 fn drop(&mut self) {
90 if let Some(thread) = self.thread.take() {
91 let res = thread.join();
92 if thread::panicking() {
93 drop(res)
94 } else {
95 res.unwrap().unwrap()
96 }
97 }
98 }
99}
100
101pub struct Io { 92pub struct Io {
102 receiver: MsgReceiver, 93 receiver: MsgReceiver,
103 sender: MsgSender, 94 sender: MsgSender,
@@ -109,7 +100,7 @@ impl Io {
109 let (tx, rx) = bounded(16); 100 let (tx, rx) = bounded(16);
110 MsgSender { 101 MsgSender {
111 chan: tx, 102 chan: tx,
112 thread: Some(thread::spawn(move || { 103 thread: thread::spawn(move || {
113 let stdout = stdout(); 104 let stdout = stdout();
114 let mut stdout = stdout.lock(); 105 let mut stdout = stdout.lock();
115 for msg in rx { 106 for msg in rx {
@@ -126,7 +117,7 @@ impl Io {
126 write_msg_text(&mut stdout, &text)?; 117 write_msg_text(&mut stdout, &text)?;
127 } 118 }
128 Ok(()) 119 Ok(())
129 })), 120 }),
130 } 121 }
131 }; 122 };
132 let receiver = { 123 let receiver = {
@@ -155,6 +146,14 @@ impl Io {
155 self.receiver.recv() 146 self.receiver.recv()
156 } 147 }
157 148
149 pub fn receiver(&mut self) -> &mut Receiver<RawMsg> {
150 &mut self.receiver.chan
151 }
152
153 pub fn cleanup_receiver(&mut self) -> Result<()> {
154 self.receiver.cleanup()
155 }
156
158 pub fn stop(self) -> Result<()> { 157 pub fn stop(self) -> Result<()> {
159 self.receiver.stop()?; 158 self.receiver.stop()?;
160 self.sender.stop()?; 159 self.sender.stop()?;
@@ -190,10 +189,12 @@ fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
190 buf.resize(size, 0); 189 buf.resize(size, 0);
191 inp.read_exact(&mut buf)?; 190 inp.read_exact(&mut buf)?;
192 let buf = String::from_utf8(buf)?; 191 let buf = String::from_utf8(buf)?;
192 debug!("< {}", buf);
193 Ok(Some(buf)) 193 Ok(Some(buf))
194} 194}
195 195
196fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { 196fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
197 debug!("> {}", msg);
197 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; 198 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
198 out.write_all(msg.as_bytes())?; 199 out.write_all(msg.as_bytes())?;
199 out.flush()?; 200 out.flush()?;
diff --git a/codeless/server/src/main.rs b/codeless/server/src/main.rs
index 11b6b7067..92f6a400c 100644
--- a/codeless/server/src/main.rs
+++ b/codeless/server/src/main.rs
@@ -6,7 +6,12 @@ extern crate serde;
6extern crate serde_json; 6extern crate serde_json;
7extern crate languageserver_types; 7extern crate languageserver_types;
8extern crate drop_bomb; 8extern crate drop_bomb;
9#[macro_use]
9extern crate crossbeam_channel; 10extern crate crossbeam_channel;
11extern crate threadpool;
12#[macro_use]
13extern crate log;
14extern crate flexi_logger;
10extern crate libeditor; 15extern crate libeditor;
11extern crate libanalysis; 16extern crate libanalysis;
12 17
@@ -16,16 +21,50 @@ mod req;
16mod dispatch; 21mod dispatch;
17 22
18use languageserver_types::InitializeResult; 23use languageserver_types::InitializeResult;
24use threadpool::ThreadPool;
25use crossbeam_channel::{bounded, Sender, Receiver};
26use flexi_logger::Logger;
19use libanalysis::WorldState; 27use libanalysis::WorldState;
20use self::io::{Io, RawMsg}; 28
29use ::{
30 io::{Io, RawMsg},
31};
21 32
22pub type Result<T> = ::std::result::Result<T, ::failure::Error>; 33pub type Result<T> = ::std::result::Result<T, ::failure::Error>;
23 34
24fn main() -> Result<()> { 35fn main() -> Result<()> {
36 Logger::with_env_or_str("m=trace")
37 .log_to_file()
38 .directory("log")
39 .start()?;
40 info!("starting server");
41 match ::std::panic::catch_unwind(|| main_inner()) {
42 Ok(res) => {
43 info!("shutting down: {:?}", res);
44 res
45 }
46 Err(_) => {
47 error!("server panicked");
48 bail!("server panicked")
49 },
50 }
51}
52
53fn main_inner() -> Result<()> {
25 let mut io = Io::from_stdio(); 54 let mut io = Io::from_stdio();
26 initialize(&mut io)?; 55 let res = initialize(&mut io);
27 io.stop()?; 56 info!("shutting down IO...");
28 Ok(()) 57 let io_res = io.stop();
58 info!("... IO is down");
59 match (res, io_res) {
60 (Ok(()), Ok(())) => Ok(()),
61 (res, Ok(())) => res,
62 (Ok(()), io_res) => io_res,
63 (res, Err(io_err)) => {
64 error!("shutdown error: {:?}", io_err);
65 res
66 }
67 }
29} 68}
30 69
31fn initialize(io: &mut Io) -> Result<()> { 70fn initialize(io: &mut Io) -> Result<()> {
@@ -59,20 +98,69 @@ fn initialize(io: &mut Io) -> Result<()> {
59 } 98 }
60} 99}
61 100
101type Thunk = Box<for<'a> FnBox<&'a mut Io, Result<()>>>;
102
62fn initialized(io: &mut Io) -> Result<()> { 103fn initialized(io: &mut Io) -> Result<()> {
63 eprintln!("initialized"); 104 let mut world = WorldState::new();
64 let world = WorldState::new(); 105 let mut pool = ThreadPool::new(4);
106 let (sender, receiver) = bounded::<Thunk>(16);
107 let res = main_loop(io, &mut world, &mut pool, sender, receiver.clone());
108 info!("waiting for background jobs to finish...");
109 receiver.for_each(drop);
110 info!("...background jobs have finished");
111 res
112}
113
114fn main_loop(
115 io: &mut Io,
116 world: &mut WorldState,
117 pool: &mut ThreadPool,
118 sender: Sender<Thunk>,
119 receiver: Receiver<Thunk>,
120) -> Result<()> {
121 info!("server initialized, serving requests");
65 loop { 122 loop {
66 match io.recv()? { 123 enum Event {
124 Msg(RawMsg),
125 Thunk(Thunk),
126 ReceiverDead,
127 }
128
129 let event = select! {
130 recv(io.receiver(), msg) => match msg {
131 Some(msg) => Event::Msg(msg),
132 None => Event::ReceiverDead,
133 },
134 recv(receiver, thunk) => Event::Thunk(thunk.unwrap()),
135 };
136
137 let msg = match event {
138 Event::ReceiverDead => {
139 io.cleanup_receiver()?;
140 unreachable!();
141 }
142 Event::Thunk(thunk) => {
143 thunk.call_box(io)?;
144 continue;
145 }
146 Event::Msg(msg) => msg,
147 };
148
149 match msg {
67 RawMsg::Request(req) => { 150 RawMsg::Request(req) => {
68 let world = world.snapshot();
69 if let Some((params, resp)) = dispatch::expect::<req::SyntaxTree>(io, req)? { 151 if let Some((params, resp)) = dispatch::expect::<req::SyntaxTree>(io, req)? {
70 resp.respond_with(io, || { 152 let world = world.snapshot();
71 let path = params.text_document.uri.to_file_path() 153 let sender = sender.clone();
72 .map_err(|()| format_err!("invalid path"))?; 154 pool.execute(move || {
73 let file = world.file_syntax(&path)?; 155 let res: Result<String> = (|| {
74 Ok(libeditor::syntax_tree(&file)) 156 let path = params.text_document.uri.to_file_path()
75 })? 157 .map_err(|()| format_err!("invalid path"))?;
158 let file = world.file_syntax(&path)?;
159 Ok(libeditor::syntax_tree(&file))
160 })();
161
162 sender.send(Box::new(|io: &mut Io| resp.response(io, res)))
163 });
76 } 164 }
77 } 165 }
78 msg => { 166 msg => {
@@ -82,3 +170,13 @@ fn initialized(io: &mut Io) -> Result<()> {
82 } 170 }
83} 171}
84 172
173
174trait FnBox<A, R>: Send {
175 fn call_box(self: Box<Self>, a: A) -> R;
176}
177
178impl<A, R, F: FnOnce(A) -> R + Send> FnBox<A, R> for F {
179 fn call_box(self: Box<F>, a: A) -> R {
180 (*self)(a)
181 }
182}