Allow retrying HTTP requests if we hit a socket timeout
[rust-lightning] / lightning-block-sync / src / http.rs
1 //! Simple HTTP implementation which supports both async and traditional execution environments
2 //! with minimal dependencies. This is used as the basis for REST and RPC clients.
3
4 use chunked_transfer;
5 use serde_json;
6
7 use std::convert::TryFrom;
8 #[cfg(not(feature = "tokio"))]
9 use std::io::Write;
10 use std::net::ToSocketAddrs;
11 use std::time::Duration;
12
13 #[cfg(feature = "tokio")]
14 use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
15 #[cfg(feature = "tokio")]
16 use tokio::net::TcpStream;
17
18 #[cfg(not(feature = "tokio"))]
19 use std::io::BufRead;
20 use std::io::Read;
21 #[cfg(not(feature = "tokio"))]
22 use std::net::TcpStream;
23
/// Timeout for operations on TCP streams.
///
/// Applied as the connect, read, and write timeout when a client connection is opened.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum HTTP message header size in bytes.
///
/// The status line and all header lines of a response are read through a reader limited to this
/// many bytes.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;

/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
/// overhead for HTTP chunked transfer encoding.
const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
33
/// Endpoint for interacting with an HTTP-based API.
///
/// Holds a host name, an optional port, and a request path.
#[derive(Debug)]
pub struct HttpEndpoint {
        host: String,
        port: Option<u16>,
        path: String,
}

impl HttpEndpoint {
        /// Creates an endpoint for the given host with no explicit port and the root path (`/`).
        pub fn for_host(host: String) -> Self {
                let path = String::from("/");
                HttpEndpoint { host, port: None, path }
        }

        /// Returns the endpoint with its port set to `port`.
        pub fn with_port(self, port: u16) -> Self {
                Self { port: Some(port), ..self }
        }

        /// Returns the endpoint with its path set to `path`.
        pub fn with_path(self, path: String) -> Self {
                Self { path, ..self }
        }

        /// Returns the endpoint host.
        pub fn host(&self) -> &str {
                self.host.as_str()
        }

        /// Returns the endpoint port, falling back to 80 when none was specified.
        pub fn port(&self) -> u16 {
                self.port.unwrap_or(80)
        }

