Define an HttpError for returning error contents
[rust-lightning] / lightning-block-sync / src / http.rs
index a33d919d05179c7adccc8bf40fc7ed86b467bb21..89eb95dcff84a5165b5dcfdf27f54d71d69f57a4 100644 (file)
@@ -1,7 +1,11 @@
+//! Simple HTTP implementation which supports both async and traditional execution environments
+//! with minimal dependencies. This is used as the basis for REST and RPC clients.
+
 use chunked_transfer;
 use serde_json;
 
 use std::convert::TryFrom;
+use std::fmt;
 #[cfg(not(feature = "tokio"))]
 use std::io::Write;
 use std::net::ToSocketAddrs;
@@ -21,6 +25,12 @@ use std::net::TcpStream;
 /// Timeout for operations on TCP streams.
 const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
 
+/// Timeout for reading the first byte of a response. This is separate from the general read
+/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
+/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
+/// time we allow Bitcoin Core to block for is twice this value.
+const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
+
 /// Maximum HTTP message header size in bytes.
 const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
 
@@ -155,16 +165,19 @@ impl HttpClient {
                let endpoint = self.stream.peer_addr().unwrap();
                match self.send_request(request).await {
                        Ok(bytes) => Ok(bytes),
-                       Err(e) => match e.kind() {
-                               std::io::ErrorKind::ConnectionReset |
-                               std::io::ErrorKind::ConnectionAborted |
-                               std::io::ErrorKind::UnexpectedEof => {
-                                       // Reconnect if the connection was closed. This may happen if the server's
-                                       // keep-alive limits are reached.
-                                       *self = Self::connect(endpoint)?;
-                                       self.send_request(request).await
-                               },
-                               _ => Err(e),
+                       Err(_) => {
+                               // Reconnect and retry on fail. This can happen if the connection was closed after
+                               // the keep-alive limits are reached, or generally if the request timed out due to
+                               // Bitcoin Core being stuck on a long-running operation or its RPC queue being
+                               // full.
+                               // Block 100ms before retrying the request as in many cases the source of the error
+                               // may be persistent for some time.
+                               #[cfg(feature = "tokio")]
+                               tokio::time::sleep(Duration::from_millis(100)).await;
+                               #[cfg(not(feature = "tokio"))]
+                               std::thread::sleep(Duration::from_millis(100));
+                               *self = Self::connect(endpoint)?;
+                               self.send_request(request).await
                        },
                }
        }
@@ -203,25 +216,44 @@ impl HttpClient {
                #[cfg(not(feature = "tokio"))]
                let mut reader = std::io::BufReader::new(limited_stream);
 
-               macro_rules! read_line { () => { {
-                       let mut line = String::new();
-                       #[cfg(feature = "tokio")]
-                       let bytes_read = reader.read_line(&mut line).await?;
-                       #[cfg(not(feature = "tokio"))]
-                       let bytes_read = reader.read_line(&mut line)?;
-
-                       match bytes_read {
-                               0 => None,
-                               _ => {
-                                       // Remove trailing CRLF
-                                       if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
-                                       Some(line)
-                               },
-                       }
-               } } }
+               macro_rules! read_line {
+                       () => { read_line!(0) };
+                       ($retry_count: expr) => { {
+                               let mut line = String::new();
+                               let mut timeout_count: u64 = 0;
+                               let bytes_read = loop {
+                                       #[cfg(feature = "tokio")]
+                                       let read_res = reader.read_line(&mut line).await;
+                                       #[cfg(not(feature = "tokio"))]
+                                       let read_res = reader.read_line(&mut line);
+                                       match read_res {
+                                               Ok(bytes_read) => break bytes_read,
+                                               Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                                                       timeout_count += 1;
+                                                       if timeout_count > $retry_count {
+                                                               return Err(e);
+                                                       } else {
+                                                               continue;
+                                                       }
+                                               }
+                                               Err(e) => return Err(e),
+                                       }
+                               };
+
+                               match bytes_read {
+                                       0 => None,
+                                       _ => {
+                                               // Remove trailing CRLF
+                                               if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
+                                               Some(line)
+                                       },
+                               }
+                       } }
+               }
 
                // Read and parse status line
-               let status_line = read_line!()
+               // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
+               let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
                        .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
                let status = HttpStatus::parse(&status_line)?;
 
@@ -248,32 +280,27 @@ impl HttpClient {
                        }
                }
 
-               if !status.is_ok() {
-                       // TODO: Handle 3xx redirection responses.
-                       return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found"));
-               }
-
                // Read message body
                let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len();
                reader.get_mut().set_limit(read_limit as u64);
