X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning-block-sync%2Fsrc%2Frpc.rs;h=c778279ae24141f9b4df9e3db48f662e91592cb0;hb=8164cb930714129cde4d3f671f6f0b82621285c0;hp=34cbd2e02c028eb51a364f431f509cabdee4c4c0;hpb=c35002fa9c16042badfa5e7bf819df5f1d2ae60a;p=rust-lightning diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 34cbd2e0..c778279a 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -1,21 +1,47 @@ -use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; -use crate::http::{HttpClient, HttpEndpoint, JsonResponse}; +//! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC +//! endpoint. + +use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; -use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; +use std::sync::Mutex; + use serde_json; use std::convert::TryFrom; use std::convert::TryInto; +use std::error::Error; +use std::fmt; use std::sync::atomic::{AtomicUsize, Ordering}; +/// An error returned by the RPC server. +#[derive(Debug)] +pub struct RpcError { + /// The error code. + pub code: i64, + /// The error message. + pub message: String, +} + +impl fmt::Display for RpcError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "RPC error {}: {}", self.code, self.message) + } +} + +impl Error for RpcError {} + /// A simple RPC client for calling methods using HTTP `POST`. +/// +/// Implements [`BlockSource`] and may return an `Err` containing [`RpcError`]. See +/// [`RpcClient::call_method`] for details. pub struct RpcClient { basic_auth: String, endpoint: HttpEndpoint, - client: HttpClient, + client: Mutex>, id: AtomicUsize, } @@ -24,17 +50,19 @@ impl RpcClient { /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { - let client = HttpClient::connect(&endpoint)?; Ok(Self { basic_auth: "Basic ".to_string() + credentials, endpoint, - client, + client: Mutex::new(None), id: AtomicUsize::new(0), }) } /// Calls a method with the response encoded in JSON format and interpreted as type `T`. - async fn call_method(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result + /// + /// When an `Err` is returned, [`std::io::Error::into_inner`] may contain an [`RpcError`] if + /// [`std::io::Error::kind`] is [`std::io::ErrorKind::Other`]. + pub async fn call_method(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = self.endpoint.path(); @@ -44,8 +72,25 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut response = self.client.post::(&uri, &host, &self.basic_auth, content) - .await?.0; + let mut client = if let Some(client) = self.client.lock().unwrap().take() { client } + else { HttpClient::connect(&self.endpoint)? }; + let http_response = client.post::(&uri, &host, &self.basic_auth, content).await; + *self.client.lock().unwrap() = Some(client); + + let mut response = match http_response { + Ok(JsonResponse(response)) => response, + Err(e) if e.kind() == std::io::ErrorKind::Other => { + match e.get_ref().unwrap().downcast_ref::() { + Some(http_error) => match JsonResponse::try_from(http_error.contents.clone()) { + Ok(JsonResponse(response)) => response, + Err(_) => Err(e)?, + }, + None => Err(e)?, + } + }, + Err(e) => Err(e)?, + }; + if !response.is_object() { return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON object")); } @@ -53,36 +98,40 @@ impl RpcClient { let error = &response["error"]; if !error.is_null() { // TODO: Examine error code for a more precise std::io::ErrorKind. - let message = error["message"].as_str().unwrap_or("unknown error"); - return Err(std::io::Error::new(std::io::ErrorKind::Other, message)); + let rpc_error = RpcError { + code: error["code"].as_i64().unwrap_or(-1), + message: error["message"].as_str().unwrap_or("unknown error").to_string() + }; + return Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)); } - let result = &mut response["result"]; - if result.is_null() { - return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")); - } + let result = match response.get_mut("result") { + Some(result) => result.take(), + None => + return Err(std::io::Error::new(std::io::ErrorKind::InvalidData, "expected JSON result")), + }; - JsonResponse(result.take()).try_into() + JsonResponse(result).try_into() } } impl BlockSource for RpcClient { - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); Ok(self.call_method("getblockheader", &[header_hash]).await?) }) } - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); let verbosity = serde_json::json!(0); - Ok(self.call_method("getblock", &[header_hash, verbosity]).await?) + Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?)) }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) }) @@ -112,10 +161,10 @@ mod tests { #[tokio::test] async fn call_method_returning_unknown_response() { let server = HttpServer::responding_with_not_found(); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { - Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::NotFound), + Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), Ok(_) => panic!("Expected error"), } } @@ -124,7 +173,7 @@ mod tests { async fn call_method_returning_malfomred_response() { let response = serde_json::json!("foo"); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -140,14 +189,16 @@ mod tests { let response = serde_json::json!({ "error": { "code": -8, "message": "invalid parameter" }, }); - let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let server = HttpServer::responding_with_server_error(response); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); let invalid_block_hash = serde_json::json!("foo"); match client.call_method::("getblock", &[invalid_block_hash]).await { Err(e) => { assert_eq!(e.kind(), std::io::ErrorKind::Other); - assert_eq!(e.get_ref().unwrap().to_string(), "invalid parameter"); + let rpc_error: Box = e.into_inner().unwrap().downcast().unwrap(); + assert_eq!(rpc_error.code, -8); + assert_eq!(rpc_error.message, "invalid parameter"); }, Ok(_) => panic!("Expected error"), } @@ -155,9 +206,9 @@ mod tests { #[tokio::test] async fn call_method_returning_missing_result() { - let response = serde_json::json!({ "result": null }); + let response = serde_json::json!({ }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -172,7 +223,7 @@ mod tests { async fn call_method_returning_malformed_result() { let response = serde_json::json!({ "result": "foo" }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -187,7 +238,7 @@ mod tests { async fn call_method_returning_valid_result() { let response = serde_json::json!({ "result": 654470 }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => panic!("Unexpected error: {:?}", e),