        /// Returns the endpoint path.
        pub fn path(&self) -> &str {
                self.path.as_str()
        }
}
82
83 impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
84         type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
85
86         fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
87                 (self.host(), self.port()).to_socket_addrs()
88         }
89 }
90
/// Client for making HTTP requests.
pub(crate) struct HttpClient {
        /// Connection to the endpoint; a tokio stream when the `tokio` feature is enabled and a
        /// standard library stream otherwise.
        stream: TcpStream,
}
95
96 impl HttpClient {
97         /// Opens a connection to an HTTP endpoint.
98         pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
99                 let address = match endpoint.to_socket_addrs()?.next() {
100                         None => {
101                                 return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
102                         },
103                         Some(address) => address,
104                 };
105                 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
106                 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
107                 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
108
109                 #[cfg(feature = "tokio")]
110                 let stream = {
111                         stream.set_nonblocking(true)?;
112                         TcpStream::from_std(stream)?
113                 };
114
115                 Ok(Self { stream })
116         }
117
118         /// Sends a `GET` request for a resource identified by `uri` at the `host`.
119         ///
120         /// Returns the response body in `F` format.
121         #[allow(dead_code)]
122         pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
123         where F: TryFrom<Vec<u8>, Error = std::io::Error> {
124                 let request = format!(
125                         "GET {} HTTP/1.1\r\n\
126                          Host: {}\r\n\
127                          Connection: keep-alive\r\n\
128                          \r\n", uri, host);
129                 let response_body = self.send_request_with_retry(&request).await?;
130                 F::try_from(response_body)
131         }
132
133         /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
134         /// authentication credentials.
135         ///
136         /// The request body consists of the provided JSON `content`. Returns the response body in `F`
137         /// format.
138         #[allow(dead_code)]
139         pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
140         where F: TryFrom<Vec<u8>, Error = std::io::Error> {
141                 let content = content.to_string();
142                 let request = format!(
143                         "POST {} HTTP/1.1\r\n\
144                          Host: {}\r\n\
145                          Authorization: {}\r\n\
146                          Connection: keep-alive\r\n\
147                          Content-Type: application/json\r\n\
148                          Content-Length: {}\r\n\
149                          \r\n\
150                          {}", uri, host, auth, content.len(), content);
151                 let response_body = self.send_request_with_retry(&request).await?;
152                 F::try_from(response_body)
153         }
154
155         /// Sends an HTTP request message and reads the response, returning its body. Attempts to
156         /// reconnect and retry if the connection has been closed.
157         async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
158                 let endpoint = self.stream.peer_addr().unwrap();
159                 match self.send_request(request).await {
160                         Ok(bytes) => Ok(bytes),
161                         Err(_) => {
162                                 // Reconnect and retry on fail. This can happen if the connection was closed after
163                                 // the keep-alive limits are reached, or generally if the request timed out due to
164                                 // Bitcoin Core being stuck on a long-running operation or its RPC queue being
165                                 // full.
166                                 // Block 100ms before retrying the request as in many cases the source of the error
167                                 // may be persistent for some time.
168                                 #[cfg(feature = "tokio")]
169                                 tokio::time::sleep(Duration::from_millis(100)).await;
170                                 #[cfg(not(feature = "tokio"))]
171                                 std::thread::sleep(Duration::from_millis(100));
172                                 *self = Self::connect(endpoint)?;
173                                 self.send_request(request).await
174                         },
175                 }
176         }
177
178         /// Sends an HTTP request message and reads the response, returning its body.
179         async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
180                 self.write_request(request).await?;
181                 self.read_response().await
182         }
183
184         /// Writes an HTTP request message.
185         async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
186                 #[cfg(feature = "tokio")]
187                 {
188                         self.stream.write_all(request.as_bytes()).await?;
189                         self.stream.flush().await
190                 }
191                 #[cfg(not(feature = "tokio"))]
192                 {
193                         self.stream.write_all(request.as_bytes())?;
194                         self.stream.flush()
195                 }
196         }
197
198         /// Reads an HTTP response message.
199         async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
200                 #[cfg(feature = "tokio")]
201                 let stream = self.stream.split().0;
202                 #[cfg(not(feature = "tokio"))]
203                 let stream = std::io::Read::by_ref(&mut self.stream);
204
205                 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
206
207                 #[cfg(feature = "tokio")]
208                 let mut reader = tokio::io::BufReader::new(limited_stream);
209                 #[cfg(not(feature = "tokio"))]
210                 let mut reader = std::io::BufReader::new(limited_stream);
211
212                 macro_rules! read_line { () => { {
213                         let mut line = String::new();
214                         #[cfg(feature = "tokio")]
215                         let bytes_read = reader.read_line(&mut line).await?;
216                         #[cfg(not(feature = "tokio"))]
217                         let bytes_read = reader.read_line(&mut line)?;
218
219                         match bytes_read {
220                                 0 => None,
221                                 _ => {
222                                         // Remove trailing CRLF
223                                         if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
224                                         Some(line)
225                                 },
226                         }
227                 } } }
228
229                 // Read and parse status line
230                 let status_line = read_line!()
231                         .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
232                 let status = HttpStatus::parse(&status_line)?;
233
234                 // Read and parse relevant headers
235                 let mut message_length = HttpMessageLength::Empty;
236                 loop {
237                         let line = read_line!()
238                                 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
239                         if line.is_empty() { break; }
240
241                         let header = HttpHeader::parse(&line)?;
242                         if header.has_name("Content-Length") {
243                                 let length = header.value.parse()
244                                         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
245                                 if let HttpMessageLength::Empty = message_length {
246                                         message_length = HttpMessageLength::ContentLength(length);
247                                 }
248                                 continue;
249                         }
250
251                         if header.has_name("Transfer-Encoding") {
252                                 message_length = HttpMessageLength::TransferEncoding(header.value.into());
253                                 continue;
254                         }
255                 }
256
257                 if !status.is_ok() {
258                         // TODO: Handle 3xx redirection responses.
259                         return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found"));
260                 }
261
262                 // Read message body
263                 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
264                 reader.get_mut().set_limit(read_limit as u64);
265                 match message_length {
266                         HttpMessageLength::Empty => { Ok(Vec::new()) },
267                         HttpMessageLength::ContentLength(length) => {
268                                 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
269                                         Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
270                                 } else {
271                                         let mut content = vec![0; length];
272                                         #[cfg(feature = "tokio")]
273                                         reader.read_exact(&mut content[..]).await?;
274                                         #[cfg(not(feature = "tokio"))]
275                                         reader.read_exact(&mut content[..])?;
276                                         Ok(content)
277                                 }
278                         },
279                         HttpMessageLength::TransferEncoding(coding) => {
280                                 if !coding.eq_ignore_ascii_case("chunked") {
281                                         Err(std::io::Error::new(
282                                                         std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
283                                 } else {
284                                         let mut content = Vec::new();
285                                         #[cfg(feature = "tokio")]
286                                         {
287                                                 // Since chunked_transfer doesn't have an async interface, only use it to
288                                                 // determine the size of each chunk to read.
289                                                 //
290                                                 // TODO: Replace with an async interface when available.
291                                                 // https://github.com/frewsxcv/rust-chunked-transfer/issues/7
292                                                 loop {
293                                                         // Read the chunk header which contains the chunk size.
294                                                         let mut chunk_header = String::new();
295                                                         reader.read_line(&mut chunk_header).await?;
296                                                         if chunk_header == "0\r\n" {
297                                                                 // Read the terminator chunk since the decoder consumes the CRLF
298                                                                 // immediately when this chunk is encountered.
299                                                                 reader.read_line(&mut chunk_header).await?;
300                                                         }
301
302                                                         // Decode the chunk header to obtain the chunk size.
303                                                         let mut buffer = Vec::new();
304                                                         let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
305                                                         decoder.read_to_end(&mut buffer)?;
306
307                                                         // Read the chunk body.
308                                                         let chunk_size = match decoder.remaining_chunks_size() {
309                                                                 None => break,
310                                                                 Some(chunk_size) => chunk_size,
311                                                         };
312                                                         let chunk_offset = content.len();
313                                                         content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
314                                                         reader.read_exact(&mut content[chunk_offset..]).await?;
315                                                         content.resize(chunk_offset + chunk_size, 0);
316                                                 }
317                                                 Ok(content)
318                                         }
319                                         #[cfg(not(feature = "tokio"))]
320                                         {
321                                                 let mut decoder = chunked_transfer::Decoder::new(reader);
322                                                 decoder.read_to_end(&mut content)?;
323                                                 Ok(content)
324                                         }
325                                 }
326                         },
327                 }
328         }
329 }
330
/// HTTP response status code as defined by [RFC 7231].
///
/// Borrows the three-digit status code from the status line it was parsed from.
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
struct HttpStatus<'a> {
        code: &'a str,
}

