+//! Simple HTTP implementation which supports both async and traditional execution environments
+//! with minimal dependencies. This is used as the basis for REST and RPC clients.
+
use chunked_transfer;
use serde_json;
/// Timeout for operations on TCP streams.
const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
+/// Timeout for reading the first byte of a response. This is separate from the general read
+/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
+/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
+/// time we allow Bitcoin Core to block for is twice this value.
+const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
+
/// Maximum HTTP message header size in bytes.
const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
let endpoint = self.stream.peer_addr().unwrap();
match self.send_request(request).await {
Ok(bytes) => Ok(bytes),
- Err(e) => match e.kind() {
- std::io::ErrorKind::ConnectionReset |
- std::io::ErrorKind::ConnectionAborted |
- std::io::ErrorKind::UnexpectedEof => {
- // Reconnect if the connection was closed. This may happen if the server's
- // keep-alive limits are reached.
- *self = Self::connect(endpoint)?;
- self.send_request(request).await
- },
- _ => Err(e),
+ Err(_) => {
+ // Reconnect and retry on fail. This can happen if the connection was closed after
+ // the keep-alive limits are reached, or generally if the request timed out due to
+ // Bitcoin Core being stuck on a long-running operation or its RPC queue being
+ // full.
+ // Block 100ms before retrying the request as in many cases the source of the error
+ // may be persistent for some time.
+ #[cfg(feature = "tokio")]
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ #[cfg(not(feature = "tokio"))]
+ std::thread::sleep(Duration::from_millis(100));
+ *self = Self::connect(endpoint)?;
+ self.send_request(request).await
},
}
}
#[cfg(not(feature = "tokio"))]
let mut reader = std::io::BufReader::new(limited_stream);
- macro_rules! read_line { () => { {
- let mut line = String::new();
- #[cfg(feature = "tokio")]
- let bytes_read = reader.read_line(&mut line).await?;
- #[cfg(not(feature = "tokio"))]
- let bytes_read = reader.read_line(&mut line)?;
-
- match bytes_read {
- 0 => None,
- _ => {
- // Remove trailing CRLF
- if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
- Some(line)
- },
- }
- } } }
+ macro_rules! read_line {
+ () => { read_line!(0) };
+ ($retry_count: expr) => { {
+ let mut line = String::new();
+ let mut timeout_count: u64 = 0;
+ let bytes_read = loop {
+ #[cfg(feature = "tokio")]
+ let read_res = reader.read_line(&mut line).await;
+ #[cfg(not(feature = "tokio"))]
+ let read_res = reader.read_line(&mut line);
+ match read_res {
+ Ok(bytes_read) => break bytes_read,
+ Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+ timeout_count += 1;
+ if timeout_count > $retry_count {
+ return Err(e);
+ } else {
+ continue;
+ }
+ }
+ Err(e) => return Err(e),
+ }
+ };
+
+ match bytes_read {
+ 0 => None,
+ _ => {
+ // Remove trailing CRLF
+ if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
+ Some(line)
+ },
+ }
+ } }
+ }
// Read and parse status line
- let status_line = read_line!()
+ // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
+ let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
.ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
let status = HttpStatus::parse(&status_line)?;
}
}
- if !status.is_ok() {
- // TODO: Handle 3xx redirection responses.
- return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found"));
- }
-
// Read message body
let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
reader.get_mut().set_limit(read_limit as u64);
- match message_length {
- HttpMessageLength::Empty => { Ok(Vec::new()) },
+ let contents = match message_length {
+ HttpMessageLength::Empty => { Vec::new() },
HttpMessageLength::ContentLength(length) => {
if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
- Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
+ return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
} else {
let mut content = vec![0; length];
#[cfg(feature = "tokio")]
reader.read_exact(&mut content[..]).await?;
#[cfg(not(feature = "tokio"))]
reader.read_exact(&mut content[..])?;
- Ok(content)
+ content
}
},
HttpMessageLength::TransferEncoding(coding) => {
if !coding.eq_ignore_ascii_case("chunked") {
- Err(std::io::Error::new(
- std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
+ return Err(std::io::Error::new(
+ std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
} else {
let mut content = Vec::new();
#[cfg(feature = "tokio")]
reader.read_exact(&mut content[chunk_offset..]).await?;
content.resize(chunk_offset + chunk_size, 0);
}
- Ok(content)
+ content
}
#[cfg(not(feature = "tokio"))]
{
let mut decoder = chunked_transfer::Decoder::new(reader);
decoder.read_to_end(&mut content)?;
- Ok(content)
+ content
}
}
},
+ };
+
+ if !status.is_ok() {
+ // TODO: Handle 3xx redirection responses.
+ let error_details = match String::from_utf8(contents) {
+ // Check that the string is all-ASCII with no control characters before returning
+ // it.
+ Ok(s) if s.as_bytes().iter().all(|c| c.is_ascii() && !c.is_ascii_control()) => s,
+ _ => "binary".to_string()
+ };
+ let error_msg = format!("Errored with status: {} and contents: {}",
+ status.code, error_details);
+ return Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg));
}
+
+ Ok(contents)
}
}
}
/// An HTTP response body in binary format.
-pub(crate) struct BinaryResponse(pub(crate) Vec<u8>);
+pub struct BinaryResponse(pub Vec<u8>);
/// An HTTP response body in JSON format.
-pub(crate) struct JsonResponse(pub(crate) serde_json::Value);
+pub struct JsonResponse(pub serde_json::Value);
/// Interprets bytes from an HTTP response body as binary data.
impl TryFrom<Vec<u8>> for BinaryResponse {
}
}
+ #[tokio::test]
+ async fn read_error() {
+ let response = String::from(
+ "HTTP/1.1 500 Internal Server Error\r\n\
+ Content-Length: 10\r\n\r\ntest error\r\n");
+ let server = HttpServer::responding_with(response);
+
+ let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+ match client.get::<JsonResponse>("/foo", "foo.com").await {
+ Err(e) => {
+ assert_eq!(e.get_ref().unwrap().to_string(), "Errored with status: 500 and contents: test error");
+ assert_eq!(e.kind(), std::io::ErrorKind::Other);
+ },
+ Ok(_) => panic!("Expected error"),
+ }
+ }
+
#[tokio::test]
async fn read_empty_message_body() {
let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);