X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-block-sync%2Fsrc%2Fhttp.rs;h=2e70e18659936e2746638ba684b36ce20c3095ca;hb=02c57f59404d3f4ae33c7bca0a39c58444d1ad0d;hp=f745e138fb915eddff035d814ff62de1ab05c9e8;hpb=2c7c1a6a7a7796d6cbef4a2549fc0a7338fc8fb9;p=rust-lightning diff --git a/lightning-block-sync/src/http.rs b/lightning-block-sync/src/http.rs index f745e138..2e70e186 100644 --- a/lightning-block-sync/src/http.rs +++ b/lightning-block-sync/src/http.rs @@ -24,6 +24,12 @@ use std::net::TcpStream; /// Timeout for operations on TCP streams. const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5); +/// Timeout for reading the first byte of a response. This is separate from the general read +/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for +/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum +/// time we allow Bitcoin Core to block for is twice this value. +const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120); + /// Maximum HTTP message header size in bytes. const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192; @@ -158,16 +164,19 @@ impl HttpClient { let endpoint = self.stream.peer_addr().unwrap(); match self.send_request(request).await { Ok(bytes) => Ok(bytes), - Err(e) => match e.kind() { - std::io::ErrorKind::ConnectionReset | - std::io::ErrorKind::ConnectionAborted | - std::io::ErrorKind::UnexpectedEof => { - // Reconnect if the connection was closed. This may happen if the server's - // keep-alive limits are reached. - *self = Self::connect(endpoint)?; - self.send_request(request).await - }, - _ => Err(e), + Err(_) => { + // Reconnect and retry on fail. This can happen if the connection was closed after + // the keep-alive limits are reached, or generally if the request timed out due to + // Bitcoin Core being stuck on a long-running operation or its RPC queue being + // full. + // Block 100ms before retrying the request as in many cases the source of the error + // may be persistent for some time. + #[cfg(feature = "tokio")] + tokio::time::sleep(Duration::from_millis(100)).await; + #[cfg(not(feature = "tokio"))] + std::thread::sleep(Duration::from_millis(100)); + *self = Self::connect(endpoint)?; + self.send_request(request).await }, } } @@ -206,25 +215,44 @@ impl HttpClient { #[cfg(not(feature = "tokio"))] let mut reader = std::io::BufReader::new(limited_stream); - macro_rules! read_line { () => { { - let mut line = String::new(); - #[cfg(feature = "tokio")] - let bytes_read = reader.read_line(&mut line).await?; - #[cfg(not(feature = "tokio"))] - let bytes_read = reader.read_line(&mut line)?; - - match bytes_read { - 0 => None, - _ => { - // Remove trailing CRLF - if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } - Some(line) - }, - } - } } } + macro_rules! read_line { + () => { read_line!(0) }; + ($retry_count: expr) => { { + let mut line = String::new(); + let mut timeout_count: u64 = 0; + let bytes_read = loop { + #[cfg(feature = "tokio")] + let read_res = reader.read_line(&mut line).await; + #[cfg(not(feature = "tokio"))] + let read_res = reader.read_line(&mut line); + match read_res { + Ok(bytes_read) => break bytes_read, + Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + timeout_count += 1; + if timeout_count > $retry_count { + return Err(e); + } else { + continue; + } + } + Err(e) => return Err(e), + } + }; + + match bytes_read { + 0 => None, + _ => { + // Remove trailing CRLF + if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } } + Some(line) + }, + } + } } + } // Read and parse status line - let status_line = read_line!() + // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT. + let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs()) .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?; let status = HttpStatus::parse(&status_line)?; @@ -251,32 +279,27 @@ impl HttpClient { } } - if !status.is_ok() { - // TODO: Handle 3xx redirection responses. - return Err(std::io::Error::new(std::io::ErrorKind::NotFound, "not found")); - } - // Read message body let read_limit = MAX_HTTP_MESSAGE_BODY_SIZE - reader.buffer().len(); reader.get_mut().set_limit(read_limit as u64); - match message_length { - HttpMessageLength::Empty => { Ok(Vec::new()) }, + let contents = match message_length { + HttpMessageLength::Empty => { Vec::new() }, HttpMessageLength::ContentLength(length) => { if length == 0 || length > MAX_HTTP_MESSAGE_BODY_SIZE { - Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range")) + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "out of range")) } else { let mut content = vec![0; length]; #[cfg(feature = "tokio")] reader.read_exact(&mut content[..]).await?; #[cfg(not(feature = "tokio"))] reader.read_exact(&mut content[..])?; - Ok(content) + content } }, HttpMessageLength::TransferEncoding(coding) => { if !coding.eq_ignore_ascii_case("chunked") { - Err(std::io::Error::new( - std::io::ErrorKind::InvalidInput, "unsupported transfer coding")) + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidInput, "unsupported transfer coding")) } else { let mut content = Vec::new(); #[cfg(feature = "tokio")] @@ -311,17 +334,32 @@ impl HttpClient { reader.read_exact(&mut content[chunk_offset..]).await?; content.resize(chunk_offset + chunk_size, 0); } - Ok(content) + content } #[cfg(not(feature = "tokio"))] { let mut decoder = chunked_transfer::Decoder::new(reader); decoder.read_to_end(&mut content)?; - Ok(content) + content } } }, + }; + + if !status.is_ok() { + // TODO: Handle 3xx redirection responses. + let error_details = match String::from_utf8(contents) { + // Check that the string is all-ASCII with no control characters before returning + // it. + Ok(s) if s.as_bytes().iter().all(|c| c.is_ascii() && !c.is_ascii_control()) => s, + _ => "binary".to_string() + }; + let error_msg = format!("Errored with status: {} and contents: {}", + status.code, error_details); + return Err(std::io::Error::new(std::io::ErrorKind::Other, error_msg)); } + + Ok(contents) } } @@ -692,6 +730,23 @@ pub(crate) mod client_tests { } } + #[tokio::test] + async fn read_error() { + let response = String::from( + "HTTP/1.1 500 Internal Server Error\r\n\ + Content-Length: 10\r\n\r\ntest error\r\n"); + let server = HttpServer::responding_with(response); + + let mut client = HttpClient::connect(&server.endpoint()).unwrap(); + match client.get::("/foo", "foo.com").await { + Err(e) => { + assert_eq!(e.get_ref().unwrap().to_string(), "Errored with status: 500 and contents: test error"); + assert_eq!(e.kind(), std::io::ErrorKind::Other); + }, + Ok(_) => panic!("Expected error"), + } + } + #[tokio::test] async fn read_empty_message_body() { let server = HttpServer::responding_with_ok::(MessageBody::Empty);