From 3cdbbf56e42daf67b954cd983052b09a9a18b8cd Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Mon, 14 Feb 2022 15:31:59 -0600 Subject: [PATCH] Immutable BlockSource interface Querying a BlockSource is a logically immutable operation. Use non-mut references in its interface to reflect this, which allows for users to hold multiple references if desired. --- lightning-block-sync/Cargo.toml | 1 + lightning-block-sync/src/init.rs | 6 ++-- lightning-block-sync/src/lib.rs | 6 ++-- lightning-block-sync/src/poll.rs | 44 +++++++++++++------------- lightning-block-sync/src/rest.rs | 22 +++++++------ lightning-block-sync/src/rpc.rs | 28 ++++++++-------- lightning-block-sync/src/test_utils.rs | 6 ++-- 7 files changed, 59 insertions(+), 54 deletions(-) diff --git a/lightning-block-sync/Cargo.toml b/lightning-block-sync/Cargo.toml index 67edb09e..67303408 100644 --- a/lightning-block-sync/Cargo.toml +++ b/lightning-block-sync/Cargo.toml @@ -20,6 +20,7 @@ rpc-client = [ "serde", "serde_json", "chunked_transfer" ] [dependencies] bitcoin = "0.27" lightning = { version = "0.0.106", path = "../lightning" } +futures = { version = "0.3" } tokio = { version = "1.0", features = [ "io-util", "net", "time" ], optional = true } serde = { version = "1.0", features = ["derive"], optional = true } serde_json = { version = "1.0", optional = true } diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index d971cadc..6611d185 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -16,7 +16,7 @@ use lightning::chain; /// start when there are no chain listeners to sync yet. /// /// [`SpvClient`]: crate::SpvClient -pub async fn validate_best_block_header(block_source: &mut B) -> +pub async fn validate_best_block_header(block_source: &B) -> BlockSourceResult { let (best_block_hash, best_block_height) = block_source.get_best_block().await?; block_source @@ -67,7 +67,7 @@ BlockSourceResult { /// C: chain::Filter, /// P: chainmonitor::Persist, /// >( -/// block_source: &mut B, +/// block_source: &B, /// chain_monitor: &ChainMonitor, /// config: UserConfig, /// keys_manager: &K, @@ -122,7 +122,7 @@ BlockSourceResult { /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor pub async fn synchronize_listeners<'a, B: BlockSource, C: Cache, L: chain::Listen + ?Sized>( - block_source: &mut B, + block_source: &B, network: Network, header_cache: &mut C, mut chain_listeners: Vec<(BlockHash, &'a L)>, diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 8854aa3e..321dd57e 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -61,11 +61,11 @@ pub trait BlockSource : Sync + Send { /// /// Implementations that cannot find headers based on the hash should return a `Transient` error /// when `height_hint` is `None`. - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData>; + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData>; /// Returns the block for a given hash. A headers-only block source should return a `Transient` /// error. - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; /// Returns the hash of the best block and, optionally, its height. /// @@ -73,7 +73,7 @@ pub trait BlockSource : Sync + Send { /// to allow for a more efficient lookup. /// /// [`get_header`]: Self::get_header - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)>; + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option)>; } /// Result type for `BlockSource` requests. diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index c59652ea..b32d2239 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -6,7 +6,7 @@ use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::network::constants::Network; -use std::ops::DerefMut; +use std::ops::Deref; /// The `Poll` trait defines behavior for polling block sources for a chain tip and retrieving /// related chain data. It serves as an adapter for `BlockSource`. @@ -17,15 +17,15 @@ use std::ops::DerefMut; /// [`ChainPoller`]: ../struct.ChainPoller.html pub trait Poll { /// Returns a chain tip in terms of its relationship to the provided chain tip. - fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> + fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ChainTip>; /// Returns the header that preceded the given header in the chain. - fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader>; /// Returns the block associated with the given header. - fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ValidatedBlock>; } @@ -170,12 +170,12 @@ mod sealed { /// /// Other `Poll` implementations should be built using `ChainPoller` as it provides the simplest way /// of validating chain data and checking consistency. -pub struct ChainPoller + Sized, T: BlockSource> { +pub struct ChainPoller + Sized, T: BlockSource> { block_source: B, network: Network, } -impl + Sized, T: BlockSource> ChainPoller { +impl + Sized, T: BlockSource> ChainPoller { /// Creates a new poller for the given block source. /// /// If the `network` parameter is mainnet, then the difficulty between blocks is checked for @@ -185,8 +185,8 @@ impl + Sized, T: BlockSource> ChainPoller { } } -impl + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller { - fn poll_chain_tip<'a>(&'a mut self, best_known_chain_tip: ValidatedBlockHeader) -> +impl + Sized + Send + Sync, T: BlockSource> Poll for ChainPoller { + fn poll_chain_tip<'a>(&'a self, best_known_chain_tip: ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ChainTip> { Box::pin(async move { @@ -206,7 +206,7 @@ impl + Sized + Send + Sync, T: BlockSource> Poll for Chain }) } - fn look_up_previous_header<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + fn look_up_previous_header<'a>(&'a self, header: &'a ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ValidatedBlockHeader> { Box::pin(async move { @@ -225,7 +225,7 @@ impl + Sized + Send + Sync, T: BlockSource> Poll for Chain }) } - fn fetch_block<'a>(&'a mut self, header: &'a ValidatedBlockHeader) -> + fn fetch_block<'a>(&'a self, header: &'a ValidatedBlockHeader) -> AsyncBlockSourceResult<'a, ValidatedBlock> { Box::pin(async move { @@ -249,7 +249,7 @@ mod tests { let best_known_chain_tip = chain.tip(); chain.disconnect_tip(); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Transient); @@ -261,10 +261,10 @@ mod tests { #[tokio::test] async fn poll_chain_without_headers() { - let mut chain = Blockchain::default().with_height(1).without_headers(); + let chain = Blockchain::default().with_height(1).without_headers(); let best_known_chain_tip = chain.at_height(0); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -283,7 +283,7 @@ mod tests { chain.blocks.last_mut().unwrap().header.bits = BlockHeader::compact_target_from_u256(&Uint256::from_be_bytes([0; 32])); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -295,10 +295,10 @@ mod tests { #[tokio::test] async fn poll_chain_with_malformed_headers() { - let mut chain = Blockchain::default().with_height(1).malformed_headers(); + let chain = Blockchain::default().with_height(1).malformed_headers(); let best_known_chain_tip = chain.at_height(0); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => { assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); @@ -310,10 +310,10 @@ mod tests { #[tokio::test] async fn poll_chain_with_common_tip() { - let mut chain = Blockchain::default().with_height(0); + let chain = Blockchain::default().with_height(0); let best_known_chain_tip = chain.tip(); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(tip) => assert_eq!(tip, ChainTip::Common), @@ -330,7 +330,7 @@ mod tests { let worse_chain_tip = chain.tip(); assert_eq!(best_known_chain_tip.chainwork, worse_chain_tip.chainwork); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), @@ -345,7 +345,7 @@ mod tests { chain.disconnect_tip(); let worse_chain_tip = chain.tip(); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(tip) => assert_eq!(tip, ChainTip::Worse(worse_chain_tip)), @@ -354,12 +354,12 @@ mod tests { #[tokio::test] async fn poll_chain_with_better_tip() { - let mut chain = Blockchain::default().with_height(1); + let chain = Blockchain::default().with_height(1); let best_known_chain_tip = chain.at_height(0); let better_chain_tip = chain.tip(); - let mut poller = ChainPoller::new(&mut chain, Network::Bitcoin); + let poller = ChainPoller::new(&chain, Network::Bitcoin); match poller.poll_chain_tip(best_known_chain_tip).await { Err(e) => panic!("Unexpected error: {:?}", e), Ok(tip) => assert_eq!(tip, ChainTip::Better(better_chain_tip)), diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index e04bb86d..2ddfed7d 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -8,13 +8,15 @@ use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; +use futures::lock::Mutex; + use std::convert::TryFrom; use std::convert::TryInto; /// A simple REST client for requesting resources using HTTP `GET`. pub struct RestClient { endpoint: HttpEndpoint, - client: HttpClient, + client: Mutex, } impl RestClient { @@ -22,35 +24,35 @@ impl RestClient { /// /// The endpoint should contain the REST path component (e.g., http://127.0.0.1:8332/rest). pub fn new(endpoint: HttpEndpoint) -> std::io::Result { - let client = HttpClient::connect(&endpoint)?; + let client = Mutex::new(HttpClient::connect(&endpoint)?); Ok(Self { endpoint, client }) } /// Requests a resource encoded in `F` format and interpreted as type `T`. - pub async fn request_resource(&mut self, resource_path: &str) -> std::io::Result + pub async fn request_resource(&self, resource_path: &str) -> std::io::Result where F: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = format!("{}/{}", self.endpoint.path().trim_end_matches("/"), resource_path); - self.client.get::(&uri, &host).await?.try_into() + self.client.lock().await.get::(&uri, &host).await?.try_into() } } impl BlockSource for RestClient { - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let resource_path = format!("headers/1/{}.json", header_hash.to_hex()); Ok(self.request_resource::(&resource_path).await?) }) } - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { let resource_path = format!("block/{}.bin", header_hash.to_hex()); Ok(self.request_resource::(&resource_path).await?) }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { Box::pin(async move { Ok(self.request_resource::("chaininfo.json").await?) }) @@ -81,7 +83,7 @@ mod tests { #[tokio::test] async fn request_unknown_resource() { let server = HttpServer::responding_with_not_found(); - let mut client = RestClient::new(server.endpoint()).unwrap(); + let client = RestClient::new(server.endpoint()).unwrap(); match client.request_resource::("/").await { Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), @@ -92,7 +94,7 @@ mod tests { #[tokio::test] async fn request_malformed_resource() { let server = HttpServer::responding_with_ok(MessageBody::Content("foo")); - let mut client = RestClient::new(server.endpoint()).unwrap(); + let client = RestClient::new(server.endpoint()).unwrap(); match client.request_resource::("/").await { Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::InvalidData), @@ -103,7 +105,7 @@ mod tests { #[tokio::test] async fn request_valid_resource() { let server = HttpServer::responding_with_ok(MessageBody::Content(42)); - let mut client = RestClient::new(server.endpoint()).unwrap(); + let client = RestClient::new(server.endpoint()).unwrap(); match client.request_resource::("/").await { Err(e) => panic!("Unexpected error: {:?}", e), diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 88199688..1e0aa9d9 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -8,6 +8,8 @@ use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; +use futures::lock::Mutex; + use serde_json; use std::convert::TryFrom; @@ -18,7 +20,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; pub struct RpcClient { basic_auth: String, endpoint: HttpEndpoint, - client: HttpClient, + client: Mutex, id: AtomicUsize, } @@ -27,7 +29,7 @@ impl RpcClient { /// credentials should be a base64 encoding of a user name and password joined by a colon, as is /// required for HTTP basic access authentication. pub fn new(credentials: &str, endpoint: HttpEndpoint) -> std::io::Result { - let client = HttpClient::connect(&endpoint)?; + let client = Mutex::new(HttpClient::connect(&endpoint)?); Ok(Self { basic_auth: "Basic ".to_string() + credentials, endpoint, @@ -37,7 +39,7 @@ impl RpcClient { } /// Calls a method with the response encoded in JSON format and interpreted as type `T`. - pub async fn call_method(&mut self, method: &str, params: &[serde_json::Value]) -> std::io::Result + pub async fn call_method(&self, method: &str, params: &[serde_json::Value]) -> std::io::Result where JsonResponse: TryFrom, Error = std::io::Error> + TryInto { let host = format!("{}:{}", self.endpoint.host(), self.endpoint.port()); let uri = self.endpoint.path(); @@ -47,7 +49,7 @@ impl RpcClient { "id": &self.id.fetch_add(1, Ordering::AcqRel).to_string() }); - let mut response = match self.client.post::(&uri, &host, &self.basic_auth, content).await { + let mut response = match self.client.lock().await.post::(&uri, &host, &self.basic_auth, content).await { Ok(JsonResponse(response)) => response, Err(e) if e.kind() == std::io::ErrorKind::Other => { match e.get_ref().unwrap().downcast_ref::() { @@ -82,14 +84,14 @@ impl RpcClient { } impl BlockSource for RpcClient { - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); Ok(self.call_method("getblockheader", &[header_hash]).await?) }) } - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); let verbosity = serde_json::json!(0); @@ -97,7 +99,7 @@ impl BlockSource for RpcClient { }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { Box::pin(async move { Ok(self.call_method("getblockchaininfo", &[]).await?) }) @@ -127,7 +129,7 @@ mod tests { #[tokio::test] async fn call_method_returning_unknown_response() { let server = HttpServer::responding_with_not_found(); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => assert_eq!(e.kind(), std::io::ErrorKind::Other), @@ -139,7 +141,7 @@ mod tests { async fn call_method_returning_malfomred_response() { let response = serde_json::json!("foo"); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -156,7 +158,7 @@ mod tests { "error": { "code": -8, "message": "invalid parameter" }, }); let server = HttpServer::responding_with_server_error(response); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); let invalid_block_hash = serde_json::json!("foo"); match client.call_method::("getblock", &[invalid_block_hash]).await { @@ -172,7 +174,7 @@ mod tests { async fn call_method_returning_missing_result() { let response = serde_json::json!({ "result": null }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -187,7 +189,7 @@ mod tests { async fn call_method_returning_malformed_result() { let response = serde_json::json!({ "result": "foo" }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => { @@ -202,7 +204,7 @@ mod tests { async fn call_method_returning_valid_result() { let response = serde_json::json!({ "result": 654470 }); let server = HttpServer::responding_with_ok(MessageBody::Content(response)); - let mut client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); + let client = RpcClient::new(CREDENTIALS, server.endpoint()).unwrap(); match client.call_method::("getblockcount", &[]).await { Err(e) => panic!("Unexpected error: {:?}", e), diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 8c37c94d..fe57c0c6 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -113,7 +113,7 @@ impl Blockchain { } impl BlockSource for Blockchain { - fn get_header<'a>(&'a mut self, header_hash: &'a BlockHash, _height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + fn get_header<'a>(&'a self, header_hash: &'a BlockHash, _height_hint: Option) -> AsyncBlockSourceResult<'a, BlockHeaderData> { Box::pin(async move { if self.without_headers { return Err(BlockSourceError::persistent("header not found")); @@ -133,7 +133,7 @@ impl BlockSource for Blockchain { }) } - fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { Box::pin(async move { for (height, block) in self.blocks.iter().enumerate() { if block.header.block_hash() == *header_hash { @@ -150,7 +150,7 @@ impl BlockSource for Blockchain { }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<'a, (BlockHash, Option)> { Box::pin(async move { match self.blocks.last() { None => Err(BlockSourceError::transient("empty chain")), -- 2.30.2