aboutsummaryrefslogtreecommitdiff
path: root/codeless/server/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'codeless/server/src/io.rs')
-rw-r--r--codeless/server/src/io.rs202
1 files changed, 0 insertions, 202 deletions
diff --git a/codeless/server/src/io.rs b/codeless/server/src/io.rs
deleted file mode 100644
index 5eafc6942..000000000
--- a/codeless/server/src/io.rs
+++ /dev/null
@@ -1,202 +0,0 @@
1use std::{
2 thread,
3 io::{
4 stdout, stdin,
5 BufRead, Write,
6 },
7};
8use serde_json::{Value, from_str, to_string};
9use crossbeam_channel::{Receiver, Sender, bounded};
10
11use Result;
12
13
14#[derive(Debug, Serialize, Deserialize)]
15#[serde(untagged)]
16pub enum RawMsg {
17 Request(RawRequest),
18 Notification(RawNotification),
19 Response(RawResponse),
20}
21
22#[derive(Debug, Serialize, Deserialize)]
23pub struct RawRequest {
24 pub id: u64,
25 pub method: String,
26 pub params: Value,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub struct RawNotification {
31 pub method: String,
32 pub params: Value,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RawResponse {
37 pub id: Option<u64>,
38 pub result: Value,
39 pub error: Value,
40}
41
42struct MsgReceiver {
43 chan: Receiver<RawMsg>,
44 thread: Option<thread::JoinHandle<Result<()>>>,
45}
46
47impl MsgReceiver {
48 fn recv(&mut self) -> Result<RawMsg> {
49 match self.chan.recv() {
50 Some(msg) => Ok(msg),
51 None => {
52 self.cleanup()?;
53 unreachable!()
54 }
55 }
56 }
57
58 fn cleanup(&mut self) -> Result<()> {
59 self.thread
60 .take()
61 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
62 .join()
63 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
64 bail!("client disconnected")
65 }
66
67 fn stop(self) -> Result<()> {
68 // Can't really self.thread.join() here, b/c it might be
69 // blocking on read
70 Ok(())
71 }
72}
73
74struct MsgSender {
75 chan: Sender<RawMsg>,
76 thread: thread::JoinHandle<Result<()>>,
77}
78
79impl MsgSender {
80 fn send(&mut self, msg: RawMsg) {
81 self.chan.send(msg)
82 }
83
84 fn stop(self) -> Result<()> {
85 drop(self.chan);
86 self.thread.join()
87 .map_err(|_| format_err!("MsgSender thread panicked"))??;
88 Ok(())
89 }
90}
91
92pub struct Io {
93 receiver: MsgReceiver,
94 sender: MsgSender,
95}
96
97impl Io {
98 pub fn from_stdio() -> Io {
99 let sender = {
100 let (tx, rx) = bounded(16);
101 MsgSender {
102 chan: tx,
103 thread: thread::spawn(move || {
104 let stdout = stdout();
105 let mut stdout = stdout.lock();
106 for msg in rx {
107 #[derive(Serialize)]
108 struct JsonRpc {
109 jsonrpc: &'static str,
110 #[serde(flatten)]
111 msg: RawMsg,
112 }
113 let text = to_string(&JsonRpc {
114 jsonrpc: "2.0",
115 msg,
116 })?;
117 write_msg_text(&mut stdout, &text)?;
118 }
119 Ok(())
120 }),
121 }
122 };
123 let receiver = {
124 let (tx, rx) = bounded(16);
125 MsgReceiver {
126 chan: rx,
127 thread: Some(thread::spawn(move || {
128 let stdin = stdin();
129 let mut stdin = stdin.lock();
130 while let Some(text) = read_msg_text(&mut stdin)? {
131 let msg: RawMsg = from_str(&text)?;
132 tx.send(msg);
133 }
134 Ok(())
135 })),
136 }
137 };
138 Io { receiver, sender }
139 }
140
141 pub fn send(&mut self, msg: RawMsg) {
142 self.sender.send(msg)
143 }
144
145 pub fn recv(&mut self) -> Result<RawMsg> {
146 self.receiver.recv()
147 }
148
149 pub fn receiver(&mut self) -> &mut Receiver<RawMsg> {
150 &mut self.receiver.chan
151 }
152
153 pub fn cleanup_receiver(&mut self) -> Result<()> {
154 self.receiver.cleanup()
155 }
156
157 pub fn stop(self) -> Result<()> {
158 self.receiver.stop()?;
159 self.sender.stop()?;
160 Ok(())
161 }
162}
163
164
165fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
166 let mut size = None;
167 let mut buf = String::new();
168 loop {
169 buf.clear();
170 if inp.read_line(&mut buf)? == 0 {
171 return Ok(None);
172 }
173 if !buf.ends_with("\r\n") {
174 bail!("malformed header: {:?}", buf);
175 }
176 let buf = &buf[..buf.len() - 2];
177 if buf.is_empty() {
178 break;
179 }
180 let mut parts = buf.splitn(2, ": ");
181 let header_name = parts.next().unwrap();
182 let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?;
183 if header_name == "Content-Length" {
184 size = Some(header_value.parse::<usize>()?);
185 }
186 }
187 let size = size.ok_or_else(|| format_err!("no Content-Length"))?;
188 let mut buf = buf.into_bytes();
189 buf.resize(size, 0);
190 inp.read_exact(&mut buf)?;
191 let buf = String::from_utf8(buf)?;
192 debug!("< {}", buf);
193 Ok(Some(buf))
194}
195
196fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
197 debug!("> {}", msg);
198 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
199 out.write_all(msg.as_bytes())?;
200 out.flush()?;
201 Ok(())
202}