From c1938e8c9fb5d531dd735e889b2ab0f7bc8580b8 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Thu, 8 Sep 2022 15:17:05 -0500 Subject: [PATCH] Support filtered blocks in lightning-block-sync Expand the BlockSource trait to allow filtered blocks now that chain::Listen supports them (d629a7edb7241eee7fde9f5ccdf1c481d2d6297b). This makes it possible to use BIP 157/158 compact block filters with lightning-block-sync. --- lightning-block-sync/src/init.rs | 10 ++++++ lightning-block-sync/src/lib.rs | 50 +++++++++++++++++++++++--- lightning-block-sync/src/poll.rs | 32 ++++++++++------- lightning-block-sync/src/rest.rs | 7 ++-- lightning-block-sync/src/rpc.rs | 7 ++-- lightning-block-sync/src/test_utils.rs | 43 +++++++++++++++++++--- 6 files changed, 119 insertions(+), 30 deletions(-) diff --git a/lightning-block-sync/src/init.rs b/lightning-block-sync/src/init.rs index b3f745bd..7a8fada9 100644 --- a/lightning-block-sync/src/init.rs +++ b/lightning-block-sync/src/init.rs @@ -216,6 +216,16 @@ impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { + // Needed to differentiate test expectations. + #[cfg(test)] + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + for (starting_height, chain_listener) in self.0.iter() { + if height > *starting_height { + chain_listener.block_connected(block, height); + } + } + } + fn filtered_block_connected(&self, header: &BlockHeader, txdata: &chain::transaction::TransactionData, height: u32) { for (starting_height, chain_listener) in self.0.iter() { if height > *starting_height { diff --git a/lightning-block-sync/src/lib.rs b/lightning-block-sync/src/lib.rs index 823cb5eb..5d8bc27f 100644 --- a/lightning-block-sync/src/lib.rs +++ b/lightning-block-sync/src/lib.rs @@ -68,7 +68,7 @@ pub trait BlockSource : Sync + Send { /// Returns the block for a given hash. A headers-only block source should return a `Transient` /// error. - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block>; + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData>; /// Returns the hash of the best block and, optionally, its height. /// @@ -152,6 +152,18 @@ pub struct BlockHeaderData { pub chainwork: Uint256, } +/// A block including either all its transactions or only the block header. +/// +/// [`BlockSource`] may be implemented to either always return full blocks or, in the case of +/// compact block filters (BIP 157/158), return header-only blocks when no pertinent transactions +/// match. See [`chain::Filter`] for details on how to notify a source of such transactions. +pub enum BlockData { + /// A block containing all its transactions. + FullBlock(Block), + /// A block header for when the block does not contain any pertinent transactions. + HeaderOnly(BlockHeader), +} + /// A lightweight client for keeping a listener in sync with the chain, allowing for Simplified /// Payment Verification (SPV). /// @@ -396,13 +408,22 @@ impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> where L::Target: chain::Lis chain_poller: &mut P, ) -> Result<(), (BlockSourceError, Option)> { for header in connected_blocks.drain(..).rev() { - let block = chain_poller + let height = header.height; + let block_data = chain_poller .fetch_block(&header).await .or_else(|e| Err((e, Some(new_tip))))?; - debug_assert_eq!(block.block_hash, header.block_hash); + debug_assert_eq!(block_data.block_hash, header.block_hash); + + match block_data.deref() { + BlockData::FullBlock(block) => { + self.chain_listener.block_connected(&block, height); + }, + BlockData::HeaderOnly(header) => { + self.chain_listener.filtered_block_connected(&header, &[], height); + }, + } self.header_cache.block_connected(header.block_hash, header); - self.chain_listener.block_connected(&block, header.height); new_tip = header; } @@ -707,4 +728,25 @@ mod chain_notifier_tests { Ok(_) => panic!("Expected error"), } } + + #[tokio::test] + async fn sync_from_chain_with_filtered_blocks() { + let mut chain = Blockchain::default().with_height(3).filtered_blocks(); + + let new_tip = chain.tip(); + let old_tip = chain.at_height(1); + let chain_listener = &MockChainListener::new() + .expect_filtered_block_connected(*chain.at_height(2)) + .expect_filtered_block_connected(*new_tip); + let mut notifier = ChainNotifier { + header_cache: &mut chain.header_cache(0..=1), + chain_listener, + }; + let mut poller = poll::ChainPoller::new(&mut chain, Network::Testnet); + match notifier.synchronize_listener(new_tip, &old_tip, &mut poller).await { + Err((e, _)) => panic!("Unexpected error: {:?}", e), + Ok(_) => {}, + } + } + } diff --git a/lightning-block-sync/src/poll.rs b/lightning-block-sync/src/poll.rs index 4c6cb0c0..2bb2f4a0 100644 --- a/lightning-block-sync/src/poll.rs +++ b/lightning-block-sync/src/poll.rs @@ -1,8 +1,7 @@ //! Adapters that make one or more [`BlockSource`]s simpler to poll for new chain tip transitions. -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; +use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, BlockSourceResult}; -use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::network::constants::Network; @@ -71,24 +70,31 @@ impl Validate for BlockHeaderData { } } -impl Validate for Block { +impl Validate for BlockData { type T = ValidatedBlock; fn validate(self, block_hash: BlockHash) -> BlockSourceResult { - let pow_valid_block_hash = self.header - .validate_pow(&self.header.target()) + let header = match &self { + BlockData::FullBlock(block) => &block.header, + BlockData::HeaderOnly(header) => header, + }; + + let pow_valid_block_hash = header + .validate_pow(&header.target()) .or_else(|e| Err(BlockSourceError::persistent(e)))?; if pow_valid_block_hash != block_hash { return Err(BlockSourceError::persistent("invalid block hash")); } - if !self.check_merkle_root() { - return Err(BlockSourceError::persistent("invalid merkle root")); - } + if let BlockData::FullBlock(block) = &self { + if !block.check_merkle_root() { + return Err(BlockSourceError::persistent("invalid merkle root")); + } - if !self.check_witness_commitment() { - return Err(BlockSourceError::persistent("invalid witness commitment")); + if !block.check_witness_commitment() { + return Err(BlockSourceError::persistent("invalid witness commitment")); + } } Ok(ValidatedBlock { block_hash, inner: self }) @@ -145,11 +151,11 @@ impl ValidatedBlockHeader { /// A block with validated data against its transaction list and corresponding block hash. pub struct ValidatedBlock { pub(crate) block_hash: BlockHash, - inner: Block, + inner: BlockData, } impl std::ops::Deref for ValidatedBlock { - type Target = Block; + type Target = BlockData; fn deref(&self) -> &Self::Target { &self.inner @@ -161,7 +167,7 @@ mod sealed { pub trait Validate {} impl Validate for crate::BlockHeaderData {} - impl Validate for bitcoin::blockdata::block::Block {} + impl Validate for crate::BlockData {} } /// The canonical `Poll` implementation used for a single `BlockSource`. diff --git a/lightning-block-sync/src/rest.rs b/lightning-block-sync/src/rest.rs index 2ddfed7d..f46e5e02 100644 --- a/lightning-block-sync/src/rest.rs +++ b/lightning-block-sync/src/rest.rs @@ -1,10 +1,9 @@ //! Simple REST client implementation which implements [`BlockSource`] against a Bitcoin Core REST //! endpoint. -use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{BinaryResponse, HttpEndpoint, HttpClient, JsonResponse}; -use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; @@ -45,10 +44,10 @@ impl BlockSource for RestClient { }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let resource_path = format!("block/{}.bin", header_hash.to_hex()); - Ok(self.request_resource::(&resource_path).await?) + Ok(BlockData::FullBlock(self.request_resource::(&resource_path).await?)) }) } diff --git a/lightning-block-sync/src/rpc.rs b/lightning-block-sync/src/rpc.rs index 1e0aa9d9..6e78654a 100644 --- a/lightning-block-sync/src/rpc.rs +++ b/lightning-block-sync/src/rpc.rs @@ -1,10 +1,9 @@ //! Simple RPC client implementation which implements [`BlockSource`] against a Bitcoin Core RPC //! endpoint. -use crate::{BlockHeaderData, BlockSource, AsyncBlockSourceResult}; +use crate::{BlockData, BlockHeaderData, BlockSource, AsyncBlockSourceResult}; use crate::http::{HttpClient, HttpEndpoint, HttpError, JsonResponse}; -use bitcoin::blockdata::block::Block; use bitcoin::hash_types::BlockHash; use bitcoin::hashes::hex::ToHex; @@ -91,11 +90,11 @@ impl BlockSource for RpcClient { }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { let header_hash = serde_json::json!(header_hash.to_hex()); let verbosity = serde_json::json!(0); - Ok(self.call_method("getblock", &[header_hash, verbosity]).await?) + Ok(BlockData::FullBlock(self.call_method("getblock", &[header_hash, verbosity]).await?)) }) } diff --git a/lightning-block-sync/src/test_utils.rs b/lightning-block-sync/src/test_utils.rs index 0c402deb..b9bc519b 100644 --- a/lightning-block-sync/src/test_utils.rs +++ b/lightning-block-sync/src/test_utils.rs @@ -1,4 +1,4 @@ -use crate::{AsyncBlockSourceResult, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache}; +use crate::{AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, BlockSourceError, UnboundedCache}; use crate::poll::{Validate, ValidatedBlockHeader}; use bitcoin::blockdata::block::{Block, BlockHeader}; @@ -20,6 +20,7 @@ pub struct Blockchain { without_blocks: Option>, without_headers: bool, malformed_headers: bool, + filtered_blocks: bool, } impl Blockchain { @@ -77,6 +78,10 @@ impl Blockchain { Self { malformed_headers: true, ..self } } + pub fn filtered_blocks(self) -> Self { + Self { filtered_blocks: true, ..self } + } + pub fn fork_at_height(&self, height: usize) -> Self { assert!(height + 1 < self.blocks.len()); let mut blocks = self.blocks.clone(); @@ -146,7 +151,7 @@ impl BlockSource for Blockchain { }) } - fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, BlockData> { Box::pin(async move { for (height, block) in self.blocks.iter().enumerate() { if block.header.block_hash() == *header_hash { @@ -156,7 +161,11 @@ impl BlockSource for Blockchain { } } - return Ok(block.clone()); + if self.filtered_blocks { + return Ok(BlockData::HeaderOnly(block.header.clone())); + } else { + return Ok(BlockData::FullBlock(block.clone())); + } } } Err(BlockSourceError::transient("block not found")) @@ -185,6 +194,7 @@ impl chain::Listen for NullChainListener { pub struct MockChainListener { expected_blocks_connected: RefCell>, + expected_filtered_blocks_connected: RefCell>, expected_blocks_disconnected: RefCell>, } @@ -192,6 +202,7 @@ impl MockChainListener { pub fn new() -> Self { Self { expected_blocks_connected: RefCell::new(VecDeque::new()), + expected_filtered_blocks_connected: RefCell::new(VecDeque::new()), expected_blocks_disconnected: RefCell::new(VecDeque::new()), } } @@ -201,6 +212,11 @@ impl MockChainListener { self } + pub fn expect_filtered_block_connected(self, block: BlockHeaderData) -> Self { + self.expected_filtered_blocks_connected.borrow_mut().push_back(block); + self + } + pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self { self.expected_blocks_disconnected.borrow_mut().push_back(block); self @@ -208,10 +224,22 @@ impl MockChainListener { } impl chain::Listen for MockChainListener { - fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) { + fn block_connected(&self, block: &Block, height: u32) { match self.expected_blocks_connected.borrow_mut().pop_front() { None => { - panic!("Unexpected block connected: {:?}", header.block_hash()); + panic!("Unexpected block connected: {:?}", block.block_hash()); + }, + Some(expected_block) => { + assert_eq!(block.block_hash(), expected_block.header.block_hash()); + assert_eq!(height, expected_block.height); + }, + } + } + + fn filtered_block_connected(&self, header: &BlockHeader, _txdata: &chain::transaction::TransactionData, height: u32) { + match self.expected_filtered_blocks_connected.borrow_mut().pop_front() { + None => { + panic!("Unexpected filtered block connected: {:?}", header.block_hash()); }, Some(expected_block) => { assert_eq!(header.block_hash(), expected_block.header.block_hash()); @@ -244,6 +272,11 @@ impl Drop for MockChainListener { panic!("Expected blocks connected: {:?}", expected_blocks_connected); } + let expected_filtered_blocks_connected = self.expected_filtered_blocks_connected.borrow(); + if !expected_filtered_blocks_connected.is_empty() { + panic!("Expected filtered_blocks connected: {:?}", expected_filtered_blocks_connected); + } + let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow(); if !expected_blocks_disconnected.is_empty() { panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected); -- 2.30.2