X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-block-sync%2Fsrc%2Frpc.rs;h=f04769560246f8537e1e022efa22c8b7a815eab4;hb=8c8bb6d246a53f1a0e4fc70788eb59947352fcb9;hp=88199688aefd1a48fd128aac5c45f319f947c90e;hpb=b2f16ad821672e07fd588beecd117206d9b656fd;p=rust-lightning diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 88199688..f0476956 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -1,13 +1,14 @@ //! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC //! endpoint. -use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; -use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; +use futures_util::lock::Mutex; + use serde_json; use std::convert::TryFrom; @@ -18,7 +19,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; pub struct RpcClient { basic_auth: String, endpoint: HttpEndpoint, - client: HttpClient, + client: Mutex, id: AtomicUsize, } @@ -27,7 +28,7 @@ impl RpcClient { /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { - let client = HttpClient::connect(&endpoint)?; + let client = Mutex::new(HttpClient::connect(&endpoint)?); Ok(Self { basic_auth: "Basic ".to_string() + credentials, endpoint, @@ -37,7 +38,7 @@ impl RpcClient { } /// Calls a method with the response encoded in JSON format and interpreted as type `T`. - pub async fn call_method(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result + pub async fn call_method(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = self.endpoint.path(); @@ -47,7 +48,7 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut response = match self.client.post::(&uri, &host, &self.basic_auth, content).await { + let mut response = match self.client.lock().await.post::(&uri, &host, &self.basic_auth, content).await { Ok(JsonResponse(response)) => response, Err(e) if e.kind() == std::io::ErrorKind::Other => { match e.get_ref().unwrap().downcast_ref::() { @@ -82,22 +83,22 @@ impl RpcClient { } impl BlockSource for RpcClient { - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); Ok(self.call_method("getblockheader", &[header_hash]).await?) }) } - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); let verbosity = serde_json::json!(0); - Ok(self.call_method("getblock", &[header_hash, verbosity]).await?) + Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?)) }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) }) @@ -127,7 +128,7 @@ mod tests { #[tokio::test] async fn call_method_returning_unknown_response() { let server = HttpServer::responding_with_not_found(); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), @@ -139,7 +140,7 @@ mod tests { async fn call_method_returning_malfomred_response() { let response = serde_json::json!("foo"); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -156,7 +157,7 @@ mod tests { "error": { "code": -8, "message": "invalid parameter" }, }); let server = HttpServer::responding_with_server_error(response); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); let invalid_block_hash = serde_json::json!("foo"); match client.call_method::("getblock", &[invalid_block_hash]).await { @@ -172,7 +173,7 @@ mod tests { async fn call_method_returning_missing_result() { let response = serde_json::json!({ "result": null }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -187,7 +188,7 @@ mod tests { async fn call_method_returning_malformed_result() { let response = serde_json::json!({ "result": "foo" }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -202,7 +203,7 @@ mod tests { async fn call_method_returning_valid_result() { let response = serde_json::json!({ "result": 654470 }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => panic!("Unexpected error: {:?}", e),