X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-block-sync%2Fsrc%2Frpc.rs;h=d296088ae7eeaf9a1cc07e9ccfdfb7c8a6a2fd42;hb=2f734f97550014e5424e55523ed46d76b94b737d;hp=f04769560246f8537e1e022efa22c8b7a815eab4;hpb=0038a3f7dea705823939909c60bbd3826362bc1f;p=rust-lightning diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index f0476956..d296088a 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -3,23 +3,46 @@ use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; +use crate::gossip::UtxoSource; use bitcoin::hash_types::BlockHash; -use bitcoin::hashes::hex::ToHex; +use bitcoin::OutPoint; -use futures_util::lock::Mutex; +use std::sync::Mutex; use serde_json; use std::convert::TryFrom; use std::convert::TryInto; +use std::error::Error; +use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; +/// An error returned by the RPC server. +#[derive(Debug)] +pub struct RpcError { + /// The error code. + pub code: i64, + /// The error message. + pub message: String, +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RPC error {}: {}", self.code, self.message) + } +} + +impl Error for RpcError {} + /// A simple RPC client for calling methods using HTTP `POST`. +/// +/// Implements [`BlockSource`] and may return an `Err` containing [`RpcError`]. See +/// [`RpcClient::call_method`] for details. pub struct RpcClient { basic_auth: String, endpoint: HttpEndpoint, - client: Mutex, + client: Mutex>, id: AtomicUsize, } @@ -28,16 +51,18 @@ impl RpcClient { /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { - let client = Mutex::new(HttpClient::connect(&endpoint)?); Ok(Self { basic_auth: "Basic ".to_string() + credentials, endpoint, - client, + client: Mutex::new(None), id: AtomicUsize::new(0), }) } /// Calls a method with the response encoded in JSON format and interpreted as type `T`. + /// + /// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if + /// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`]. pub async fn call_method(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); @@ -48,7 +73,12 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut response = match self.client.lock().await.post::(&uri, &host, &self.basic_auth, content).await { + let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } + else { HttpClient::connect(&self.endpoint)? }; + let http_response = client.post::(&uri, &host, &self.basic_auth, content).await; + *self.client.lock().unwrap() = Some(client); + + let mut response = match http_response { Ok(JsonResponse(response)) => response, Err(e) if e.kind() == std::io::ErrorKind::Other => { match e.get_ref().unwrap().downcast_ref::() { @@ -69,30 +99,34 @@ impl RpcClient { let error = &response["error"]; if !error.is_null() { // TODO: Examine error code for a more precise std::io::ErrorKind. - let message = error["message"].as_str().unwrap_or("unknown error"); - return Err(std::io::Error::new(std::io::ErrorKind::Other, message)); + let rpc_error = RpcError { + code: error["code"].as_i64().unwrap_or(-1), + message: error["message"].as_str().unwrap_or("unknown error").to_string() + }; + return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)); } - let result = &mut response["result"]; - if result.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")); - } + let result = match response.get_mut("result") { + Some(result) => result.take(), + None => + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")), + }; - JsonResponse(result.take()).try_into() + JsonResponse(result).try_into() } } impl BlockSource for RpcClient { fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { - let header_hash = serde_json::json!(header_hash.to_hex()); + let header_hash = serde_json::json!(header_hash.to_string()); Ok(self.call_method("getblockheader", &[header_hash]).await?) }) } fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { - let header_hash = serde_json::json!(header_hash.to_hex()); + let header_hash = serde_json::json!(header_hash.to_string()); let verbosity = serde_json::json!(0); Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?)) }) @@ -105,11 +139,33 @@ impl BlockSource for RpcClient { } } +impl UtxoSource for RpcClient { + fn get_block_hash_by_height<'a>(&'a self, block_height: u32) -> AsyncBlockSourceResult<'a, BlockHash> { + Box::pin(async move { + let height_param = serde_json::json!(block_height); + Ok(self.call_method("getblockhash", &[height_param]).await?) + }) + } + + fn is_output_unspent<'a>(&'a self, outpoint: OutPoint) -> AsyncBlockSourceResult<'a, bool> { + Box::pin(async move { + let txid_param = serde_json::json!(outpoint.txid.to_string()); + let vout_param = serde_json::json!(outpoint.vout); + let include_mempool = serde_json::json!(false); + let utxo_opt: serde_json::Value = self.call_method( + "gettxout", &[txid_param, vout_param, include_mempool]).await?; + Ok(!utxo_opt.is_null()) + }) + } +} + #[cfg(test)] mod tests { use super::*; use crate::http::client_tests::{HttpServer, MessageBody}; + use bitcoin::hashes::Hash; + /// Credentials encoded in base64. const CREDENTIALS: &'static str = "dXNlcjpwYXNzd29yZA=="; @@ -163,7 +219,9 @@ mod tests { match client.call_method::("getblock", &[invalid_block_hash]).await { Err(e) => { assert_eq!(e.kind(), std::io::ErrorKind::Other); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter"); + let rpc_error: Box = e.into_inner().unwrap().downcast().unwrap(); + assert_eq!(rpc_error.code, -8); + assert_eq!(rpc_error.message, "invalid parameter"); }, Ok(_) => panic!("Expected error"), } @@ -171,7 +229,7 @@ mod tests { #[tokio::test] async fn call_method_returning_missing_result() { - let response = serde_json::json!({ "result": null }); + let response = serde_json::json!({ }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); @@ -210,4 +268,24 @@ mod tests { Ok(count) => assert_eq!(count, 654470), } } + + #[tokio::test] + async fn fails_to_fetch_spent_utxo() { + let response = serde_json::json!({ "result": null }); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, false); + } + + #[tokio::test] + async fn fetches_utxo() { + let response = serde_json::json!({ "result": {"bestblock": 1, "confirmations": 42}}); + let server = HttpServer::responding_with_ok(MessageBody::Content(response)); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let outpoint = OutPoint::new(bitcoin::Txid::from_byte_array([0; 32]), 0); + let unspent_output = client.is_output_unspent(outpoint).await.unwrap(); + assert_eq!(unspent_output, true); + } }