1 //! Simple HTTP implementation which supports both async and traditional execution environments
2 //! with minimal dependencies. This is used as the basis for REST and RPC clients.
7 use std::convert::TryFrom;
9 #[cfg(not(feature = "tokio"))]
11 use std::net::{SocketAddr, ToSocketAddrs};
12 use std::time::Duration;
14 #[cfg(feature = "tokio")]
15 use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
16 #[cfg(feature = "tokio")]
17 use tokio::net::TcpStream;
19 #[cfg(not(feature = "tokio"))]
22 #[cfg(not(feature = "tokio"))]
23 use std::net::TcpStream;
25 /// Timeout for operations on TCP streams.
26 const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
28 /// Timeout for reading the first byte of a response. This is separate from the general read
29 /// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
30 /// upwards of 10 minutes on slow devices (e.g. RPis with SSDs over USB). Note that we always retry
31 /// once when we time out, so the maximum time we allow Bitcoin Core to block for is twice this
33 const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(300);
35 /// Maximum HTTP message header size in bytes.
36 const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
38 /// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
39 /// overhead for HTTP chunked transfer encoding.
40 const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
42 /// Endpoint for interacting with an HTTP-based API.
44 pub struct HttpEndpoint {
51 /// Creates an endpoint for the given host and default HTTP port.
52 pub fn for_host(host: String) -> Self {
56 path: String::from("/"),
60 /// Specifies a port to use with the endpoint.
61 pub fn with_port(mut self, port: u16) -> Self {
62 self.port = Some(port);
66 /// Specifies a path to use with the endpoint.
67 pub fn with_path(mut self, path: String) -> Self {
72 /// Returns the endpoint host.
73 pub fn host(&self) -> &str {
77 /// Returns the endpoint port.
78 pub fn port(&self) -> u16 {
85 /// Returns the endpoint path.
86 pub fn path(&self) -> &str {
91 impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
92 type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;
94 fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
95 (self.host(), self.port()).to_socket_addrs()
99 /// Client for making HTTP requests.
100 pub(crate) struct HttpClient {
106 /// Opens a connection to an HTTP endpoint.
107 pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
108 let address = match endpoint.to_socket_addrs()?.next() {
110 return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
112 Some(address) => address,
114 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
115 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
116 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
118 #[cfg(feature = "tokio")]
120 stream.set_nonblocking(true)?;
121 TcpStream::from_std(stream)?
124 Ok(Self { address, stream })
127 /// Sends a `GET` request for a resource identified by `uri` at the `host`.
129 /// Returns the response body in `F` format.
131 pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
132 where F: TryFrom<Vec<u8>, Error = std::io::Error> {
133 let request = format!(
134 "GET {} HTTP/1.1\r\n\
136 Connection: keep-alive\r\n\
138 let response_body = self.send_request_with_retry(&request).await?;
139 F::try_from(response_body)
142 /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
143 /// authentication credentials.
145 /// The request body consists of the provided JSON `content`. Returns the response body in `F`
148 pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
149 where F: TryFrom<Vec<u8>, Error = std::io::Error> {
150 let content = content.to_string();
151 let request = format!(
152 "POST {} HTTP/1.1\r\n\
154 Authorization: {}\r\n\
155 Connection: keep-alive\r\n\
156 Content-Type: application/json\r\n\
157 Content-Length: {}\r\n\
159 {}", uri, host, auth, content.len(), content);
160 let response_body = self.send_request_with_retry(&request).await?;
161 F::try_from(response_body)
164 /// Sends an HTTP request message and reads the response, returning its body. Attempts to
165 /// reconnect and retry if the connection has been closed.
166 async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
167 match self.send_request(request).await {
168 Ok(bytes) => Ok(bytes),
170 // Reconnect and retry on fail. This can happen if the connection was closed after
171 // the keep-alive limits are reached, or generally if the request timed out due to
172 // Bitcoin Core being stuck on a long-running operation or its RPC queue being
174 // Block 100ms before retrying the request as in many cases the source of the error
175 // may be persistent for some time.
176 #[cfg(feature = "tokio")]
177 tokio::time::sleep(Duration::from_millis(100)).await;
178 #[cfg(not(feature = "tokio"))]
179 std::thread::sleep(Duration::from_millis(100));
180 *self = Self::connect(self.address)?;
181 self.send_request(request).await
186 /// Sends an HTTP request message and reads the response, returning its body.
187 async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
188 self.write_request(request).await?;
189 self.read_response().await
192 /// Writes an HTTP request message.
193 async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
194 #[cfg(feature = "tokio")]
196 self.stream.write_all(request.as_bytes()).await?;
197 self.stream.flush().await
199 #[cfg(not(feature = "tokio"))]
201 self.stream.write_all(request.as_bytes())?;
206 /// Reads an HTTP response message.
207 async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
208 #[cfg(feature = "tokio")]
209 let stream = self.stream.split().0;
210 #[cfg(not(feature = "tokio"))]
211 let stream = std::io::Read::by_ref(&mut self.stream);
213 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
215 #[cfg(feature = "tokio")]
216 let mut reader = tokio::io::BufReader::new(limited_stream);
217 #[cfg(not(feature = "tokio"))]
218 let mut reader = std::io::BufReader::new(limited_stream);
220 macro_rules! read_line {
221 () => { read_line!(0) };
222 ($retry_count: expr) => { {
223 let mut line = String::new();
224 let mut timeout_count: u64 = 0;
225 let bytes_read = loop {
226 #[cfg(feature = "tokio")]
227 let read_res = reader.read_line(&mut line).await;
228 #[cfg(not(feature = "tokio"))]
229 let read_res = reader.read_line(&mut line);
231 Ok(bytes_read) => break bytes_read,
232 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
234 if timeout_count > $retry_count {
240 Err(e) => return Err(e),
247 // Remove trailing CRLF
248 if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
255 // Read and parse status line
256 // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
257 let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
258 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
259 let status = HttpStatus::parse(&status_line)?;
261 // Read and parse relevant headers
262 let mut message_length = HttpMessageLength::Empty;
264 let line = read_line!()
265 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
266 if line.is_empty() { break; }
268 let header = HttpHeader::parse(&line)?;
269 if header.has_name("Content-Length") {
270 let length = header.value.parse()
271 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
272 if let HttpMessageLength::Empty = message_length {
273 message_length = HttpMessageLength::ContentLength(length);
278 if header.has_name("Transfer-Encoding") {
279 message_length = HttpMessageLength::TransferEncoding(header.value.into());
285 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
286 reader.get_mut().set_limit(read_limit as u64);
287 let contents = match message_length {
288 HttpMessageLength::Empty => { Vec::new() },
289 HttpMessageLength::ContentLength(length) => {
290 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
291 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, format!("invalid response length: {} bytes", length)));
293 let mut content = vec![0; length];
294 #[cfg(feature = "tokio")]
295 reader.read_exact(&mut content[..]).await?;
296 #[cfg(not(feature = "tokio"))]
297 reader.read_exact(&mut content[..])?;
301 HttpMessageLength::TransferEncoding(coding) => {
302 if !coding.eq_ignore_ascii_case("chunked") {
303 return Err(std::io::Error::new(
304 std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
306 let mut content = Vec::new();
307 #[cfg(feature = "tokio")]
309 // Since chunked_transfer doesn't have an async interface, only use it to
310 // determine the size of each chunk to read.
312 // TODO: Replace with an async interface when available.
313 // https://github.com/frewsxcv/rust-chunked-transfer/issues/7
315 // Read the chunk header which contains the chunk size.
316 let mut chunk_header = String::new();
317 reader.read_line(&mut chunk_header).await?;
318 if chunk_header == "0\r\n" {
319 // Read the terminator chunk since the decoder consumes the CRLF
320 // immediately when this chunk is encountered.
321 reader.read_line(&mut chunk_header).await?;
324 // Decode the chunk header to obtain the chunk size.
325 let mut buffer = Vec::new();
326 let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
327 decoder.read_to_end(&mut buffer)?;
329 // Read the chunk body.
330 let chunk_size = match decoder.remaining_chunks_size() {
332 Some(chunk_size) => chunk_size,
334 let chunk_offset = content.len();
335 content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
336 reader.read_exact(&mut content[chunk_offset..]).await?;
337 content.resize(chunk_offset + chunk_size, 0);
341 #[cfg(not(feature = "tokio"))]
343 let mut decoder = chunked_transfer::Decoder::new(reader);
344 decoder.read_to_end(&mut content)?;
352 // TODO: Handle 3xx redirection responses.
353 let error = HttpError {
354 status_code: status.code.to_string(),
357 return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
364 /// HTTP error consisting of a status code and body contents.
366 pub(crate) struct HttpError {
367 pub(crate) status_code: String,
368 pub(crate) contents: Vec<u8>,
371 impl std::error::Error for HttpError {}
373 impl fmt::Display for HttpError {
374 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
375 let contents = String::from_utf8_lossy(&self.contents);
376 write!(f, "status_code: {}, contents: {}", self.status_code, contents)
380 /// HTTP response status code as defined by [RFC 7231].
382 /// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
383 struct HttpStatus<'a> {
387 impl<'a> HttpStatus<'a> {
388 /// Parses an HTTP status line as defined by [RFC 7230].
390 /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
391 fn parse(line: &'a String) -> std::io::Result<HttpStatus<'a>> {
392 let mut tokens = line.splitn(3, ' ');
394 let http_version = tokens.next()
395 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no HTTP-Version"))?;
396 if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
397 !http_version.eq_ignore_ascii_case("HTTP/1.0") {
398 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid HTTP-Version"));
401 let code = tokens.next()
402 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Status-Code"))?;
403 if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
404 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "invalid Status-Code"));
407 let _reason = tokens.next()
408 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no Reason-Phrase"))?;
413 /// Returns whether the status is successful (i.e., 2xx status class).
414 fn is_ok(&self) -> bool {
415 self.code.starts_with('2')
419 /// HTTP response header as defined by [RFC 7231].
421 /// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
422 struct HttpHeader<'a> {
427 impl<'a> HttpHeader<'a> {
428 /// Parses an HTTP header field as defined by [RFC 7230].
430 /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
431 fn parse(line: &'a String) -> std::io::Result<HttpHeader<'a>> {
432 let mut tokens = line.splitn(2, ':');
433 let name = tokens.next()
434 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header name"))?;
435 let value = tokens.next()
436 .ok_or(std::io::Error::new(std::io::ErrorKind::InvalidData, "no header value"))?
438 Ok(Self { name, value })
441 /// Returns whether the header field has the given name.
442 fn has_name(&self, name: &str) -> bool {
443 self.name.eq_ignore_ascii_case(name)
447 /// HTTP message body length as defined by [RFC 7230].
449 /// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
450 enum HttpMessageLength {
452 ContentLength(usize),
453 TransferEncoding(String),
456 /// An HTTP response body in binary format.
457 pub struct BinaryResponse(pub Vec<u8>);
459 /// An HTTP response body in JSON format.
460 pub struct JsonResponse(pub serde_json::Value);
462 /// Interprets bytes from an HTTP response body as binary data.
463 impl TryFrom<Vec<u8>> for BinaryResponse {
464 type Error = std::io::Error;
466 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
467 Ok(BinaryResponse(bytes))
471 /// Interprets bytes from an HTTP response body as a JSON value.
472 impl TryFrom<Vec<u8>> for JsonResponse {
473 type Error = std::io::Error;
475 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
476 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
482 use super::HttpEndpoint;
485 fn with_default_port() {
486 let endpoint = HttpEndpoint::for_host("foo.com".into());
487 assert_eq!(endpoint.host(), "foo.com");
488 assert_eq!(endpoint.port(), 80);
492 fn with_custom_port() {
493 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
494 assert_eq!(endpoint.host(), "foo.com");
495 assert_eq!(endpoint.port(), 8080);
500 let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
501 assert_eq!(endpoint.host(), "foo.com");
502 assert_eq!(endpoint.path(), "/path");
506 fn without_uri_path() {
507 let endpoint = HttpEndpoint::for_host("foo.com".into());
508 assert_eq!(endpoint.host(), "foo.com");
509 assert_eq!(endpoint.path(), "/");
513 fn convert_to_socket_addrs() {
514 let endpoint = HttpEndpoint::for_host("localhost".into());
515 let host = endpoint.host();
516 let port = endpoint.port();
518 use std::net::ToSocketAddrs;
519 match (&endpoint).to_socket_addrs() {
520 Err(e) => panic!("Unexpected error: {:?}", e),
521 Ok(socket_addrs) => {
522 let mut std_addrs = (host, port).to_socket_addrs().unwrap();
523 for addr in socket_addrs {
524 assert_eq!(addr, std_addrs.next().unwrap());
526 assert!(std_addrs.next().is_none());
533 pub(crate) mod client_tests {
535 use std::io::BufRead;
538 /// Server for handling HTTP client requests with a stock response.
539 pub struct HttpServer {
540 address: std::net::SocketAddr,
541 handler: std::thread::JoinHandle<()>,
542 shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
545 /// Body of HTTP response messages.
546 pub enum MessageBody<T: ToString> {
553 fn responding_with_body<T: ToString>(status: &str, body: MessageBody<T>) -> Self {
554 let response = match body {
555 MessageBody::Empty => format!("{}\r\n\r\n", status),
556 MessageBody::Content(body) => {
557 let body = body.to_string();
560 Content-Length: {}\r\n\
562 {}", status, body.len(), body)
564 MessageBody::ChunkedContent(body) => {
565 let mut chuncked_body = Vec::new();
567 use chunked_transfer::Encoder;
568 let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
569 encoder.write_all(body.to_string().as_bytes()).unwrap();
573 Transfer-Encoding: chunked\r\n\
575 {}", status, String::from_utf8(chuncked_body).unwrap())
578 HttpServer::responding_with(response)
581 pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
582 HttpServer::responding_with_body("HTTP/1.1 200 OK", body)
585 pub fn responding_with_not_found() -> Self {
586 HttpServer::responding_with_body::<String>("HTTP/1.1 404 Not Found", MessageBody::Empty)
589 pub fn responding_with_server_error<T: ToString>(content: T) -> Self {
590 let body = MessageBody::Content(content);
591 HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body)
594 fn responding_with(response: String) -> Self {
595 let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
596 let address = listener.local_addr().unwrap();
598 let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
599 let shutdown_signaled = std::sync::Arc::clone(&shutdown);
600 let handler = std::thread::spawn(move || {
601 for stream in listener.incoming() {
602 let mut stream = stream.unwrap();
603 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();
605 let lines_read = std::io::BufReader::new(&stream)
607 .take_while(|line| !line.as_ref().unwrap().is_empty())
609 if lines_read == 0 { continue; }
611 for chunk in response.as_bytes().chunks(16) {
612 if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
615 if let Err(_) = stream.write(chunk) { break; }
616 if let Err(_) = stream.flush() { break; }
622 Self { address, handler, shutdown }
626 self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
627 self.handler.join().unwrap();
630 pub fn endpoint(&self) -> HttpEndpoint {
631 HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
636 fn connect_to_unresolvable_host() {
637 match HttpClient::connect(("example.invalid", 80)) {
639 assert!(e.to_string().contains("failed to lookup address information") ||
640 e.to_string().contains("No such host"), "{:?}", e);
642 Ok(_) => panic!("Expected error"),
647 fn connect_with_no_socket_address() {
648 match HttpClient::connect(&vec![][..]) {
649 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
650 Ok(_) => panic!("Expected error"),
655 fn connect_with_unknown_server() {
656 match HttpClient::connect(("::", 80)) {
657 #[cfg(target_os = "windows")]
658 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
659 #[cfg(not(target_os = "windows"))]
660 Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
661 Ok(_) => panic!("Expected error"),
666 async fn connect_with_valid_endpoint() {
667 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
669 match HttpClient::connect(&server.endpoint()) {
670 Err(e) => panic!("Unexpected error: {:?}", e),
676 async fn read_empty_message() {
677 let server = HttpServer::responding_with("".to_string());
679 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
680 match client.get::<BinaryResponse>("/foo", "foo.com").await {
682 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
683 assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
685 Ok(_) => panic!("Expected error"),
690 async fn read_incomplete_message() {
691 let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());
693 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
694 match client.get::<BinaryResponse>("/foo", "foo.com").await {
696 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
697 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
699 Ok(_) => panic!("Expected error"),
704 async fn read_too_large_message_headers() {
705 let response = format!(
706 "HTTP/1.1 302 Found\r\n\
708 \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
709 let server = HttpServer::responding_with(response);
711 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
712 match client.get::<BinaryResponse>("/foo", "foo.com").await {
714 assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
715 assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
717 Ok(_) => panic!("Expected error"),
722 async fn read_too_large_message_body() {
723 let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
724 let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));
726 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
727 match client.get::<BinaryResponse>("/foo", "foo.com").await {
729 assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
730 assert_eq!(e.get_ref().unwrap().to_string(), "invalid response length: 8032001 bytes");
732 Ok(_) => panic!("Expected error"),
738 async fn read_message_with_unsupported_transfer_coding() {
739 let response = String::from(
740 "HTTP/1.1 200 OK\r\n\
741 Transfer-Encoding: gzip\r\n\
744 let server = HttpServer::responding_with(response);
746 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
747 match client.get::<BinaryResponse>("/foo", "foo.com").await {
749 assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
750 assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
752 Ok(_) => panic!("Expected error"),
757 async fn read_error() {
758 let server = HttpServer::responding_with_server_error("foo");
760 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
761 match client.get::<JsonResponse>("/foo", "foo.com").await {
763 assert_eq!(e.kind(), std::io::ErrorKind::Other);
764 let http_error = e.into_inner().unwrap().downcast::<HttpError>().unwrap();
765 assert_eq!(http_error.status_code, "500");
766 assert_eq!(http_error.contents, "foo".as_bytes());
768 Ok(_) => panic!("Expected error"),
773 async fn read_empty_message_body() {
774 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
776 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
777 match client.get::<BinaryResponse>("/foo", "foo.com").await {
778 Err(e) => panic!("Unexpected error: {:?}", e),
779 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
784 async fn read_message_body_with_length() {
785 let body = "foo bar baz qux".repeat(32);
786 let content = MessageBody::Content(body.clone());
787 let server = HttpServer::responding_with_ok::<String>(content);
789 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
790 match client.get::<BinaryResponse>("/foo", "foo.com").await {
791 Err(e) => panic!("Unexpected error: {:?}", e),
792 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
797 async fn read_chunked_message_body() {
798 let body = "foo bar baz qux".repeat(32);
799 let chunked_content = MessageBody::ChunkedContent(body.clone());
800 let server = HttpServer::responding_with_ok::<String>(chunked_content);
802 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
803 match client.get::<BinaryResponse>("/foo", "foo.com").await {
804 Err(e) => panic!("Unexpected error: {:?}", e),
805 Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
810 async fn reconnect_closed_connection() {
811 let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);
813 let mut client = HttpClient::connect(&server.endpoint()).unwrap();
814 assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
815 match client.get::<BinaryResponse>("/foo", "foo.com").await {
816 Err(e) => panic!("Unexpected error: {:?}", e),
817 Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
822 fn from_bytes_into_binary_response() {
824 match BinaryResponse::try_from(bytes.to_vec()) {
825 Err(e) => panic!("Unexpected error: {:?}", e),
826 Ok(response) => assert_eq!(&response.0, bytes),
831 fn from_invalid_bytes_into_json_response() {
832 let json = serde_json::json!({ "result": 42 });
833 match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
835 Ok(_) => panic!("Expected error"),
840 fn from_valid_bytes_into_json_response() {
841 let json = serde_json::json!({ "result": 42 });
842 match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
843 Err(e) => panic!("Unexpected error: {:?}", e),
844 Ok(response) => assert_eq!(response.0, json),