1 //! Simple HTTP implementation which supports both async and traditional execution environments
2 //! with minimal dependencies. This is used as the basis for REST and RPC clients.
use std::convert::TryFrom;
#[cfg(not(feature = "tokio"))]
use std::io::Write;
use std::net::ToSocketAddrs;
use std::time::Duration;

#[cfg(feature = "tokio")]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
#[cfg(feature = "tokio")]
use tokio::net::TcpStream;

#[cfg(not(feature = "tokio"))]
use std::io::BufRead;
use std::io::Read;
#[cfg(not(feature = "tokio"))]
use std::net::TcpStream;
/// Timeout for operations on TCP streams.
///
/// Used as the connect timeout and as the per-operation read and write timeout of the
/// underlying socket.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);

/// Timeout for reading the first byte of a response. This is separate from the general read
/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
/// time we allow Bitcoin Core to block for is twice this value.
///
/// The number of timed-out reads tolerated while waiting for the status line is this value
/// divided by [`TCP_STREAM_TIMEOUT`], so it should stay a multiple of that constant.
const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);

/// Maximum HTTP message header size in bytes.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;

/// Maximum HTTP message body size in bytes. Enough for a hex-encoded block in JSON format and any
/// overhead for HTTP chunked transfer encoding.
const MAX_HTTP_MESSAGE_BODY_SIZE: usize = 2 * 4_000_000 + 32_000;
/// Endpoint for interacting with an HTTP-based API.
///
/// Built with [`HttpEndpoint::for_host`] and refined with the `with_*` builder methods.
#[derive(Debug)]
pub struct HttpEndpoint {
	/// Host name or IP address of the server.
	host: String,
	/// Explicit port, or `None` to use the default HTTP port.
	port: Option<u16>,
	/// Path component used for requests to this endpoint.
	path: String,
}

impl HttpEndpoint {
	/// Creates an endpoint for the given host and default HTTP port.
	///
	/// The path defaults to `/`.
	pub fn for_host(host: String) -> Self {
		Self {
			host,
			port: None,
			path: String::from("/"),
		}
	}

	/// Specifies a port to use with the endpoint.
	pub fn with_port(mut self, port: u16) -> Self {
		self.port = Some(port);
		self
	}

	/// Specifies a path to use with the endpoint.
	pub fn with_path(mut self, path: String) -> Self {
		self.path = path;
		self
	}

	/// Returns the endpoint host.
	pub fn host(&self) -> &str {
		&self.host
	}

	/// Returns the endpoint port, which is `80` unless one was set with
	/// [`HttpEndpoint::with_port`].
	pub fn port(&self) -> u16 {
		self.port.unwrap_or(80)
	}

	/// Returns the endpoint path.
	pub fn path(&self) -> &str {
		&self.path
	}
}