impl<'a> HttpStatus<'a> {
        /// Parses an HTTP status line as defined by [RFC 7230].
        ///
        /// The line must consist of an HTTP version (`HTTP/1.0` or `HTTP/1.1`, compared
        /// case-insensitively), a three-digit status code, and a reason phrase (which may be empty),
        /// each separated by a single space. Any other input yields an `InvalidData` error.
        ///
        /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
        fn parse(line: &'a str) -> std::io::Result<HttpStatus<'a>> {
                let invalid_data = |reason: &'static str| {
                        std::io::Error::new(std::io::ErrorKind::InvalidData, reason)
                };

                // Split into at most three tokens so that spaces within the reason phrase are kept.
                let mut tokens = line.splitn(3, ' ');

                let http_version = tokens.next().ok_or_else(|| invalid_data("no HTTP-Version"))?;
                if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
                        !http_version.eq_ignore_ascii_case("HTTP/1.0") {
                        return Err(invalid_data("invalid HTTP-Version"));
                }

                let code = tokens.next().ok_or_else(|| invalid_data("no Status-Code"))?;
                if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
                        return Err(invalid_data("invalid Status-Code"));
                }

                let _reason = tokens.next().ok_or_else(|| invalid_data("no Reason-Phrase"))?;

                Ok(Self { code })
        }

        /// Returns whether the status is successful (i.e., 2xx status class).
        fn is_ok(&self) -> bool {
                self.code.starts_with('2')
        }
}
369
/// HTTP response header as defined by [RFC 7231].
///
/// Borrows the field name and value from the header line it was parsed from.
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
struct HttpHeader<'a> {
        name: &'a str,
        value: &'a str,
}

