Fix unused warning for un-accessed enum variant field in net-tokio
[rust-lightning] / lightning-block-sync / src / rpc.rs
index 34cbd2e02c028eb51a364f431f509cabdee4c4c0..d296088ae7eeaf9a1cc07e9ccfdfb7c8a6a2fd42 100644 (file)
@@ -1,21 +1,48 @@
-use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult};
-use crate::http::{HttpClient, HttpEndpoint, JsonResponse};
+//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC
+//! endpoint.
+
+use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult};
+use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse};
+use crate::gossip::UtxoSource;
 
-use bitcoin::blockdata::block::Block;
 use bitcoin::hash_types::BlockHash;
-use bitcoin::hashes::hex::ToHex;
+use bitcoin::OutPoint;
+
+use std::sync::Mutex;
 
 use serde_json;
 
 use std::convert::TryFrom;
 use std::convert::TryInto;
+use std::error::Error;
+use std::fmt;
 use std::sync::atomic::{AtomicUsize, Ordering};
 
+/// An error returned by the RPC server.
+#[derive(Debug)]
+pub struct RpcError {
+       /// The error code.
+       pub code: i64,
+       /// The error message.
+       pub message: String,
+}
+
+impl fmt::Display for RpcError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "RPC error {}: {}", self.code, self.message)
+    }
+}
+
+impl Error for RpcError {}
+
 /// A simple RPC client for calling methods using HTTP `POST`.
+///
+/// Implements [`BlockSource`] and may return an `Err` containing [`RpcError`]. See
+/// [`RpcClient::call_method`] for details.
 pub struct RpcClient {
        basic_auth: String,
        endpoint: HttpEndpoint,
-       client: HttpClient,
+       client: Mutex<Option<HttpClient>>,
        id: AtomicUsize,
 }
 
@@ -24,17 +51,19 @@ impl RpcClient {
        /// credentials should be a base64 encoding of a user name and password joined by a colon, as is
        /// required for HTTP basic access authentication.
        pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result<Self> {
-               let client = HttpClient::connect(&endpoint)?;
                Ok(Self {
                        basic_auth: "Basic ".to_string() + credentials,
                        endpoint,
-                       client,
+                       client: Mutex::new(None),
                        id: AtomicUsize::new(0),
                })
        }
 
        /// Calls a method with the response encoded in JSON format and interpreted as type `T`.
-       async fn call_method<T>(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
+       ///
+       /// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if
+       /// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`].
+       pub async fn call_method<T>(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result<T>
        where JsonResponse: TryFrom<Vec<u8>, Error = std::io::Error> + TryInto<T, Error = std::io::Error> {
                let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port());
                let uri = self.endpoint.path();
@@ -44,8 +73,25 @@ impl RpcClient {
                        "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string()
                });
 
-               let mut response = self.client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content)
-                       .await?.0;
+               let mut client = if let Some(client) = self.client.lock().unwrap().take() { client }
+                       else { HttpClient::connect(&self.endpoint)? };
+               let http_response = client.post::<JsonResponse>(&uri, &host, &self.basic_auth, content).await;
+               *self.client.lock().unwrap() = Some(client);
+
+               let mut response = match http_response {
+                       Ok(JsonResponse(response)) => response,
+                       Err(e) if e.kind() == std::io::ErrorKind::Other => {
+                               match e.get_ref().unwrap().downcast_ref::<HttpError>() {
+                                       Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) {
+                                               Ok(JsonResponse(response)) => response,
+                                               Err(_) => Err(e)?,
+                                       },
+                                       None => Err(e)?,
+                               }
+                       },
+                       Err(e) => Err(e)?,
+               };
+
                if !response.is_object() {
                        return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object"));
                }
@@ -53,47 +99,73 @@ impl RpcClient {
                let error = &response["error"];
                if !error.is_null() {
                        // TODO: Examine error code for a more precise std::io::ErrorKind.
-                       let message = error["message"].as_str().unwrap_or("unknown error");
-                       return Err(std::io::Error::new(std::io::ErrorKind::Other, message));
+                       let rpc_error = RpcError { 
+                               code: error["code"].as_i64().unwrap_or(-1), 
+                               message: error["message"].as_str().unwrap_or("unknown error").to_string() 
+                       };
+                       return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error));
                }
 
-               let result = &mut response["result"];
-               if result.is_null() {
-                       return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result"));
-               }
+               let result = match response.get_mut("result") {
+                       Some(result) => result.take(),
+                       None =>
+                               return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")),
+               };
 
