Merge pull request #915 from TheBlueMatt/2021-05-bump-rpc-timeout
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 10 May 2021 18:04:28 +0000 (18:04 +0000)
committerGitHub <noreply@github.com>
Mon, 10 May 2021 18:04:28 +0000 (18:04 +0000)
Increase the timeout for RPC responses from Bitcoin Core

lightning-block-sync/Cargo.toml
lightning-block-sync/src/http.rs

index 6125874d963eadea1c5be3bc905a0540abe6f030..00208d78b9b09b7db39259b589ea68959c4db7ee 100644 (file)
@@ -16,7 +16,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ]
 [dependencies]
 bitcoin = "0.26"
 lightning = { version = "0.0.14", path = "../lightning" }
-tokio = { version = "1.0", features = [ "io-util", "net" ], optional = true }
+tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true }
 serde = { version = "1.0", features = ["derive"], optional = true }
 serde_json = { version = "1.0", optional = true }
 chunked_transfer = { version = "1.4", optional = true }
index f745e138fb915eddff035d814ff62de1ab05c9e8..2cfb8e50593d4aef4dc283f47655b3d6d8b5dc6d 100644 (file)
@@ -24,6 +24,12 @@ use std::net::TcpStream;
 /// Timeout for operations on TCP streams.
 const TCP_STREAM_TIMEOUT: Duration = Duration::from_secs(5);
 
+/// Timeout for reading the first byte of a response. This is separate from the general read
+/// timeout as it is not uncommon for Bitcoin Core to be blocked waiting on UTXO cache flushes for
+/// upwards of a minute or more. Note that we always retry once when we time out, so the maximum
+/// time we allow Bitcoin Core to block for is twice this value.
+const TCP_STREAM_RESPONSE_TIMEOUT: Duration = Duration::from_secs(120);
+
 /// Maximum HTTP message header size in bytes.
 const MAX_HTTP_MESSAGE_HEADER_SIZE: usize = 8192;
 
@@ -158,16 +164,19 @@ impl HttpClient {
                let endpoint = self.stream.peer_addr().unwrap();
                match self.send_request(request).await {
                        Ok(bytes) => Ok(bytes),
-                       Err(e) => match e.kind() {
-                               std::io::ErrorKind::ConnectionReset |
-                               std::io::ErrorKind::ConnectionAborted |
-                               std::io::ErrorKind::UnexpectedEof => {
-                                       // Reconnect if the connection was closed. This may happen if the server's
-                                       // keep-alive limits are reached.
-                                       *self = Self::connect(endpoint)?;
-                                       self.send_request(request).await
-                               },
-                               _ => Err(e),
+                       Err(_) => {
+                               // Reconnect and retry on fail. This can happen if the connection was closed after
+                               // the keep-alive limits are reached, or generally if the request timed out due to
+                               // Bitcoin Core being stuck on a long-running operation or its RPC queue being
+                               // full.
+                               // Block 100ms before retrying the request as in many cases the source of the error
+                               // may be persistent for some time.
+                               #[cfg(feature = "tokio")]
+                               tokio::time::sleep(Duration::from_millis(100)).await;
+                               #[cfg(not(feature = "tokio"))]
+                               std::thread::sleep(Duration::from_millis(100));
+                               *self = Self::connect(endpoint)?;
+                               self.send_request(request).await
                        },
                }
        }
@@ -206,25 +215,44 @@ impl HttpClient {
                #[cfg(not(feature = "tokio"))]
                let mut reader = std::io::BufReader::new(limited_stream);
 
-               macro_rules! read_line { () => { {
-                       let mut line = String::new();
-                       #[cfg(feature = "tokio")]
-                       let bytes_read = reader.read_line(&mut line).await?;
-                       #[cfg(not(feature = "tokio"))]
-                       let bytes_read = reader.read_line(&mut line)?;
-
-                       match bytes_read {
-                               0 => None,
-                               _ => {
-                                       // Remove trailing CRLF
-                                       if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
-                                       Some(line)
-                               },
-                       }
-               } } }
+               macro_rules! read_line {
+                       () => { read_line!(0) };
+                       ($retry_count: expr) => { {
+                               let mut line = String::new();
+                               let mut timeout_count: u64 = 0;
+                               let bytes_read = loop {
+                                       #[cfg(feature = "tokio")]
+                                       let read_res = reader.read_line(&mut line).await;
+                                       #[cfg(not(feature = "tokio"))]
+                                       let read_res = reader.read_line(&mut line);
+                                       match read_res {
+                                               Ok(bytes_read) => break bytes_read,
+                                               Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                                                       timeout_count += 1;
+                                                       if timeout_count > $retry_count {
+                                                               return Err(e);
+                                                       } else {
+                                                               continue;
+                                                       }
+                                               }
+                                               Err(e) => return Err(e),
+                                       }
+                               };
+
+                               match bytes_read {
+                                       0 => None,
+                                       _ => {
+                                               // Remove trailing CRLF
+                                               if line.ends_with('\n') { line.pop(); if line.ends_with('\r') { line.pop(); } }
+                                               Some(line)
+                                       },
+                               }
+                       } }
+               }
 
                // Read and parse status line
-               let status_line = read_line!()
+               // Note that we allow retrying a few times to reach TCP_STREAM_RESPONSE_TIMEOUT.
+               let status_line = read_line!(TCP_STREAM_RESPONSE_TIMEOUT.as_secs() / TCP_STREAM_TIMEOUT.as_secs())
                        .ok_or(std::io::Error::new(std::io::ErrorKind::UnexpectedEof, "no status line"))?;
                let status = HttpStatus::parse(&status_line)?;