impl<'a> HttpHeader<'a> {
        /// Parses an HTTP header field as defined by [RFC 7230].
        ///
        /// The line is split at the first `:`; everything before it is the name and everything
        /// after it, with surrounding whitespace removed, is the value. A line without a `:` yields
        /// an `InvalidData` error.
        ///
        /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
        fn parse(line: &'a str) -> std::io::Result<HttpHeader<'a>> {
                let mut tokens = line.splitn(2, ':');
                let name = tokens.next()
                        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
                // Optional whitespace may both precede and follow the field value and is not part of
                // it, so strip it on both sides.
                let value = tokens.next()
                        .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
                        .trim();
                Ok(Self { name, value })
        }

        /// Returns whether the header field has the given name, ignoring ASCII case.
        fn has_name(&self, name: &str) -> bool {
                self.name.eq_ignore_ascii_case(name)
        }
}
397
/// HTTP message body length as defined by [RFC 7230].
///
/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
enum HttpMessageLength {
        /// No length-determining header was seen; the response reader returns an empty body.
        Empty,
        /// Body length in bytes as given by a `Content-Length` header.
        ContentLength(usize),
        /// Transfer coding named by a `Transfer-Encoding` header.
        TransferEncoding(String),
}
406
/// An HTTP response body in binary format.
///
/// Wraps the body bytes as received, without interpreting them.
pub struct BinaryResponse(pub Vec<u8>);

/// An HTTP response body in JSON format.
///
/// Wraps the JSON value parsed from the body bytes.
pub struct JsonResponse(pub serde_json::Value);
412
413 /// Interprets bytes from an HTTP response body as binary data.
414 impl TryFrom<Vec<u8>> for BinaryResponse {
415         type Error = std::io::Error;
416
417         fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
418                 Ok(BinaryResponse(bytes))
419         }
420 }
421
422 /// Interprets bytes from an HTTP response body as a JSON value.
423 impl TryFrom<Vec<u8>> for JsonResponse {
424         type Error = std::io::Error;
425
426         fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
427                 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
428         }
429 }
430
#[cfg(test)]
mod endpoint_tests {
        use super::HttpEndpoint;

        /// An endpoint created without a port reports the default HTTP port.
        #[test]
        fn with_default_port() {
                let host = String::from("foo.com");
                let endpoint = HttpEndpoint::for_host(host);
                assert_eq!(endpoint.host(), "foo.com");
                assert_eq!(endpoint.port(), 80);
        }

        /// An explicitly configured port is reported back.
        #[test]
        fn with_custom_port() {
                let host = String::from("foo.com");
                let endpoint = HttpEndpoint::for_host(host).with_port(8080);
                assert_eq!(endpoint.host(), "foo.com");
                assert_eq!(endpoint.port(), 8080);
        }

        /// An explicitly configured path is reported back.
        #[test]
        fn with_uri_path() {
                let host = String::from("foo.com");
                let endpoint = HttpEndpoint::for_host(host).with_path(String::from("/path"));
                assert_eq!(endpoint.host(), "foo.com");
                assert_eq!(endpoint.path(), "/path");
        }

        /// An endpoint created without a path reports the root path.
        #[test]
        fn without_uri_path() {
                let host = String::from("foo.com");
                let endpoint = HttpEndpoint::for_host(host);
                assert_eq!(endpoint.host(), "foo.com");
                assert_eq!(endpoint.path(), "/");
        }

