From 2e165ae82eed1dc62f1f4c68e45440c143c7c8ef Mon Sep 17 00:00:00 2001 From: Aleksey Kladov Date: Fri, 10 Aug 2018 17:49:45 +0300 Subject: logging --- codeless/server/src/dispatch.rs | 4 +- codeless/server/src/io.rs | 55 +++++++++--------- codeless/server/src/main.rs | 126 +++++++++++++++++++++++++++++++++++----- 3 files changed, 142 insertions(+), 43 deletions(-) (limited to 'codeless/server/src') diff --git a/codeless/server/src/dispatch.rs b/codeless/server/src/dispatch.rs index a9476acde..ee87fa6c3 100644 --- a/codeless/server/src/dispatch.rs +++ b/codeless/server/src/dispatch.rs @@ -24,8 +24,8 @@ impl Responder R::Params: DeserializeOwned, R::Result: Serialize, { - pub fn respond_with(self, io: &mut Io, f: impl FnOnce() -> Result) -> Result<()> { - match f() { + pub fn response(self, io: &mut Io, resp: Result) -> Result<()> { + match resp { Ok(res) => self.result(io, res)?, Err(e) => { self.error(io)?; diff --git a/codeless/server/src/io.rs b/codeless/server/src/io.rs index b84103d65..5eafc6942 100644 --- a/codeless/server/src/io.rs +++ b/codeless/server/src/io.rs @@ -49,16 +49,21 @@ impl MsgReceiver { match self.chan.recv() { Some(msg) => Ok(msg), None => { - self.thread - .take() - .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? - .join() - .map_err(|_| format_err!("MsgReceiver thread panicked"))??; - bail!("client disconnected") + self.cleanup()?; + unreachable!() } } } + fn cleanup(&mut self) -> Result<()> { + self.thread + .take() + .ok_or_else(|| format_err!("MsgReceiver thread panicked"))? + .join() + .map_err(|_| format_err!("MsgReceiver thread panicked"))??; + bail!("client disconnected") + } + fn stop(self) -> Result<()> { // Can't really self.thread.join() here, b/c it might be // blocking on read @@ -68,7 +73,7 @@ impl MsgReceiver { struct MsgSender { chan: Sender, - thread: Option>>, + thread: thread::JoinHandle>, } impl MsgSender { @@ -76,28 +81,14 @@ impl MsgSender { self.chan.send(msg) } - fn stop(mut self) -> Result<()> { - if let Some(thread) = self.thread.take() { - thread.join() - .map_err(|_| format_err!("MsgSender thread panicked"))?? - } + fn stop(self) -> Result<()> { + drop(self.chan); + self.thread.join() + .map_err(|_| format_err!("MsgSender thread panicked"))??; Ok(()) } } -impl Drop for MsgSender { - fn drop(&mut self) { - if let Some(thread) = self.thread.take() { - let res = thread.join(); - if thread::panicking() { - drop(res) - } else { - res.unwrap().unwrap() - } - } - } -} - pub struct Io { receiver: MsgReceiver, sender: MsgSender, @@ -109,7 +100,7 @@ impl Io { let (tx, rx) = bounded(16); MsgSender { chan: tx, - thread: Some(thread::spawn(move || { + thread: thread::spawn(move || { let stdout = stdout(); let mut stdout = stdout.lock(); for msg in rx { @@ -126,7 +117,7 @@ impl Io { write_msg_text(&mut stdout, &text)?; } Ok(()) - })), + }), } }; let receiver = { @@ -155,6 +146,14 @@ impl Io { self.receiver.recv() } + pub fn receiver(&mut self) -> &mut Receiver { + &mut self.receiver.chan + } + + pub fn cleanup_receiver(&mut self) -> Result<()> { + self.receiver.cleanup() + } + pub fn stop(self) -> Result<()> { self.receiver.stop()?; self.sender.stop()?; @@ -190,10 +189,12 @@ fn read_msg_text(inp: &mut impl BufRead) -> Result> { buf.resize(size, 0); inp.read_exact(&mut buf)?; let buf = String::from_utf8(buf)?; + debug!("< {}", buf); Ok(Some(buf)) } fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> { + debug!("> {}", msg); write!(out, "Content-Length: {}\r\n\r\n", msg.len())?; out.write_all(msg.as_bytes())?; out.flush()?; diff --git a/codeless/server/src/main.rs b/codeless/server/src/main.rs index 11b6b7067..92f6a400c 100644 --- a/codeless/server/src/main.rs +++ b/codeless/server/src/main.rs @@ -6,7 +6,12 @@ extern crate serde; extern crate serde_json; extern crate languageserver_types; extern crate drop_bomb; +#[macro_use] extern crate crossbeam_channel; +extern crate threadpool; +#[macro_use] +extern crate log; +extern crate flexi_logger; extern crate libeditor; extern crate libanalysis; @@ -16,16 +21,50 @@ mod req; mod dispatch; use languageserver_types::InitializeResult; +use threadpool::ThreadPool; +use crossbeam_channel::{bounded, Sender, Receiver}; +use flexi_logger::Logger; use libanalysis::WorldState; -use self::io::{Io, RawMsg}; + +use ::{ + io::{Io, RawMsg}, +}; pub type Result = ::std::result::Result; fn main() -> Result<()> { + Logger::with_env_or_str("m=trace") + .log_to_file() + .directory("log") + .start()?; + info!("starting server"); + match ::std::panic::catch_unwind(|| main_inner()) { + Ok(res) => { + info!("shutting down: {:?}", res); + res + } + Err(_) => { + error!("server panicked"); + bail!("server panicked") + }, + } +} + +fn main_inner() -> Result<()> { let mut io = Io::from_stdio(); - initialize(&mut io)?; - io.stop()?; - Ok(()) + let res = initialize(&mut io); + info!("shutting down IO..."); + let io_res = io.stop(); + info!("... IO is down"); + match (res, io_res) { + (Ok(()), Ok(())) => Ok(()), + (res, Ok(())) => res, + (Ok(()), io_res) => io_res, + (res, Err(io_err)) => { + error!("shutdown error: {:?}", io_err); + res + } + } } fn initialize(io: &mut Io) -> Result<()> { @@ -59,20 +98,69 @@ fn initialize(io: &mut Io) -> Result<()> { } } +type Thunk = Box FnBox<&'a mut Io, Result<()>>>; + fn initialized(io: &mut Io) -> Result<()> { - eprintln!("initialized"); - let world = WorldState::new(); + let mut world = WorldState::new(); + let mut pool = ThreadPool::new(4); + let (sender, receiver) = bounded::(16); + let res = main_loop(io, &mut world, &mut pool, sender, receiver.clone()); + info!("waiting for background jobs to finish..."); + receiver.for_each(drop); + info!("...background jobs have finished"); + res +} + +fn main_loop( + io: &mut Io, + world: &mut WorldState, + pool: &mut ThreadPool, + sender: Sender, + receiver: Receiver, +) -> Result<()> { + info!("server initialized, serving requests"); loop { - match io.recv()? { + enum Event { + Msg(RawMsg), + Thunk(Thunk), + ReceiverDead, + } + + let event = select! { + recv(io.receiver(), msg) => match msg { + Some(msg) => Event::Msg(msg), + None => Event::ReceiverDead, + }, + recv(receiver, thunk) => Event::Thunk(thunk.unwrap()), + }; + + let msg = match event { + Event::ReceiverDead => { + io.cleanup_receiver()?; + unreachable!(); + } + Event::Thunk(thunk) => { + thunk.call_box(io)?; + continue; + } + Event::Msg(msg) => msg, + }; + + match msg { RawMsg::Request(req) => { - let world = world.snapshot(); if let Some((params, resp)) = dispatch::expect::(io, req)? { - resp.respond_with(io, || { - let path = params.text_document.uri.to_file_path() - .map_err(|()| format_err!("invalid path"))?; - let file = world.file_syntax(&path)?; - Ok(libeditor::syntax_tree(&file)) - })? + let world = world.snapshot(); + let sender = sender.clone(); + pool.execute(move || { + let res: Result = (|| { + let path = params.text_document.uri.to_file_path() + .map_err(|()| format_err!("invalid path"))?; + let file = world.file_syntax(&path)?; + Ok(libeditor::syntax_tree(&file)) + })(); + + sender.send(Box::new(|io: &mut Io| resp.response(io, res))) + }); } } msg => { @@ -82,3 +170,13 @@ fn initialized(io: &mut Io) -> Result<()> { } } + +trait FnBox: Send { + fn call_box(self: Box, a: A) -> R; +} + +impl R + Send> FnBox for F { + fn call_box(self: Box, a: A) -> R { + (*self)(a) + } +} -- cgit v1.2.3