/// Resolves the endpoint's host and port to socket addresses, delegating to the standard
/// `(&str, u16)` implementation.
impl<'a> std::net::ToSocketAddrs for &'a HttpEndpoint {
	type Iter = <(&'a str, u16) as std::net::ToSocketAddrs>::Iter;

	fn to_socket_addrs(&self) -> std::io::Result<Self::Iter> {
		(self.host(), self.port()).to_socket_addrs()
	}
}
97 /// Client for making HTTP requests.
98 pub(crate) struct HttpClient {
103 /// Opens a connection to an HTTP endpoint.
104 pub fn connect<E: ToSocketAddrs>(endpoint: E) -> std::io::Result<Self> {
105 let address = match endpoint.to_socket_addrs()?.next() {
107 return Err(std::io::Error::new(std::io::ErrorKind::InvalidInput, "could not resolve to any addresses"));
109 Some(address) => address,
111 let stream = std::net::TcpStream::connect_timeout(&address, TCP_STREAM_TIMEOUT)?;
112 stream.set_read_timeout(Some(TCP_STREAM_TIMEOUT))?;
113 stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT))?;
115 #[cfg(feature = "tokio")]
117 stream.set_nonblocking(true)?;
118 TcpStream::from_std(stream)?
124 /// Sends a `GET` request for a resource identified by `uri` at the `host`.
126 /// Returns the response body in `F` format.
128 pub async fn get<F>(&mut self, uri: &str, host: &str) -> std::io::Result<F>
129 where F: TryFrom<Vec<u8>, Error = std::io::Error> {
130 let request = format!(
131 "GET {} HTTP/1.1\r\n\
133 Connection: keep-alive\r\n\
135 let response_body = self.send_request_with_retry(&request).await?;
136 F::try_from(response_body)
139 /// Sends a `POST` request for a resource identified by `uri` at the `host` using the given HTTP
140 /// authentication credentials.
142 /// The request body consists of the provided JSON `content`. Returns the response body in `F`
145 pub async fn post<F>(&mut self, uri: &str, host: &str, auth: &str, content: serde_json::Value) -> std::io::Result<F>
146 where F: TryFrom<Vec<u8>, Error = std::io::Error> {
147 let content = content.to_string();
148 let request = format!(
149 "POST {} HTTP/1.1\r\n\
151 Authorization: {}\r\n\
152 Connection: keep-alive\r\n\
153 Content-Type: application/json\r\n\
154 Content-Length: {}\r\n\
156 {}", uri, host, auth, content.len(), content);
157 let response_body = self.send_request_with_retry(&request).await?;
158 F::try_from(response_body)
161 /// Sends an HTTP request message and reads the response, returning its body. Attempts to
162 /// reconnect and retry if the connection has been closed.
163 async fn send_request_with_retry(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
164 let endpoint = self.stream.peer_addr().unwrap();
165 match self.send_request(request).await {
166 Ok(bytes) => Ok(bytes),
168 // Reconnect and retry on fail. This can happen if the connection was closed after
169 // the keep-alive limits are reached, or generally if the request timed out due to
170 // Bitcoin Core being stuck on a long-running operation or its RPC queue being
172 // Block 100ms before retrying the request as in many cases the source of the error
173 // may be persistent for some time.
174 #[cfg(feature = "tokio")]
175 tokio::time::sleep(Duration::from_millis(100)).await;
176 #[cfg(not(feature = "tokio"))]
177 std::thread::sleep(Duration::from_millis(100));
178 *self = Self::connect(endpoint)?;
179 self.send_request(request).await
184 /// Sends an HTTP request message and reads the response, returning its body.
185 async fn send_request(&mut self, request: &str) -> std::io::Result<Vec<u8>> {
186 self.write_request(request).await?;
187 self.read_response().await
190 /// Writes an HTTP request message.
191 async fn write_request(&mut self, request: &str) -> std::io::Result<()> {
192 #[cfg(feature = "tokio")]
194 self.stream.write_all(request.as_bytes()).await?;
195 self.stream.flush().await
197 #[cfg(not(feature = "tokio"))]
199 self.stream.write_all(request.as_bytes())?;
204 /// Reads an HTTP response message.
205 async fn read_response(&mut self) -> std::io::Result<Vec<u8>> {
206 #[cfg(feature = "tokio")]
207 let stream = self.stream.split().0;
208 #[cfg(not(feature = "tokio"))]
209 let stream = std::io::Read::by_ref(&mut self.stream);
211 let limited_stream = stream.take(MAX_HTTP_MESSAGE_HEADER_SIZE as u64);
213 #[cfg(feature = "tokio")]
214 let mut reader = tokio::io::BufReader::new(limited_stream);
215 #[cfg(not(feature = "tokio"))]
216 let mut reader = std::io::BufReader::new(limited_stream);
218 macro_rules! read_line {
219 () => { read_line!(0) };
220 ($retry_count: expr) => { {
221 let mut line = String::new();
222 let mut timeout_count: u64 = 0;
223 let bytes_read = loop {
224 #[cfg(feature = "tokio")]
225 let read_res = reader.read_line(&mut line).await;
226 #[cfg(not(feature = "tokio"))]
227 let read_res = reader.read_line(&mut line);
229 Ok(bytes_read) => break bytes_read,
230 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
232 if timeout_count > $retry_count {
238 Err(e) => return Err(e),
245 // Remove trailing CRLF
246 if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
253 // Read and parse status line
254 // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
255 let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
256 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
257 let status = HttpStatus::parse(&status_line)?;
259 // Read and parse relevant headers
260 let mut message_length = HttpMessageLength::Empty;
262 let line = read_line!()
263 .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no headers"))?;
264 if line.is_empty() { break; }
266 let header = HttpHeader::parse(&line)?;
267 if header.has_name("Content-Length") {
268 let length = header.value.parse()
269 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
270 if let HttpMessageLength::Empty = message_length {
271 message_length = HttpMessageLength::ContentLength(length);
276 if header.has_name("Transfer-Encoding") {
277 message_length = HttpMessageLength::TransferEncoding(header.value.into());
283 let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
284 reader.get_mut().set_limit(read_limit as u64);
285 let contents = match message_length {
286 HttpMessageLength::Empty => { Vec::new() },
287 HttpMessageLength::ContentLength(length) => {
288 if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
289 return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
291 let mut content = vec![0; length];
292 #[cfg(feature = "tokio")]
293 reader.read_exact(&mut content[..]).await?;
294 #[cfg(not(feature = "tokio"))]
295 reader.read_exact(&mut content[..])?;
299 HttpMessageLength::TransferEncoding(coding) => {
300 if !coding.eq_ignore_ascii_case("chunked") {
301 return Err(std::io::Error::new(
302 std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
304 let mut content = Vec::new();
305 #[cfg(feature = "tokio")]
307 // Since chunked_transfer doesn't have an async interface, only use it to
308 // determine the size of each chunk to read.
310 // TODO: Replace with an async interface when available.
311 // https://github.com/frewsxcv/rust-chunked-transfer/issues/7
313 // Read the chunk header which contains the chunk size.
314 let mut chunk_header = String::new();
315 reader.read_line(&mut chunk_header).await?;
316 if chunk_header == "0\r\n" {
317 // Read the terminator chunk since the decoder consumes the CRLF
318 // immediately when this chunk is encountered.
319 reader.read_line(&mut chunk_header).await?;
322 // Decode the chunk header to obtain the chunk size.
323 let mut buffer = Vec::new();
324 let mut decoder = chunked_transfer::Decoder::new(chunk_header.as_bytes());
325 decoder.read_to_end(&mut buffer)?;
327 // Read the chunk body.
328 let chunk_size = match decoder.remaining_chunks_size() {
330 Some(chunk_size) => chunk_size,
332 let chunk_offset = content.len();
333 content.resize(chunk_offset + chunk_size + "\r\n".len(), 0);
334 reader.read_exact(&mut content[chunk_offset..]).await?;
335 content.resize(chunk_offset + chunk_size, 0);
339 #[cfg(not(feature = "tokio"))]
341 let mut decoder = chunked_transfer::Decoder::new(reader);
342 decoder.read_to_end(&mut content)?;
350 // TODO: Handle 3xx redirection responses.
351 let error_details = match String::from_utf8(contents) {
352 // Check that the string is all-ASCII with no control characters before returning
354 Ok(s) if s.as_bytes().iter().all(|c| c.is_ascii() && !c.is_ascii_control()) => s,
355 _ => "binary".to_string()
357 let error_msg = format!("Errored with status: {} and contents: {}",
358 status.code, error_details);
359 return Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg));
/// HTTP response status code as defined by [RFC 7231].
///
/// Borrows the code from the status line it was parsed from.
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-6
struct HttpStatus<'a> {
	/// The three-digit status code, e.g., `"200"`.
	code: &'a str,
}

impl<'a> HttpStatus<'a> {
	/// Parses an HTTP status line as defined by [RFC 7230].
	///
	/// `line` must not include the trailing CRLF. It is expected to consist of an HTTP version
	/// (`HTTP/1.0` or `HTTP/1.1`, compared case-insensitively), a three-digit status code and a
	/// possibly empty reason phrase, each separated by a single space.
	///
	/// # Errors
	///
	/// Returns `InvalidData` if any of the three parts is missing or malformed.
	///
	/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.1.2
	fn parse(line: &'a str) -> std::io::Result<HttpStatus<'a>> {
		let invalid_data = |reason: &'static str| {
			std::io::Error::new(std::io::ErrorKind::InvalidData, reason)
		};
		let mut tokens = line.splitn(3, ' ');

		let http_version = tokens.next().ok_or_else(|| invalid_data("no HTTP-Version"))?;
		if !http_version.eq_ignore_ascii_case("HTTP/1.1") &&
			!http_version.eq_ignore_ascii_case("HTTP/1.0") {
			return Err(invalid_data("invalid HTTP-Version"));
		}

		let code = tokens.next().ok_or_else(|| invalid_data("no Status-Code"))?;
		if code.len() != 3 || !code.chars().all(|c| c.is_ascii_digit()) {
			return Err(invalid_data("invalid Status-Code"));
		}

		// The reason phrase is required to be present but is otherwise ignored.
		let _reason = tokens.next().ok_or_else(|| invalid_data("no Reason-Phrase"))?;

		Ok(Self { code })
	}

	/// Returns whether the status is successful (i.e., 2xx status class).
	fn is_ok(&self) -> bool {
		self.code.starts_with('2')
	}
}
/// HTTP response header as defined by [RFC 7231].
///
/// Borrows its name and value from the header line it was parsed from.
///
/// [RFC 7231]: https://tools.ietf.org/html/rfc7231#section-7
struct HttpHeader<'a> {
	/// Field name, i.e., everything before the first colon.
	name: &'a str,
	/// Field value with surrounding whitespace removed.
	value: &'a str,
}

impl<'a> HttpHeader<'a> {
	/// Parses an HTTP header field as defined by [RFC 7230].
	///
	/// `line` must not include the trailing CRLF. The line is split at the first colon and
	/// optional whitespace on either side of the value is removed.
	///
	/// # Errors
	///
	/// Returns `InvalidData` if the line contains no colon.
	///
	/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.2
	fn parse(line: &'a str) -> std::io::Result<HttpHeader<'a>> {
		let invalid_data = |reason: &'static str| {
			std::io::Error::new(std::io::ErrorKind::InvalidData, reason)
		};
		let mut tokens = line.splitn(2, ':');
		let name = tokens.next().ok_or_else(|| invalid_data("no header name"))?;
		// Optional whitespace may both precede and follow the field value.
		let value = tokens.next().ok_or_else(|| invalid_data("no header value"))?.trim();
		Ok(Self { name, value })
	}

	/// Returns whether the header field has the given name.
	///
	/// Names are compared ASCII case-insensitively.
	fn has_name(&self, name: &str) -> bool {
		self.name.eq_ignore_ascii_case(name)
	}
}
/// HTTP message body length as defined by [RFC 7230].
///
/// [RFC 7230]: https://tools.ietf.org/html/rfc7230#section-3.3.3
enum HttpMessageLength {
	/// No length was indicated; the message has no body.
	Empty,
	/// The body length in bytes as given by a `Content-Length` header.
	ContentLength(usize),
	/// The transfer coding given by a `Transfer-Encoding` header, which delimits the body.
	TransferEncoding(String),
}
442 /// An HTTP response body in binary format.
443 pub struct BinaryResponse(pub Vec<u8>);
445 /// An HTTP response body in JSON format.
446 pub struct JsonResponse(pub serde_json::Value);
448 /// Interprets bytes from an HTTP response body as binary data.
449 impl TryFrom<Vec<u8>> for BinaryResponse {
450 type Error = std::io::Error;
452 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
453 Ok(BinaryResponse(bytes))
457 /// Interprets bytes from an HTTP response body as a JSON value.
458 impl TryFrom<Vec<u8>> for JsonResponse {
459 type Error = std::io::Error;
461 fn try_from(bytes: Vec<u8>) -> std::io::Result<Self> {
462 Ok(JsonResponse(serde_json::from_slice(&bytes)?))
/// Tests for building and resolving an `HttpEndpoint`.
#[cfg(test)]
mod endpoint_tests {
	use super::HttpEndpoint;

	#[test]
	fn with_default_port() {
		let endpoint = HttpEndpoint::for_host("foo.com".into());
		assert_eq!(endpoint.host(), "foo.com");
		assert_eq!(endpoint.port(), 80);
	}

	#[test]
	fn with_custom_port() {
		let endpoint = HttpEndpoint::for_host("foo.com".into()).with_port(8080);
		assert_eq!(endpoint.host(), "foo.com");
		assert_eq!(endpoint.port(), 8080);
	}

	#[test]
	fn with_uri_path() {
		let endpoint = HttpEndpoint::for_host("foo.com".into()).with_path("/path".into());
		assert_eq!(endpoint.host(), "foo.com");
		assert_eq!(endpoint.path(), "/path");
	}

	#[test]
	fn without_uri_path() {
		let endpoint = HttpEndpoint::for_host("foo.com".into());
		assert_eq!(endpoint.host(), "foo.com");
		assert_eq!(endpoint.path(), "/");
	}

	// Checks that resolving the endpoint agrees with resolving its host and port directly.
	// NOTE(review): this resolves a real host name, so it presumably needs working DNS and
	// assumes the name resolves to exactly one address -- confirm for the CI environment.
	#[test]
	fn convert_to_socket_addrs() {
		let endpoint = HttpEndpoint::for_host("foo.com".into());
		let host = endpoint.host();
		let port = endpoint.port();

		use std::net::ToSocketAddrs;
		match (&endpoint).to_socket_addrs() {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(mut socket_addrs) => {
				match socket_addrs.next() {
					None => panic!("Expected socket address"),
					Some(addr) => {
						assert_eq!(addr, (host, port).to_socket_addrs().unwrap().next().unwrap());
						assert!(socket_addrs.next().is_none());
					}
				}
			}
		}
	}
}
/// Tests for `HttpClient` against a local stock-response server, which is also exposed to other
/// test modules in the crate.
#[cfg(test)]
pub(crate) mod client_tests {
	use super::*;
	use std::io::BufRead;
	use std::io::Write;

	/// Server for handling HTTP client requests with a stock response.
	pub struct HttpServer {
		address: std::net::SocketAddr,
		handler: std::thread::JoinHandle<()>,
		shutdown: std::sync::Arc<std::sync::atomic::AtomicBool>,
	}

	/// Body of HTTP response messages.
	pub enum MessageBody<T: ToString> {
		/// No body and no length header.
		Empty,
		/// Body sent with a `Content-Length` header.
		Content(T),
		/// Body sent using the chunked transfer coding.
		ChunkedContent(T),
	}

	impl HttpServer {
		/// Starts a server answering every request with `200 OK` and the given body.
		pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
			let response = match body {
				MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
				MessageBody::Content(body) => {
					let body = body.to_string();
					format!(
						"HTTP/1.1 200 OK\r\n\
						 Content-Length: {}\r\n\
						 \r\n\
						 {}", body.len(), body)
				},
				MessageBody::ChunkedContent(body) => {
					let mut chuncked_body = Vec::new();
					{
						// Scope the encoder so that it writes the terminating chunk and releases
						// its borrow of the buffer before the buffer is read.
						use chunked_transfer::Encoder;
						let mut encoder = Encoder::with_chunks_size(&mut chuncked_body, 8);
						encoder.write_all(body.to_string().as_bytes()).unwrap();
					}
					format!(
						"HTTP/1.1 200 OK\r\n\
						 Transfer-Encoding: chunked\r\n\
						 \r\n\
						 {}", String::from_utf8(chuncked_body).unwrap())
				},
			};
			HttpServer::responding_with(response)
		}

		/// Starts a server answering every request with `404 Not Found`.
		pub fn responding_with_not_found() -> Self {
			let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
			HttpServer::responding_with(response)
		}

		/// Starts a server on an OS-assigned local port answering every request with `response`.
		///
		/// Each connection is served a single response and then dropped.
		fn responding_with(response: String) -> Self {
			let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap();
			let address = listener.local_addr().unwrap();

			let shutdown = std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false));
			let shutdown_signaled = std::sync::Arc::clone(&shutdown);
			let handler = std::thread::spawn(move || {
				for stream in listener.incoming() {
					let mut stream = stream.unwrap();
					stream.set_write_timeout(Some(TCP_STREAM_TIMEOUT)).unwrap();

					// Consume the request up to the blank line ending its headers.
					let lines_read = std::io::BufReader::new(&stream)
						.lines()
						.take_while(|line| !line.as_ref().unwrap().is_empty())
						.count();
					if lines_read == 0 { continue; }

					// Respond in small pieces so that a shutdown can interrupt a large response.
					for chunk in response.as_bytes().chunks(16) {
						if shutdown_signaled.load(std::sync::atomic::Ordering::SeqCst) {
							return;
						} else {
							// Use write_all since a plain write may only write part of the chunk.
							if stream.write_all(chunk).is_err() { break; }
							if stream.flush().is_err() { break; }
						}
					}
				}
			});

			Self { address, handler, shutdown }
		}

		/// Signals the server thread to stop and waits for it to exit.
		fn shutdown(self) {
			self.shutdown.store(true, std::sync::atomic::Ordering::SeqCst);
			self.handler.join().unwrap();
		}

		/// Returns the endpoint at which the server is listening.
		pub fn endpoint(&self) -> HttpEndpoint {
			HttpEndpoint::for_host(self.address.ip().to_string()).with_port(self.address.port())
		}
	}

	#[test]
	fn connect_to_unresolvable_host() {
		match HttpClient::connect(("example.invalid", 80)) {
			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
			Ok(_) => panic!("Expected error"),
		}
	}

	#[test]
	fn connect_with_no_socket_address() {
		match HttpClient::connect(&vec![][..]) {
			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput),
			Ok(_) => panic!("Expected error"),
		}
	}

	#[test]
	fn connect_with_unknown_server() {
		match HttpClient::connect(("::", 80)) {
			#[cfg(target_os = "windows")]
			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::AddrNotAvailable),
			#[cfg(not(target_os = "windows"))]
			Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::ConnectionRefused),
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn connect_with_valid_endpoint() {
		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);

		match HttpClient::connect(&server.endpoint()) {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(_) => {},
		}
	}

	#[tokio::test]
	async fn read_empty_message() {
		let server = HttpServer::responding_with("".to_string());

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
				assert_eq!(e.get_ref().unwrap().to_string(), "no status line");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn read_incomplete_message() {
		let server = HttpServer::responding_with("HTTP/1.1 200 OK".to_string());

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
				assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn read_too_large_message_headers() {
		let response = format!(
			"HTTP/1.1 302 Found\r\n\
			 Location: {}\r\n\
			 \r\n", "Z".repeat(MAX_HTTP_MESSAGE_HEADER_SIZE));
		let server = HttpServer::responding_with(response);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);
				assert_eq!(e.get_ref().unwrap().to_string(), "no headers");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn read_too_large_message_body() {
		let body = "Z".repeat(MAX_HTTP_MESSAGE_BODY_SIZE + 1);
		let server = HttpServer::responding_with_ok::<String>(MessageBody::Content(body));

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::InvalidData);
				assert_eq!(e.get_ref().unwrap().to_string(), "out of range");
			},
			Ok(_) => panic!("Expected error"),
		}
		// The client gives up without reading the body, so stop the server mid-response.
		server.shutdown();
	}

	#[tokio::test]
	async fn read_message_with_unsupported_transfer_coding() {
		let response = String::from(
			"HTTP/1.1 200 OK\r\n\
			 Transfer-Encoding: gzip\r\n\
			 \r\n\
			 foobar");
		let server = HttpServer::responding_with(response);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
				assert_eq!(e.get_ref().unwrap().to_string(), "unsupported transfer coding");
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn read_error() {
		let response = String::from(
			"HTTP/1.1 500 Internal Server Error\r\n\
			 Content-Length: 10\r\n\r\ntest error\r\n");
		let server = HttpServer::responding_with(response);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<JsonResponse>("/foo", "foo.com").await {
			Err(e) => {
				assert_eq!(e.get_ref().unwrap().to_string(), "Errored with status: 500 and contents: test error");
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
			},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[tokio::test]
	async fn read_empty_message_body() {
		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
		}
	}

	#[tokio::test]
	async fn read_message_body_with_length() {
		let body = "foo bar baz qux".repeat(32);
		let content = MessageBody::Content(body.clone());
		let server = HttpServer::responding_with_ok::<String>(content);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
		}
	}

	#[tokio::test]
	async fn read_chunked_message_body() {
		let body = "foo bar baz qux".repeat(32);
		let chunked_content = MessageBody::ChunkedContent(body.clone());
		let server = HttpServer::responding_with_ok::<String>(chunked_content);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(bytes) => assert_eq!(bytes.0, body.as_bytes()),
		}
	}

	// The server drops each connection after one response, so the second request only succeeds
	// if the client reconnects.
	#[tokio::test]
	async fn reconnect_closed_connection() {
		let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);

		let mut client = HttpClient::connect(&server.endpoint()).unwrap();
		assert!(client.get::<BinaryResponse>("/foo", "foo.com").await.is_ok());
		match client.get::<BinaryResponse>("/foo", "foo.com").await {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(bytes) => assert_eq!(bytes.0, Vec::<u8>::new()),
		}
	}

	#[test]
	fn from_bytes_into_binary_response() {
		let bytes = b"foo";
		match BinaryResponse::try_from(bytes.to_vec()) {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(response) => assert_eq!(&response.0, bytes),
		}
	}

	#[test]
	fn from_invalid_bytes_into_json_response() {
		let json = serde_json::json!({ "result": 42 });
		// Truncate the serialized value so that it is no longer valid JSON.
		match JsonResponse::try_from(json.to_string().as_bytes()[..5].to_vec()) {
			Err(_) => {},
			Ok(_) => panic!("Expected error"),
		}
	}

	#[test]
	fn from_valid_bytes_into_json_response() {
		let json = serde_json::json!({ "result": 42 });
		match JsonResponse::try_from(json.to_string().as_bytes().to_vec()) {
			Err(e) => panic!("Unexpected error: {:?}", e),
			Ok(response) => assert_eq!(response.0, json),
		}
	}
}