aboutsummaryrefslogtreecommitdiff
path: root/codeless/server/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'codeless/server/src/io.rs')
-rw-r--r--codeless/server/src/io.rs201
1 files changed, 201 insertions, 0 deletions
diff --git a/codeless/server/src/io.rs b/codeless/server/src/io.rs
new file mode 100644
index 000000000..b84103d65
--- /dev/null
+++ b/codeless/server/src/io.rs
@@ -0,0 +1,201 @@
1use std::{
2 thread,
3 io::{
4 stdout, stdin,
5 BufRead, Write,
6 },
7};
8use serde_json::{Value, from_str, to_string};
9use crossbeam_channel::{Receiver, Sender, bounded};
10
11use Result;
12
13
14#[derive(Debug, Serialize, Deserialize)]
15#[serde(untagged)]
16pub enum RawMsg {
17 Request(RawRequest),
18 Notification(RawNotification),
19 Response(RawResponse),
20}
21
22#[derive(Debug, Serialize, Deserialize)]
23pub struct RawRequest {
24 pub id: u64,
25 pub method: String,
26 pub params: Value,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub struct RawNotification {
31 pub method: String,
32 pub params: Value,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RawResponse {
37 pub id: Option<u64>,
38 pub result: Value,
39 pub error: Value,
40}
41
42struct MsgReceiver {
43 chan: Receiver<RawMsg>,
44 thread: Option<thread::JoinHandle<Result<()>>>,
45}
46
47impl MsgReceiver {
48 fn recv(&mut self) -> Result<RawMsg> {
49 match self.chan.recv() {
50 Some(msg) => Ok(msg),
51 None => {
52 self.thread
53 .take()
54 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
55 .join()
56 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
57 bail!("client disconnected")
58 }
59 }
60 }
61
62 fn stop(self) -> Result<()> {
63 // Can't really self.thread.join() here, b/c it might be
64 // blocking on read
65 Ok(())
66 }
67}
68
69struct MsgSender {
70 chan: Sender<RawMsg>,
71 thread: Option<thread::JoinHandle<Result<()>>>,
72}
73
74impl MsgSender {
75 fn send(&mut self, msg: RawMsg) {
76 self.chan.send(msg)
77 }
78
79 fn stop(mut self) -> Result<()> {
80 if let Some(thread) = self.thread.take() {
81 thread.join()
82 .map_err(|_| format_err!("MsgSender thread panicked"))??
83 }
84 Ok(())
85 }
86}
87
88impl Drop for MsgSender {
89 fn drop(&mut self) {
90 if let Some(thread) = self.thread.take() {
91 let res = thread.join();
92 if thread::panicking() {
93 drop(res)
94 } else {
95 res.unwrap().unwrap()
96 }
97 }
98 }
99}
100
101pub struct Io {
102 receiver: MsgReceiver,
103 sender: MsgSender,
104}
105
106impl Io {
107 pub fn from_stdio() -> Io {
108 let sender = {
109 let (tx, rx) = bounded(16);
110 MsgSender {
111 chan: tx,
112 thread: Some(thread::spawn(move || {
113 let stdout = stdout();
114 let mut stdout = stdout.lock();
115 for msg in rx {
116 #[derive(Serialize)]
117 struct JsonRpc {
118 jsonrpc: &'static str,
119 #[serde(flatten)]
120 msg: RawMsg,
121 }
122 let text = to_string(&JsonRpc {
123 jsonrpc: "2.0",
124 msg,
125 })?;
126 write_msg_text(&mut stdout, &text)?;
127 }
128 Ok(())
129 })),
130 }
131 };
132 let receiver = {
133 let (tx, rx) = bounded(16);
134 MsgReceiver {
135 chan: rx,
136 thread: Some(thread::spawn(move || {
137 let stdin = stdin();
138 let mut stdin = stdin.lock();
139 while let Some(text) = read_msg_text(&mut stdin)? {
140 let msg: RawMsg = from_str(&text)?;
141 tx.send(msg);
142 }
143 Ok(())
144 })),
145 }
146 };
147 Io { receiver, sender }
148 }
149
150 pub fn send(&mut self, msg: RawMsg) {
151 self.sender.send(msg)
152 }
153
154 pub fn recv(&mut self) -> Result<RawMsg> {
155 self.receiver.recv()
156 }
157
158 pub fn stop(self) -> Result<()> {
159 self.receiver.stop()?;
160 self.sender.stop()?;
161 Ok(())
162 }
163}
164
165
166fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
167 let mut size = None;
168 let mut buf = String::new();
169 loop {
170 buf.clear();
171 if inp.read_line(&mut buf)? == 0 {
172 return Ok(None);
173 }
174 if !buf.ends_with("\r\n") {
175 bail!("malformed header: {:?}", buf);
176 }
177 let buf = &buf[..buf.len() - 2];
178 if buf.is_empty() {
179 break;
180 }
181 let mut parts = buf.splitn(2, ": ");
182 let header_name = parts.next().unwrap();
183 let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?;
184 if header_name == "Content-Length" {
185 size = Some(header_value.parse::<usize>()?);
186 }
187 }
188 let size = size.ok_or_else(|| format_err!("no Content-Length"))?;
189 let mut buf = buf.into_bytes();
190 buf.resize(size, 0);
191 inp.read_exact(&mut buf)?;
192 let buf = String::from_utf8(buf)?;
193 Ok(Some(buf))
194}
195
196fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
197 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
198 out.write_all(msg.as_bytes())?;
199 out.flush()?;
200 Ok(())
201}