-               JsonResponse(result.take()).try_into()
+               JsonResponse(result).try_into()
        }
 }
 
 impl BlockSource for RpcClient {
-       fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
+       fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option<u32>) -> AsyncBlockSourceResult<'a, BlockHeaderData> {
                Box::pin(async move {
-                       let header_hash = serde_json::json!(header_hash.to_hex());
+                       let header_hash = serde_json::json!(header_hash.to_string());
                        Ok(self.call_method("getblockheader", &[header_hash]).await?)
                })
        }
 
-       fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> {
+       fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> {
                Box::pin(async move {
-                       let header_hash = serde_json::json!(header_hash.to_hex());
+                       let header_hash = serde_json::json!(header_hash.to_string());
                        let verbosity = serde_json::json!(0);
-                       Ok(self.call_method("getblock", &[header_hash, verbosity]).await?)
+                       Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?))
                })
        }
 
-       fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
+       fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option<u32>)> {
                Box::pin(async move {
                        Ok(self.call_method("getblockchaininfo", &[]).await?)
                })
        }
 }
 
+impl UtxoSource for RpcClient {
+       fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> {
+               Box::pin(async move {
+                       let height_param = serde_json::json!(block_height);
+                       Ok(self.call_method("getblockhash", &[height_param]).await?)
+               })
+       }
+
+       fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> {
+               Box::pin(async move {
+                       let txid_param = serde_json::json!(outpoint.txid.to_string());
+                       let vout_param = serde_json::json!(outpoint.vout);
+                       let include_mempool = serde_json::json!(false);
+                       let utxo_opt: serde_json::Value = self.call_method(
+                               "gettxout", &[txid_param, vout_param, include_mempool]).await?;
+                       Ok(!utxo_opt.is_null())
+               })
+       }
+}
+
 #[cfg(test)]
 mod tests {
        use super::*;
        use crate::http::client_tests::{HttpServer, MessageBody};
 
+       use bitcoin::hashes::Hash;
+
        /// Credentials encoded in base64.
        const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA==";
 
@@ -112,10 +184,10 @@ mod tests {
        #[tokio::test]
        async fn call_method_returning_unknown_response() {
                let server = HttpServer::responding_with_not_found();
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
-                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound),
+                       Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other),
                        Ok(_) => panic!("Expected error"),
                }
        }
@@ -124,7 +196,7 @@ mod tests {
        async fn call_method_returning_malfomred_response() {
                let response = serde_json::json!("foo");
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -140,14 +212,16 @@ mod tests {
                let response = serde_json::json!({
                        "error": { "code": -8, "message": "invalid parameter" },
                });
-               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let server = HttpServer::responding_with_server_error(response);
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                let invalid_block_hash = serde_json::json!("foo");
                match client.call_method::<u64>("getblock", &[invalid_block_hash]).await {
                        Err(e) => {
                                assert_eq!(e.kind(), std::io::ErrorKind::Other);
-                               assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter");
+                               let rpc_error: Box<RpcError> = e.into_inner().unwrap().downcast().unwrap();
+                               assert_eq!(rpc_error.code, -8);
+                               assert_eq!(rpc_error.message, "invalid parameter");
                        },
                        Ok(_) => panic!("Expected error"),
                }
@@ -155,9 +229,9 @@ mod tests {
 
        #[tokio::test]
        async fn call_method_returning_missing_result() {
-               let response = serde_json::json!({ "result": null });
+               let response = serde_json::json!({  });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -172,7 +246,7 @@ mod tests {
        async fn call_method_returning_malformed_result() {
                let response = serde_json::json!({ "result": "foo" });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => {
@@ -187,11 +261,31 @@ mod tests {
        async fn call_method_returning_valid_result() {
                let response = serde_json::json!({ "result": 654470 });
                let server = HttpServer::responding_with_ok(MessageBody::Content(response));
-               let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
 
                match client.call_method::<u64>("getblockcount", &[]).await {
                        Err(e) => panic!("Unexpected error: {:?}", e),
                        Ok(count) => assert_eq!(count, 654470),
                }
        }
+
+       #[tokio::test]
+       async fn fails_to_fetch_spent_utxo() {
+               let response = serde_json::json!({ "result": null });
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
+               let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
+               assert_eq!(unspent_output, false);
+       }
+
+       #[tokio::test]
+       async fn fetches_utxo() {
+               let response = serde_json::json!({ "result": {"bestblock": 1, "confirmations": 42}});
+               let server = HttpServer::responding_with_ok(MessageBody::Content(response));
+               let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap();
+               let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0);
+               let unspent_output = client.is_output_unspent(outpoint).await.unwrap();
+               assert_eq!(unspent_output, true);
+       }
 }