From 8505382b197ee9469028b2ea6062fe2489aee6c1 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 3 Feb 2021 17:41:03 -0800 Subject: [PATCH] Add SpvClient used to poll for the best chain tip Adds a lightweight client for polling one or more block sources for the best chain tip. Notifies listeners of blocks connected or disconnected since the last poll. Useful for keeping a Lightning node in sync with the chain. --- lightning-block-sync/src/lib.rs | 220 ++++++++++++++++++++++++- lightning-block-sync/src/test_utils.rs | 7 + 2 files changed, 223 insertions(+), 4 deletions(-) diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 9228e11f..5ed8689f 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -1,5 +1,9 @@ //! A lightweight client for keeping in sync with chain activity. //! +//! Defines an [`SpvClient`] utility for polling one or more block sources for the best chain tip. +//! It is used to notify listeners of blocks connected or disconnected since the last poll. Useful +//! for keeping a Lightning node in sync with the chain. +//! //! Defines a [`BlockSource`] trait, which is an asynchronous interface for retrieving block headers //! and data. //! @@ -9,6 +13,7 @@ //! Both features support either blocking I/O using `std::net::TcpStream` or, with feature `tokio`, //! non-blocking I/O using `tokio::net::TcpStream` from inside a Tokio runtime. //! +//! [`SpvClient`]: struct.SpvClient.html //! [`BlockSource`]: trait.BlockSource.html #[cfg(any(feature = "rest-client", feature = "rpc-client"))] @@ -31,7 +36,7 @@ mod test_utils; #[cfg(any(feature = "rest-client", feature = "rpc-client"))] mod utils; -use crate::poll::{Poll, ValidatedBlockHeader}; +use crate::poll::{ChainTip, Poll, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::hash_types::BlockHash; @@ -54,9 +59,13 @@ pub trait BlockSource : Sync + Send { /// error. fn get_block<'a>(&'a mut self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; - // TODO: Phrase in terms of `Poll` once added. - /// Returns the hash of the best block and, optionally, its height. When polling a block source, - /// the height is passed to `get_header` to allow for a more efficient lookup. + /// Returns the hash of the best block and, optionally, its height. + /// + /// When polling a block source, [`Poll`] implementations may pass the height to [`get_header`] + /// to allow for a more efficient lookup. + /// + /// [`Poll`]: poll/trait.Poll.html + /// [`get_header`]: #tymethod.get_header fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)>; } @@ -133,6 +142,25 @@ pub struct BlockHeaderData { pub chainwork: Uint256, } +/// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified +/// Payment Verification (SPV). +/// +/// The client is parameterized by a chain poller which is responsible for polling one or more block +/// sources for the best chain tip. During this process it detects any chain forks, determines which +/// constitutes the best chain, and updates the listener accordingly with any blocks that were +/// connected or disconnected since the last poll. +/// +/// Block headers for the best chain are maintained in the parameterized cache, allowing for a +/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage. +/// Hence, there is a trade-off between a lower memory footprint and potentially increased network +/// I/O as headers are re-fetched during fork detection. +pub struct SpvClient { + chain_tip: ValidatedBlockHeader, + chain_poller: P, + chain_notifier: ChainNotifier, + chain_listener: L, +} + /// Adaptor used for notifying when blocks have been connected or disconnected from the chain. /// /// Used when needing to replay chain data upon startup or as new chain events occur. @@ -186,6 +214,69 @@ impl Cache for UnboundedCache { } } +impl SpvClient { + /// Creates a new SPV client using `chain_tip` as the best known chain tip. + /// + /// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain + /// poller, which may be configured with one or more block sources to query. At least one block + /// source must provide headers back from the best chain tip to its common ancestor with + /// `chain_tip`. + /// * `header_cache` is used to look up and store headers on the best chain + /// * `chain_listener` is notified of any blocks connected or disconnected + /// + /// [`poll_best_tip`]: struct.SpvClient.html#method.poll_best_tip + pub fn new( + chain_tip: ValidatedBlockHeader, + chain_poller: P, + header_cache: C, + chain_listener: L, + ) -> Self { + let chain_notifier = ChainNotifier { header_cache }; + Self { chain_tip, chain_poller, chain_notifier, chain_listener } + } + + /// Polls for the best tip and updates the chain listener with any connected or disconnected + /// blocks accordingly. + /// + /// Returns the best polled chain tip relative to the previous best known tip and whether any + /// blocks were indeed connected or disconnected. + pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> { + let chain_tip = self.chain_poller.poll_chain_tip(self.chain_tip).await?; + let blocks_connected = match chain_tip { + ChainTip::Common => false, + ChainTip::Better(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork > self.chain_tip.chainwork); + self.update_chain_tip(chain_tip).await + }, + ChainTip::Worse(chain_tip) => { + debug_assert_ne!(chain_tip.block_hash, self.chain_tip.block_hash); + debug_assert!(chain_tip.chainwork <= self.chain_tip.chainwork); + false + }, + }; + Ok((chain_tip, blocks_connected)) + } + + /// Updates the chain tip, syncing the chain listener with any connected or disconnected + /// blocks. Returns whether there were any such blocks. + async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool { + match self.chain_notifier.synchronize_listener( + best_chain_tip, &self.chain_tip, &mut self.chain_poller, &mut self.chain_listener).await + { + Ok(_) => { + self.chain_tip = best_chain_tip; + true + }, + Err((_, Some(chain_tip))) if chain_tip.block_hash != self.chain_tip.block_hash => { + self.chain_tip = chain_tip; + true + }, + Err(_) => false, + } + } +} + /// Notifies [listeners] of blocks that have been connected or disconnected from the chain. /// /// [listeners]: trait.ChainListener.html @@ -299,6 +390,127 @@ impl ChainNotifier { } } +#[cfg(test)] +mod spv_client_tests { + use crate::test_utils::{Blockchain, NullChainListener}; + use super::*; + + use bitcoin::network::constants::Network; + + #[tokio::test] + async fn poll_from_chain_without_headers() { + let mut chain = Blockchain::default().with_height(3).without_headers(); + let best_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => { + assert_eq!(e.kind(), BlockSourceErrorKind::Persistent); + assert_eq!(e.into_inner().as_ref().to_string(), "header not found"); + }, + Ok(_) => panic!("Expected error"), + } + assert_eq!(client.chain_tip, best_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_common_tip() { + let mut chain = Blockchain::default().with_height(3); + let common_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(common_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Common); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, common_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip() { + let mut chain = Blockchain::default().with_height(3); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, new_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_any_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(2..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, old_tip); + } + + #[tokio::test] + async fn poll_from_chain_with_better_tip_and_without_some_new_blocks() { + let mut chain = Blockchain::default().with_height(3).without_blocks(3..); + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(old_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Better(new_tip)); + assert!(blocks_connected); + }, + } + assert_eq!(client.chain_tip, chain.at_height(2)); + } + + #[tokio::test] + async fn poll_from_chain_with_worse_tip() { + let mut chain = Blockchain::default().with_height(3); + let best_tip = chain.tip(); + chain.disconnect_tip(); + let worse_tip = chain.tip(); + + let poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + let cache = UnboundedCache::new(); + let mut client = SpvClient::new(best_tip, poller, cache, NullChainListener {}); + match client.poll_best_tip().await { + Err(e) => panic!("Unexpected error: {:?}", e), + Ok((chain_tip, blocks_connected)) => { + assert_eq!(chain_tip, ChainTip::Worse(worse_tip)); + assert!(!blocks_connected); + }, + } + assert_eq!(client.chain_tip, best_tip); + } +} + #[cfg(test)] mod chain_notifier_tests { use crate::test_utils::{Blockchain, MockChainListener}; diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 70a8982a..807a33a6 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -160,6 +160,13 @@ impl BlockSource for Blockchain { } } +pub struct NullChainListener; + +impl ChainListener for NullChainListener { + fn block_connected(&mut self, _block: &Block, _height: u32) {} + fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {} +} + pub struct MockChainListener { expected_blocks_connected: VecDeque, expected_blocks_disconnected: VecDeque, -- 2.30.2