Merge pull request #1706 from jkczyz/2022-09-filtered-blocks
[rust-lightning] / lightning-block-sync / src / rpc.rs
index 66570bcb57c032353ab0d6a2f500bcda91740381..6e78654a9714a90b6438d6655c132db6fbbf45ee 100644 (file)
@@ -1,13 +1,14 @@
 //! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC
 //! endpoint.
 
-use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
-use crate::http::{HttpClient, HttpEndpoint, JsonResponse};
+use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
+use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
 
-use bitcoin::blockdata::block::Block;
 use bitcoin::hash_types::BlockHash;
 use bitcoin::hashes::hex::ToHex;
 
+use futures::lock::Mutex;
+
 use serde_json;
 
 use std::convert::TryFrom;
@@ -18,7 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 pub struct RpcClient {
        basic_auth: String,
        endpoint: HttpEndpoint,
-       client: HttpClient,
+       client: Mutex<HttpClient>,
        id: AtomicUsize,
 }
 
@@ -27,7 +28,7 @@ impl RpcClient {
        /// credentials should be a base64 encoding of a user name and password joined by a colon, as is
        /// required for HTTP basic access authentication.
        pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
-               let client = HttpClient::connect(&endpoint)?;
+               let client = Mutex::new(HttpClient::connect(&endpoint)?);
                Ok(Self {
                        basic_auth: "Basic ".to_string() + credentials,
                        endpoint,
@@ -37,7 +38,7 @@ impl RpcClient {
        }
 
        /// Calls a method with the response encoded in JSON format and interpreted as type `T`.
-       pub async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
+       pub async fn call_method<T>(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
        where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
                let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
                let uri = self.endpoint.path();
@@ -47,8 +48,20 @@ impl RpcClient {
                        "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
                });
 
-               let mut response = self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content)
-                       .await?.0;
+               let mut response = match self.client.lock().await.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await {
+                       Ok(JsonResponse(response)) => response,
+                       Err(e) if e.kind() == std::io::ErrorKind::Other => {
+                               match e.get_ref().unwrap().downcast_ref::<HttpError>() {
+                                       Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) {
+                                               Ok(JsonResponse(response)) => response,
+                                               Err(_) => Err(e)?,
+                                       },
+                                       None => Err(e)?,
+                               }
+                       },
+                       Err(e) => Err(e)?,
+               };
+
                if !response.is_object() {
                        return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
                }
@@ -70,22 +83,22 @@ impl RpcClient {
 }
 
 impl BlockSource for RpcClient {
-       fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+       fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
                Box::pin(async move {
                        let header_hash = serde_json::json!(header_hash.to_hex());
                        Ok(self.call_method("getblockheader", &[header_hash]).await?)
                })
        }
 
-       fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+       fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
                Box::pin(async move {
                        let header_hash = serde_json::json!(header_hash.to_hex());
                        let verbosity = serde_json::json!(0);
-                       Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
+                       Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
                })
        }
 
-       fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+       fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
                Box::pin(async move {
                        Ok(self.call_method("getblockchaininfo", &[]).await?)
                })
@@ -115,7 +128,7 @@ mod tests {
        #[tokio::test]
        async fn call_method_returning_unknown_response() {
                let server = HttpServer::responding_with_not_found();
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
@@ -127,7 +140,7 @@ mod tests {
        async fn call_method_returning_malfomred_response() {
                let response = serde_json::json!("foo");
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -143,8 +156,8 @@ mod tests {
                let response = serde_json::json!({
                        "error": { "code": -8, "message": "invalid parameter" },
                });
-               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let server = HttpServer::responding_with_server_error(response);
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                let invalid_block_hash = serde_json::json!("foo");
                match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
@@ -160,7 +173,7 @@ mod tests {
        async fn call_method_returning_missing_result() {
                let response = serde_json::json!({ "result": null });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -175,7 +188,7 @@ mod tests {
        async fn call_method_returning_malformed_result() {
                let response = serde_json::json!({ "result": "foo" });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -190,7 +203,7 @@ mod tests {
        async fn call_method_returning_valid_result() {
                let response = serde_json::json!({ "result": 654470 });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => panic!("Unexpected error: {:?}", e),