-               match message_length {
-                       HttpMessageLength::Empty => { Ok(Vec::new()) },
+               let contents = match message_length {
+                       HttpMessageLength::Empty => { Vec::new() },
                        HttpMessageLength::ContentLength(length) => {
                                if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE {
-                                       Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
+                                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range"))
                                } else {
                                        let mut content = vec![0; length];
                                        #[cfg(feature = "tokio")]
                                        reader.read_exact(&mut content[..]).await?;
                                        #[cfg(not(feature = "tokio"))]
                                        reader.read_exact(&mut content[..])?;
-                                       Ok(content)
+                                       content
                                }
                        },
                        HttpMessageLength::TransferEncoding(coding) => {
                                if !coding.eq_ignore_ascii_case("chunked") {
-                                       Err(std::io::Error::new(
-                                                       std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
+                                       return Err(std::io::Error::new(
+                                               std::io::ErrorKind::InvalidInput, "unsupported transfer coding"))
                                } else {
                                        let mut content = Vec::new();
                                        #[cfg(feature = "tokio")]
@@ -308,17 +335,44 @@ impl HttpClient {
                                                        reader.read_exact(&mut content[chunk_offset..]).await?;
                                                        content.resize(chunk_offset + chunk_size, 0);
                                                }
-                                               Ok(content)
+                                               content
                                        }
                                        #[cfg(not(feature = "tokio"))]
                                        {
                                                let mut decoder = chunked_transfer::Decoder::new(reader);
                                                decoder.read_to_end(&mut content)?;
-                                               Ok(content)
+                                               content
                                        }
                                }
                        },
+               };
+
+               if !status.is_ok() {
+                       // TODO: Handle 3xx redirection responses.
+                       let error = HttpError {
+                               status_code: status.code.to_string(),
+                               contents,
+                       };
+                       return Err(std::io::Error::new(std::io::ErrorKind::Other, error));
                }
+
+               Ok(contents)
+       }
+}
+
+/// HTTP error consisting of a status code and body contents.
+#[derive(Debug)]
+pub(crate) struct HttpError {
+       pub(crate) status_code: String,
+       pub(crate) contents: Vec<u8>,
+}
+
+impl std::error::Error for HttpError {}
+
+impl fmt::Display for HttpError {
+       fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+               let contents = String::from_utf8_lossy(&self.contents);
+               write!(f, "status_code: {}, contents: {}", self.status_code, contents)
        }
 }
 
@@ -497,16 +551,16 @@ pub(crate) mod client_tests {
        }
 
        impl HttpServer {
-               pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
+               fn responding_with_body<T: ToString>(status: &str, body: MessageBody<T>) -> Self {
                        let response = match body {
-                               MessageBody::Empty => "HTTP/1.1 200 OK\r\n\r\n".to_string(),
+                               MessageBody::Empty => format!("{}\r\n\r\n", status),
                                MessageBody::Content(body) => {
                                        let body = body.to_string();
                                        format!(
-                                               "HTTP/1.1 200 OK\r\n\
+                                               "{}\r\n\
                                                 Content-Length: {}\r\n\
                                                 \r\n\
-                                                {}", body.len(), body)
+                                                {}", status, body.len(), body)
                                },
                                MessageBody::ChunkedContent(body) => {
                                        let mut chuncked_body = Vec::new();
@@ -516,18 +570,26 @@ pub(crate) mod client_tests {
                                                encoder.write_all(body.to_string().as_bytes()).unwrap();
                                        }
                                        format!(
-                                               "HTTP/1.1 200 OK\r\n\
+                                               "{}\r\n\
                                                 Transfer-Encoding: chunked\r\n\
                                                 \r\n\
-                                                {}", String::from_utf8(chuncked_body).unwrap())
+                                                {}", status, String::from_utf8(chuncked_body).unwrap())
                                },
                        };
                        HttpServer::responding_with(response)
                }
 
+               pub fn responding_with_ok<T: ToString>(body: MessageBody<T>) -> Self {
+                       HttpServer::responding_with_body("HTTP/1.1 200 OK", body)
+               }
+
                pub fn responding_with_not_found() -> Self {
-                       let response = "HTTP/1.1 404 Not Found\r\n\r\n".to_string();
-                       HttpServer::responding_with(response)
+                       HttpServer::responding_with_body::<String>("HTTP/1.1 404 Not Found", MessageBody::Empty)
+               }
+
+               pub fn responding_with_server_error<T: ToString>(content: T) -> Self {
+                       let body = MessageBody::Content(content);
+                       HttpServer::responding_with_body("HTTP/1.1 500 Internal Server Error", body)
                }
 
                fn responding_with(response: String) -> Self {
@@ -689,6 +751,22 @@ pub(crate) mod client_tests {
                }
        }
 
+       #[tokio::test]
+       async fn read_error() {
+               let server = HttpServer::responding_with_server_error("foo");
+
+               let mut client = HttpClient::connect(&server.endpoint()).unwrap();
+               match client.get::<JsonResponse>("/foo", "foo.com").await {
+                       Err(e) => {
+                               assert_eq!(e.kind(), std::io::ErrorKind::Other);
+                               let http_error = e.into_inner().unwrap().downcast::<HttpError>().unwrap();
+                               assert_eq!(http_error.status_code, "500");
+                               assert_eq!(http_error.contents, "foo".as_bytes());
+                       },
+                       Ok(_) => panic!("Expected error"),
+               }
+       }
+
        #[tokio::test]
        async fn read_empty_message_body() {
                let server = HttpServer::responding_with_ok::<String>(MessageBody::Empty);