2e70e18659936e2746638ba684b36ce20c3095ca
[rust-lightning] / lightning-block-sync / src / http.rs
1 //! Simple HTTP implementation which supports both async and traditional execution environments
2 //! with minimal dependencies. This is used as the basis for REST and RPC clients.
3
4 use chunked_transfer;
5 use serde_json;
6
7 use std::convert::TryFrom;
8 #[cfg(not(feature = "tokio"))]
9 use std::io::Write;
10 use std::net::ToSocketAddrs;
11 use std::time::Duration;
12
13 #[cfg(feature = "tokio")]
14 use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
15 #[cfg(feature = "tokio")]
16 use tokio::net::TcpStream;
17
18 #[cfg(not(feature = "tokio"))]
19 use std::io::BufRead;
20 use std::io::Read;
21 #[cfg(not(feature = "tokio"))]
22 use std::net::TcpStream;
23
24 /// Timeout for operations on TCP streams.
25 const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
26
27 /// Timeout for reading the first byte of a response. This is separate from the general read
28 /// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
29 /// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
30 /// time we allow Bitcoin Core to block for is twice this value.
31 const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
32
33 /// Maximum HTTP message header size in bytes.
34 const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
35
36 /// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
37 /// overhead for HTTP chunked transfer encoding.
38 const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
39
40 /// Endpoint for interacting with an HTTP-based API.
41 #[derive(Debug)]
42 pub struct HttpEndpoint {
43         host: String,
44         port: Option<u16>,
45         path: String,
46 }
47
48 impl HttpEndpoint {
49         /// Creates an endpoint for the given host and default HTTP port.
50         pub fn for_host(host: String) -> Self {
51                 Self {
52                         host,
53                         port: None,
54                         path: String::from("/"),
55                 }
56         }
57
58         /// Specifies a port to use with the endpoint.
59         pub fn with_port(mut self, port: u16) -> Self {
60                 self.port = Some(port);
61                 self
62         }
63
64         /// Specifies a path to use with the endpoint.
65         pub fn with_path(mut self, path: String) -> Self {
66                 self.path = path;
67                 self
68         }
69
70         /// Returns the endpoint host.
71         pub fn host(&self) -> &str {
72                 &self.host
73         }
74
75         /// Returns the endpoint port.
76         pub fn port(&self) -> u16 {
77                 match self.port {
78                         None => 80,
79                         Some(port) => port,
80                 }
81         }
82
83         /// Returns the endpoint path.
84         pub fn path(&self) -> &str {
85                 &self.path
86         }
87 }
88
89 impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
90         type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
91
92         fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
93                 (self.host(), self.port()).to_socket_addrs()
94         }
95 }
96
97 /// Client for making HTTP requests.
98 pub(crate) struct HttpClient {
99         stream: TcpStream,
100 }
101
102 impl HttpClient {
103         /// Opens a connection to an HTTP endpoint.
104         pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
105                 let address = match endpoint.to_socket_addrs()?.next() {
106                         None => {
107                                 return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
108                         },
109                         Some(address) => address,
110                 };
111                 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
112                 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
113                 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
114
115                 #[cfg(feature = "tokio")]
116                 let stream = {
117                         stream.set_nonblocking(true)?;
118                         TcpStream::from_std(stream)?
119                 };
120
121                 Ok(Self { stream })
122         }
123
124         /// Sends a `GET` request for a resource identified by `uri` at the `host`.
125         ///
126         /// Returns the response body in `F` format.
127         #[allow(dead_code)]
128         pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
129         where F: TryFrom<Vec<u8>, Error = std::io::Error> {
130                 let request = format!(
131                         "GET {} HTTP/1.1\r\n\
132                          Host: {}\r\n\
133                          Connection: keep-alive\r\n\
134                          \r\n", uri, host);
135                 let response_body = self.send_request_with_retry(&request).await?;
136                 F::try_from(response_body)
137         }
138
139         /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
140         /// authentication credentials.
141         ///
142         /// The request body consists of the provided JSON `content`. Returns the response body in `F`
143         /// format.
144         #[allow(dead_code)]
145         pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
146         where F: TryFrom<Vec<u8>, Error = std::io::Error> {
147                 let content = content.to_string();
148                 let request = format!(
149                         "POST {} HTTP/1.1\r\n\
150                          Host: {}\r\n\
151                          Authorization: {}\r\n\
152                          Connection: keep-alive\r\n\
153                          Content-Type: application/json\r\n\
154                          Content-Length: {}\r\n\
155                          \r\n\
156                          {}", uri, host, auth, content.len(), content);
157                 let response_body = self.send_request_with_retry(&request).await?;
158                 F::try_from(response_body)
159         }
160
161         /// Sends an HTTP request message and reads the response, returning its body. Attempts to
162         /// reconnect and retry if the connection has been closed.
163         async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
164                 let endpoint = self.stream.peer_addr().unwrap();
165                 match self.send_request(request).await {
166                         Ok(bytes) => Ok(bytes),
167                         Err(_) => {
168                                 // Reconnect and retry on fail. This can happen if the connection was closed after
169                                 // the keep-alive limits are reached, or generally if the request timed out due to
170                                 // Bitcoin Core being stuck on a long-running operation or its RPC queue being
171                                 // full.
172                                 // Block 100ms before retrying the request as in many cases the source of the error
173                                 // may be persistent for some time.
174                                 #[cfg(feature = "tokio")]
175                                 tokio::time::sleep(Duration::from_millis(100)).await;
176                                 #[cfg(not(feature = "tokio"))]
177                                 std::thread::sleep(Duration::from_millis(100));
178                                 *self = Self::connect(endpoint)?;
179                                 self.send_request(request).await
180                         },
181                 }
182         }
183
184         /// Sends an HTTP request message and reads the response, returning its body.
185         async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
186                 self.write_request(request).await?;
187                 self.read_response().await
188         }
189
190         /// Writes an HTTP request message.
191         async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
192                 #[cfg(feature = "tokio")]
193                 {
194                         self.stream.write_all(request.as_bytes()).await?;
195                         self.stream.flush().await
196                 }
197                 #[cfg(not(feature = "tokio"))]
198                 {
199                         self.stream.write_all(request.as_bytes())?;
200                         self.stream.flush()
201                 }
202         }
203
204         /// Reads an HTTP response message.
205         async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
206                 #[cfg(feature = "tokio")]
207                 let stream = self.stream.split().0;
208                 #[cfg(not(feature = "tokio"))]
209                 let stream = std::io::Read::by_ref(&mut self.stream);
210
211                 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
212
213                 #[cfg(feature = "tokio")]
214                 let mut reader = tokio::io::BufReader::new(limited_stream);
215                 #[cfg(not(feature = "tokio"))]
216                 let mut reader = std::io::BufReader::new(limited_stream);
217
218                 macro_rules! read_line {
219                         () => { read_line!(0) };
220                         ($retry_count: expr) => { {
221                                 let mut line = String::new();
222                                 let mut timeout_count: u64 = 0;
223                                 let bytes_read = loop {
224                                         #[cfg(feature = "tokio")]
225                                         let read_res = reader.read_line(&mut line).await;
226                                         #[cfg(not(feature = "tokio"))]
227                                         let read_res = reader.read_line(&mut line);
228                                         match read_res {
229                                                 Ok(bytes_read) => break bytes_read,
230                                                 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
231                                                         timeout_count += 1;
232                                                         if timeout_count > $retry_count {
233                                                                 return Err(e);
234                                                         } else {
235                                                                 continue;
236                                                         }
237                                                 }
238                                                 Err(e) => return Err(e),
239                                         }
240                                 };
241
242                                 match bytes_read {
243                                         0 => None,
244                                         _ => {
245                                                 // Remove trailing CRLF
246                                                 if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
247                                                 Some(line)
248                                         },
249                                 }
250                         } }
251                 }
252
253                 // Read and parse status line
254                 // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
255                 let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
256                         .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
257                 let status = HttpStatus::parse(&status_line)?;
258
259                 // Read and parse relevant headers
260                 let mut message_length = HttpMessageLength::Empty;
261                 loop {
262                         let line = read_line!()
263                                 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
264                         if line.is_empty() { break; }
265
266                         let header = HttpHeader::parse(&line)?;
267                         if header.has_name("Content-Length") {
268                                 let length = header.value.parse()
269                                         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
270                                 if let HttpMessageLength::Empty = message_length {
271                                         message_length = HttpMessageLength::ContentLength(length);
272                                 }
273                                 continue;
274                         }
275
276                         if header.has_name("Transfer-Encoding") {
277                                 message_length = HttpMessageLength::TransferEncoding(header.value.into());
278                                 continue;
279                         }
280                 }
281
282                 // Read message body
283                 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
284                 reader.get_mut().set_limit(read_limit as u64);
285                 let contents = match message_length {
286                         HttpMessageLength::Empty => { Vec::new() },
287                         HttpMessageLength::ContentLength(length) => {
288                                 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
289                                         return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
290                                 } else {
291                                         let mut content = vec![0; length];
292                                         #[cfg(feature = "tokio")]
293                                         reader.read_exact(&mut content[..]).await?;
294                                         #[cfg(not(feature = "tokio"))]
295                                         reader.read_exact(&mut content[..])?;
296                                         content
297                                 }
298                         },
299                         HttpMessageLength::TransferEncoding(coding) => {
300                                 if !coding.eq_ignore_ascii_case("chunked") {
301                                         return Err(std::io::Error::new(
302                                                 std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
303                                 } else {
304                                         let mut content = Vec::new();
305                                         #[cfg(feature = "tokio")]
306                                         {
307                                                 // Since chunked_transfer doesn't have an async interface, only use it to
308                                                 // determine the size of each chunk to read.
309                                                 //
310                                                 // TODO: Replace with an async interface when available.
311                                                 // https://github.com/frewsxcv/rust-chunked-transfer/issues/7
312                                                 loop {
313                                                         // Read the chunk header which contains the chunk size.
314                                                         let mut chunk_header = String::new();
315                                                         reader.read_line(&mut chunk_header).await?;
316                                                         if chunk_header == "0\r\n" {
317                                                                 // Read the terminator chunk since the decoder consumes the CRLF
318                                                                 // immediately when this chunk is encountered.
319                                                                 reader.read_line(&mut chunk_header).await?;
320                                                         }
321
322                                                         // Decode the chunk header to obtain the chunk size.
323                                                         let mut buffer = Vec::new();
324                                                         let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
325                                                         decoder.read_to_end(&mut buffer)?;
326
327                                                         // Read the chunk body.
328                                                         let chunk_size = match decoder.remaining_chunks_size() {
329                                                                 None => break,
330                                                                 Some(chunk_size) => chunk_size,
331                                                         };
332                                                         let chunk_offset = content.len();
333                                                         content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
334                                                         reader.read_exact(&mut content[chunk_offset..]).await?;
335                                                         content.resize(chunk_offset + chunk_size, 0);
336                                                 }
337                                                 content
338                                         }
339                                         #[cfg(not(feature = "tokio"))]
340                                         {
341                                                 let mut decoder = chunked_transfer::Decoder::new(reader);
342                                                 decoder.read_to_end(&mut content)?;
343                                                 content
344                                         }
345                                 }
346                         },
347                 };
348
349                 if !status.is_ok() {
350                         // TODO: Handle 3xx redirection responses.
351                         let error_details = match String::from_utf8(contents) {
352                                 // Check that the string is all-ASCII with no control characters before returning
353                                 // it.
354                                 Ok(s) if s.as_bytes().iter().all(|c| c.is_ascii() && !c.is_ascii_control()) => s,
355                                 _ => "binary".to_string()
356                         };
357                         let error_msg = format!("Errored with status: {} and contents: {}",
358                                                 status.code, error_details);
359                         return Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg));
360                 }
361
362                 Ok(contents)
363         }
364 }
365
366 /// HTTP response status code as defined by [RFC 7231].
367 ///
368 /// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
369 struct HttpStatus<'a> {
370         code: &'a str,
371 }
372
373 impl<'a> HttpStatus<'a> {
374         /// Parses an HTTP status line as defined by [RFC 7230].
375         ///
376         /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
377         fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
378                 let mut tokens = line.splitn(3, ' ');
379
380                 let http_version = tokens.next()
381                         .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
382                 if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
383                         !http_version.eq_ignore_ascii_case("HTTP/1.0") {
384                         return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version"));
385                 }
386
387                 let code = tokens.next()
388                         .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
389                 if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
390                         return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code"));
391                 }
392
393                 let _reason = tokens.next()
394                         .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
395
396                 Ok(Self { code })
397         }
398
399         /// Returns whether the status is successful (i.e., 2xx status class).
400         fn is_ok(&self) -> bool {
401                 self.code.starts_with('2')
402         }
403 }
404
405 /// HTTP response header as defined by [RFC 7231].
406 ///
407 /// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
408 struct HttpHeader<'a> {
409         name: &'a str,
410         value: &'a str,
411 }
412
413 impl<'a> HttpHeader<'a> {
414         /// Parses an HTTP header field as defined by [RFC 7230].
415         ///
416         /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
417         fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
418                 let mut tokens = line.splitn(2, ':');
419                 let name = tokens.next()
420                         .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
421                 let value = tokens.next()
422                         .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
423                         .trim_start();
424                 Ok(Self { name, value })
425         }
426
427         /// Returns whether the header field has the given name.
428         fn has_name(&self, name: &str) -> bool {
429                 self.name.eq_ignore_ascii_case(name)
430         }
431 }
432
433 /// HTTP message body length as defined by [RFC 7230].
434 ///
435 /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
436 enum HttpMessageLength {
437         Empty,
438         ContentLength(usize),
439         TransferEncoding(String),
440 }
441
442 /// An HTTP response body in binary format.
443 pub struct BinaryResponse(pub Vec<u8>);
444
445 /// An HTTP response body in JSON format.
446 pub struct JsonResponse(pub serde_json::Value);
447
448 /// Interprets bytes from an HTTP response body as binary data.
449 impl TryFrom<Vec<u8>> for BinaryResponse {
450         type Error = std::io::Error;
451
452         fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
453                 Ok(BinaryResponse(bytes))
454         }
455 }
456
457 /// Interprets bytes from an HTTP response body as a JSON value.
458 impl TryFrom<Vec<u8>> for JsonResponse {
459         type Error = std::io::Error;
460
461         fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
462                 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
463         }
464 }
465
466 #[cfg(test)]
467 mod endpoint_tests {
468         use super::HttpEndpoint;
469
470         #[test]
471         fn with_default_port() {
472                 let endpoint = HttpEndpoint::for_host("foo.com".into());
473                 assert_eq!(endpoint.host(), "foo.com");
474                 assert_eq!(endpoint.port(), 80);
475         }
476
477         #[test]
478         fn with_custom_port() {
479                 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
480                 assert_eq!(endpoint.host(), "foo.com");
481                 assert_eq!(endpoint.port(), 8080);
482         }
483
484         #[test]
485         fn with_uri_path() {
486                 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
487                 assert_eq!(endpoint.host(), "foo.com");
488                 assert_eq!(endpoint.path(), "/path");
489         }
490
491         #[test]
492         fn without_uri_path() {
493                 let endpoint = HttpEndpoint::for_host("foo.com".into());
494                 assert_eq!(endpoint.host(), "foo.com");
495                 assert_eq!(endpoint.path(), "/");
496         }
497
498         #[test]
499         fn convert_to_socket_addrs() {
500                 let endpoint = HttpEndpoint::for_host("foo.com".into());
501                 let host = endpoint.host();
502                 let port = endpoint.port();
503
504                 use std::net::ToSocketAddrs;
505                 match (&endpoint).to_socket_addrs() {
506                         Err(e) => panic!("Unexpected error: {:?}", e),
507                         Ok(mut socket_addrs) => {
508                                 match socket_addrs.next() {
509                                         None => panic!("Expected socket address"),
510                                         Some(addr) => {
511                                                 assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap());
512                                                 assert!(socket_addrs.next().is_none());
513                                         }
514                                 }
515                         }
516                 }
517         }
518 }
519
520 #[cfg(test)]
521 pub(crate) mod client_tests {
522         use super::*;
523         use std::io::BufRead;
524         use std::io::Write;
525
526         /// Server for handling HTTP client requests with a stock response.
527         pub struct HttpServer {
528                 address: std::net::SocketAddr,
529                 handler: std::thread::JoinHandle<()>,
530                 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
531         }
532
533         /// Body of HTTP response messages.
534         pub enum MessageBody<T: ToString> {
535                 Empty,
536                 Content(T),
537                 ChunkedContent(T),
538         }
539
540         impl HttpServer {
541                 pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
542                         let response = match body {
543                                 MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
544                                 MessageBody::Content(body) => {
545                                         let body = body.to_string();
546                                         format!(
547                                                 "HTTP/1.1 200 OK\r\n\
548                                                  Content-Length: {}\r\n\
549                                                  \r\n\
550                                                  {}", body.len(), body)
551                                 },
552                                 MessageBody::ChunkedContent(body) => {
553                                         let mut chuncked_body = Vec::new();
554                                         {
555                                                 use chunked_transfer::Encoder;
556                                                 let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
557                                                 encoder.write_all(body.to_string().as_bytes()).unwrap();
558                                         }
559                                         format!(
560                                                 "HTTP/1.1 200 OK\r\n\
561                                                  Transfer-Encoding: chunked\r\n\
562                                                  \r\n\
563                                                  {}", String::from_utf8(chuncked_body).unwrap())
564                                 },
565                         };
566                         HttpServer::responding_with(response)
567                 }
568
569                 pub fn responding_with_not_found() -> Self {
570                         let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
571                         HttpServer::responding_with(response)
572                 }
573
574                 fn responding_with(response: String) -> Self {
575                         let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
576                         let address = listener.local_addr().unwrap();
577
578                         let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
579                         let shutdown_signaled = std::sync::Arc::clone(&shutdown);
580                         let handler = std::thread::spawn(move || {
581                                 for stream in listener.incoming() {
582                                         let mut stream = stream.unwrap();
583                                         stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
584
585                                         let lines_read = std::io::BufReader::new(&stream)
586                                                 .lines()
587                                                 .take_while(|line| !line.as_ref().unwrap().is_empty())
588                                                 .count();
589                                         if lines_read == 0 { continue; }
590
591                                         for chunk in response.as_bytes().chunks(16) {
592                                                 if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
593                                                         return;
594                                                 } else {
595                                                         if let Err(_) = stream.write(chunk) { break; }
596                                                         if let Err(_) = stream.flush() { break; }
597                                                 }
598                                         }
599                                 }
600                         });
601
602                         Self { address, handler, shutdown }
603                 }
604
605                 fn shutdown(self) {
606                         self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
607                         self.handler.join().unwrap();
608                 }
609
610                 pub fn endpoint(&self) -> HttpEndpoint {
611                         HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
612                 }
613         }
614
615         #[test]
616         fn connect_to_unresolvable_host() {
617                 match HttpClient::connect(("example.invalid", 80)) {
618                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
619                         Ok(_) => panic!("Expected error"),
620                 }
621         }
622
623         #[test]
624         fn connect_with_no_socket_address() {
625                 match HttpClient::connect(&vec![][..]) {
626                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
627                         Ok(_) => panic!("Expected error"),
628                 }
629         }
630
631         #[test]
632         fn connect_with_unknown_server() {
633                 match HttpClient::connect(("::", 80)) {
634                         #[cfg(target_os = "windows")]
635                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
636                         #[cfg(not(target_os = "windows"))]
637                         Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
638                         Ok(_) => panic!("Expected error"),
639                 }
640         }
641
642         #[tokio::test]
643         async fn connect_with_valid_endpoint() {
644                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
645
646                 match HttpClient::connect(&server.endpoint()) {
647                         Err(e) => panic!("Unexpected error: {:?}", e),
648                         Ok(_) => {},
649                 }
650         }
651
652         #[tokio::test]
653         async fn read_empty_message() {
654                 let server = HttpServer::responding_with("".to_string());
655
656                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
657                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
658                         Err(e) => {
659                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
660                                 assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
661                         },
662                         Ok(_) => panic!("Expected error"),
663                 }
664         }
665
666         #[tokio::test]
667         async fn read_incomplete_message() {
668                 let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
669
670                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
671                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
672                         Err(e) => {
673                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
674                                 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
675                         },
676                         Ok(_) => panic!("Expected error"),
677                 }
678         }
679
680         #[tokio::test]
681         async fn read_too_large_message_headers() {
682                 let response = format!(
683                         "HTTP/1.1 302 Found\r\n\
684                          Location: {}\r\n\
685                          \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
686                 let server = HttpServer::responding_with(response);
687
688                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
689                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
690                         Err(e) => {
691                                 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
692                                 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
693                         },
694                         Ok(_) => panic!("Expected error"),
695                 }
696         }
697
698         #[tokio::test]
699         async fn read_too_large_message_body() {
700                 let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
701                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
702
703                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
704                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
705                         Err(e) => {
706                                 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
707                                 assert_eq!(e.get_ref().unwrap().to_string(), "out of range");
708                         },
709                         Ok(_) => panic!("Expected error"),
710                 }
711                 server.shutdown();
712         }
713
714         #[tokio::test]
715         async fn read_message_with_unsupported_transfer_coding() {
716                 let response = String::from(
717                         "HTTP/1.1 200 OK\r\n\
718                          Transfer-Encoding: gzip\r\n\
719                          \r\n\
720                          foobar");
721                 let server = HttpServer::responding_with(response);
722
723                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
724                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
725                         Err(e) => {
726                                 assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
727                                 assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
728                         },
729                         Ok(_) => panic!("Expected error"),
730                 }
731         }
732
733         #[tokio::test]
734         async fn read_error() {
735                 let response = String::from(
736                         "HTTP/1.1 500 Internal Server Error\r\n\
737                          Content-Length: 10\r\n\r\ntest error\r\n");
738                 let server = HttpServer::responding_with(response);
739
740                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
741                 match client.get::<JsonResponse>("/foo", "foo.com").await {
742                         Err(e) => {
743                                 assert_eq!(e.get_ref().unwrap().to_string(), "Errored with status: 500 and contents: test error");
744                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
745                         },
746                         Ok(_) => panic!("Expected error"),
747                 }
748         }
749
750         #[tokio::test]
751         async fn read_empty_message_body() {
752                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
753
754                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
755                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
756                         Err(e) => panic!("Unexpected error: {:?}", e),
757                         Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
758                 }
759         }
760
761         #[tokio::test]
762         async fn read_message_body_with_length() {
763                 let body = "foo bar baz qux".repeat(32);
764                 let content = MessageBody::Content(body.clone());
765                 let server = HttpServer::responding_with_ok::<String>(content);
766
767                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
768                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
769                         Err(e) => panic!("Unexpected error: {:?}", e),
770                         Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
771                 }
772         }
773
774         #[tokio::test]
775         async fn read_chunked_message_body() {
776                 let body = "foo bar baz qux".repeat(32);
777                 let chunked_content = MessageBody::ChunkedContent(body.clone());
778                 let server = HttpServer::responding_with_ok::<String>(chunked_content);
779
780                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
781                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
782                         Err(e) => panic!("Unexpected error: {:?}", e),
783                         Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
784                 }
785         }
786
787         #[tokio::test]
788         async fn reconnect_closed_connection() {
789                 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
790
791                 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
792                 assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
793                 match client.get::<BinaryResponse>("/foo", "foo.com").await {
794                         Err(e) => panic!("Unexpected error: {:?}", e),
795                         Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
796                 }
797         }
798
799         #[test]
800         fn from_bytes_into_binary_response() {
801                 let bytes = b"foo";
802                 match BinaryResponse::try_from(bytes.to_vec()) {
803                         Err(e) => panic!("Unexpected error: {:?}", e),
804                         Ok(response) => assert_eq!(&response.0, bytes),
805                 }
806         }
807
808         #[test]
809         fn from_invalid_bytes_into_json_response() {
810                 let json = serde_json::json!({ "result": 42 });
811                 match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
812                         Err(_) => {},
813                         Ok(_) => panic!("Expected error"),
814                 }
815         }
816
817         #[test]
818         fn from_valid_bytes_into_json_response() {
819                 let json = serde_json::json!({ "result": 42 });
820                 match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
821                         Err(e) => panic!("Unexpected error: {:?}", e),
822                         Ok(response) => assert_eq!(response.0, json),
823                 }
824         }
825 }