aboutsummaryrefslogtreecommitdiff
path: root/crates/server/src/io.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/server/src/io.rs')
-rw-r--r--crates/server/src/io.rs207
1 files changed, 0 insertions, 207 deletions
diff --git a/crates/server/src/io.rs b/crates/server/src/io.rs
deleted file mode 100644
index f247327ab..000000000
--- a/crates/server/src/io.rs
+++ /dev/null
@@ -1,207 +0,0 @@
1use std::{
2 thread,
3 io::{
4 stdout, stdin,
5 BufRead, Write,
6 },
7};
8use serde_json::{Value, from_str, to_string};
9use crossbeam_channel::{Receiver, Sender, bounded};
10
11use Result;
12
13
14#[derive(Debug, Serialize, Deserialize)]
15#[serde(untagged)]
16pub enum RawMsg {
17 Request(RawRequest),
18 Notification(RawNotification),
19 Response(RawResponse),
20}
21
22#[derive(Debug, Serialize, Deserialize)]
23pub struct RawRequest {
24 pub id: u64,
25 pub method: String,
26 pub params: Value,
27}
28
29#[derive(Debug, Serialize, Deserialize)]
30pub struct RawNotification {
31 pub method: String,
32 pub params: Value,
33}
34
35#[derive(Debug, Serialize, Deserialize)]
36pub struct RawResponse {
37 // JSON RPC allows this to be null if it was impossible
38 // to decode the request's id. Ignore this special case
39 // and just die horribly.
40 pub id: u64,
41 #[serde(default)]
42 pub result: Value,
43 #[serde(default)]
44 pub error: Value,
45}
46
47struct MsgReceiver {
48 chan: Receiver<RawMsg>,
49 thread: Option<thread::JoinHandle<Result<()>>>,
50}
51
52impl MsgReceiver {
53 fn recv(&mut self) -> Result<RawMsg> {
54 match self.chan.recv() {
55 Some(msg) => Ok(msg),
56 None => {
57 self.cleanup()?;
58 unreachable!()
59 }
60 }
61 }
62
63 fn cleanup(&mut self) -> Result<()> {
64 self.thread
65 .take()
66 .ok_or_else(|| format_err!("MsgReceiver thread panicked"))?
67 .join()
68 .map_err(|_| format_err!("MsgReceiver thread panicked"))??;
69 bail!("client disconnected")
70 }
71
72 fn stop(self) -> Result<()> {
73 // Can't really self.thread.join() here, b/c it might be
74 // blocking on read
75 Ok(())
76 }
77}
78
79struct MsgSender {
80 chan: Sender<RawMsg>,
81 thread: thread::JoinHandle<Result<()>>,
82}
83
84impl MsgSender {
85 fn send(&mut self, msg: RawMsg) {
86 self.chan.send(msg)
87 }
88
89 fn stop(self) -> Result<()> {
90 drop(self.chan);
91 self.thread.join()
92 .map_err(|_| format_err!("MsgSender thread panicked"))??;
93 Ok(())
94 }
95}
96
97pub struct Io {
98 receiver: MsgReceiver,
99 sender: MsgSender,
100}
101
102impl Io {
103 pub fn from_stdio() -> Io {
104 let sender = {
105 let (tx, rx) = bounded(16);
106 MsgSender {
107 chan: tx,
108 thread: thread::spawn(move || {
109 let stdout = stdout();
110 let mut stdout = stdout.lock();
111 for msg in rx {
112 #[derive(Serialize)]
113 struct JsonRpc {
114 jsonrpc: &'static str,
115 #[serde(flatten)]
116 msg: RawMsg,
117 }
118 let text = to_string(&JsonRpc {
119 jsonrpc: "2.0",
120 msg,
121 })?;
122 write_msg_text(&mut stdout, &text)?;
123 }
124 Ok(())
125 }),
126 }
127 };
128 let receiver = {
129 let (tx, rx) = bounded(16);
130 MsgReceiver {
131 chan: rx,
132 thread: Some(thread::spawn(move || {
133 let stdin = stdin();
134 let mut stdin = stdin.lock();
135 while let Some(text) = read_msg_text(&mut stdin)? {
136 let msg: RawMsg = from_str(&text)?;
137 tx.send(msg);
138 }
139 Ok(())
140 })),
141 }
142 };
143 Io { receiver, sender }
144 }
145
146 pub fn send(&mut self, msg: RawMsg) {
147 self.sender.send(msg)
148 }
149
150 pub fn recv(&mut self) -> Result<RawMsg> {
151 self.receiver.recv()
152 }
153
154 pub fn receiver(&mut self) -> &mut Receiver<RawMsg> {
155 &mut self.receiver.chan
156 }
157
158 pub fn cleanup_receiver(&mut self) -> Result<()> {
159 self.receiver.cleanup()
160 }
161
162 pub fn stop(self) -> Result<()> {
163 self.receiver.stop()?;
164 self.sender.stop()?;
165 Ok(())
166 }
167}
168
169
170fn read_msg_text(inp: &mut impl BufRead) -> Result<Option<String>> {
171 let mut size = None;
172 let mut buf = String::new();
173 loop {
174 buf.clear();
175 if inp.read_line(&mut buf)? == 0 {
176 return Ok(None);
177 }
178 if !buf.ends_with("\r\n") {
179 bail!("malformed header: {:?}", buf);
180 }
181 let buf = &buf[..buf.len() - 2];
182 if buf.is_empty() {
183 break;
184 }
185 let mut parts = buf.splitn(2, ": ");
186 let header_name = parts.next().unwrap();
187 let header_value = parts.next().ok_or_else(|| format_err!("malformed header: {:?}", buf))?;
188 if header_name == "Content-Length" {
189 size = Some(header_value.parse::<usize>()?);
190 }
191 }
192 let size = size.ok_or_else(|| format_err!("no Content-Length"))?;
193 let mut buf = buf.into_bytes();
194 buf.resize(size, 0);
195 inp.read_exact(&mut buf)?;
196 let buf = String::from_utf8(buf)?;
197 debug!("< {}", buf);
198 Ok(Some(buf))
199}
200
201fn write_msg_text(out: &mut impl Write, msg: &str) -> Result<()> {
202 debug!("> {}", msg);
203 write!(out, "Content-Length: {}\r\n\r\n", msg.len())?;
204 out.write_all(msg.as_bytes())?;
205 out.flush()?;
206 Ok(())
207}