        /// Resolving an endpoint yields the same single address as resolving its host and port.
        #[test]
        fn convert_to_socket_addrs() {
                use std::net::ToSocketAddrs;

                let endpoint = HttpEndpoint::for_host("foo.com".into());
                let host_and_port = (endpoint.host(), endpoint.port());

                let mut socket_addrs = (&endpoint).to_socket_addrs()
                        .unwrap_or_else(|e| panic!("Unexpected error: {:?}", e));
                let addr = socket_addrs.next().expect("Expected socket address");
                assert_eq!(addr, host_and_port.to_socket_addrs().unwrap().next().unwrap());
                assert!(socket_addrs.next().is_none());
        }
}
484
485 #[cfg(test)]
486 pub(crate) mod client_tests {
487         use super::*;
488         use std::io::BufRead;
489         use std::io::Write;
490
        /// Server for handling HTTP client requests with a stock response.
        pub struct HttpServer {
                /// Local address the server's listener is bound to.
                address: std::net::SocketAddr,
                /// Thread accepting connections and writing the stock response.
                handler: std::thread::JoinHandle<()>,
                /// Flag telling the handler thread to stop while it is writing a response.
                shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
        }
497
        /// Body of HTTP response messages.
        pub enum MessageBody<T: ToString> {
                /// No body; the response consists of the status line only.
                Empty,
                /// Body sent as-is with a `Content-Length` header.
                Content(T),
                /// Body sent using chunked transfer encoding.
                ChunkedContent(T),
        }
504
505         impl HttpServer {
506                 pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
507                         let response = match body {
508                                 MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
509                                 MessageBody::Content(body) => {
510                                         let body = body.to_string();
511                                         format!(
512                                                 "HTTP/1.1 200 OK\r\n\
513                                                  Content-Length: {}\r\n\
514                                                  \r\n\
515                                                  {}", body.len(), body)
516                                 },
517                                 MessageBody::ChunkedContent(body) => {
518                                         let mut chuncked_body = Vec::new();
519                                         {
520                                                 use chunked_transfer::Encoder;
521                                                 let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
522                                                 encoder.write_all(body.to_string().as_bytes()).unwrap();
523                                         }
524                                         format!(
525                                                 "HTTP/1.1 200 OK\r\n\
526                                                  Transfer-Encoding: chunked\r\n\
527                                                  \r\n\
528                                                  {}", String::from_utf8(chuncked_body).unwrap())
529                                 },
530                         };
531                         HttpServer::responding_with(response)
532                 }
533
534                 pub fn responding_with_not_found() -> Self {
535                         let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
536                         HttpServer::responding_with(response)
537                 }
538
539                 fn responding_with(response: String) -> Self {
540                         let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
541                         let address = listener.local_addr().unwrap();
542
543                         let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
544                         let shutdown_signaled = std::sync::Arc::clone(&shutdown);
545                         let handler = std::thread::spawn(move || {
546                                 for stream in listener.incoming() {
547                                         let mut stream = stream.unwrap();
548                                         stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
549
550                                         let lines_read = std::io::BufReader::new(&stream)
551                                                 .lines()
552                                                 .take_while(|line| !line.as_ref().unwrap().is_empty())
553                                                 .count();
554                                         if lines_read == 0 { continue; }
555
556                                         for chunk in response.as_bytes().chunks(16) {
557                                                 if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
558                                                         return;
559                                                 } else {
560                                                         if let Err(_) = stream.write(chunk) { break; }
561                                                         if let Err(_) = stream.flush() { break; }
562                                                 }
563                                         }
564                                 }
565                         });
566
567                         Self { address, handler, shutdown }
568                 }
569
570                 fn shutdown(self) {
571                         self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
572                         self.handler.join().unwrap();
573                 }
574
575                 pub fn endpoint(&self) -> HttpEndpoint {
576                         HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
577                 }
578         }
579
580         #[test]
581         fn connect_to_unresolvable_host() {
582                 match HttpClient::connect(("example.invalid", 80)) {
583                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
584                         Ok(_) => panic!("Expected error"),
585                 }
586         }
587
588         #[test]
589         fn connect_with_no_socket_address() {
590                 match HttpClient::connect(&vec![][..]) {
591                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
592                         Ok(_) => panic!("Expected error"),
593                 }
594         }
595
596         #[test]
597         fn connect_with_unknown_server() {
598                 match HttpClient::connect(("::", 80)) {
599                         #[cfg(target_os = "windows")]
600                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
601                         #[cfg(not(target_os = "windows"))]
602                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
603                         Ok(_) => panic!("Expected error"),
604                 }
605         }
606
607         #[tokio::test]
608         async fn connect_with_valid_endpoint() {
609                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
610
611                 match HttpClient::connect(&server.endpoint()) {
612                         Err(e) => panic!("Unexpected error: {:?}", e),
613                         Ok(_) => {},
614                 }
615         }
616
617         #[tokio::test]
618         async fn read_empty_message() {
619                 let server = HttpServer::responding_with("".to_string());
620
621                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
622                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
623                         Err(e) => {
624                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
625                                 assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
626                         },
627                         Ok(_) => panic!("Expected error"),
628                 }
629         }
630
631         #[tokio::test]
632         async fn read_incomplete_message() {
633                 let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
634
635                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
636                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
637                         Err(e) => {
638                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
639                                 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
640                         },
641                         Ok(_) => panic!("Expected error"),
642                 }
643         }
644
645         #[tokio::test]
646         async fn read_too_large_message_headers() {
647                 let response = format!(
648                         "HTTP/1.1 302 Found\r\n\
649                          Location: {}\r\n\
650                          \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
651                 let server = HttpServer::responding_with(response);
652
653                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
654                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
655                         Err(e) => {
656                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
657                                 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
658                         },
659                         Ok(_) => panic!("Expected error"),
660                 }
661         }
662
663         #[tokio::test]
664         async fn read_too_large_message_body() {
665                 let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
666                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
667
668                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
669                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
670                         Err(e) => {
671                                 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
672                                 assert_eq!(e.get_ref().unwrap().to_string(), "out of range");
673                         },
674                         Ok(_) => panic!("Expected error"),
675                 }
676                 server.shutdown();
677         }
678
679         #[tokio::test]
680         async fn read_message_with_unsupported_transfer_coding() {
681                 let response = String::from(
682                         "HTTP/1.1 200 OK\r\n\
683                          Transfer-Encoding: gzip\r\n\
684                          \r\n\
685                          foobar");
686                 let server = HttpServer::responding_with(response);
687
688                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
689                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
690                         Err(e) => {
691                                 assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
692                                 assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
693                         },
694                         Ok(_) => panic!("Expected error"),
695                 }
696         }
697
698         #[tokio::test]
699         async fn read_empty_message_body() {
700                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
701
702                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
703                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
704                         Err(e) => panic!("Unexpected error: {:?}", e),
705                         Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
706                 }
707         }
708
709         #[tokio::test]
710         async fn read_message_body_with_length() {
711                 let body = "foo bar baz qux".repeat(32);
712                 let content = MessageBody::Content(body.clone());
713                 let server = HttpServer::responding_with_ok::<String>(content);
714
715                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
716                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
717                         Err(e) => panic!("Unexpected error: {:?}", e),
718                         Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
719                 }
720         }
721
722         #[tokio::test]
723         async fn read_chunked_message_body() {
724                 let body = "foo bar baz qux".repeat(32);
725                 let chunked_content = MessageBody::ChunkedContent(body.clone());
726                 let server = HttpServer::responding_with_ok::<String>(chunked_content);
727
728                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
729                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
730                         Err(e) => panic!("Unexpected error: {:?}", e),
731                         Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
732                 }
733         }
734
735         #[tokio::test]
736         async fn reconnect_closed_connection() {
737                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
738
739                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
740                 assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
741                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
742                         Err(e) => panic!("Unexpected error: {:?}", e),
743                         Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
744                 }
745         }
746
747         #[test]
748         fn from_bytes_into_binary_response() {
749                 let bytes = b"foo";
750                 match BinaryResponse::try_from(bytes.to_vec()) {
751                         Err(e) => panic!("Unexpected error: {:?}", e),
752                         Ok(response) => assert_eq!(&response.0, bytes),
753                 }
754         }
755
756         #[test]
757         fn from_invalid_bytes_into_json_response() {
758                 let json = serde_json::json!({ "result": 42 });
759                 match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
760                         Err(_) => {},
761                         Ok(_) => panic!("Expected error"),
762                 }
763         }
764
765         #[test]
766         fn from_valid_bytes_into_json_response() {
767                 let json = serde_json::json!({ "result": 42 });
768                 match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
769                         Err(e) => panic!("Unexpected error: {:?}", e),
770                         Ok(response) => assert_eq!(response.0, json),
771                 }
772         }
773 }