Add a simple naive block cache in gossip sync lookups
authorMatt Corallo <git@bluematt.me>
Sun, 30 Apr 2023 02:06:19 +0000 (02:06 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 23 Aug 2023 21:48:03 +0000 (21:48 +0000)
lightning-block-sync/src/gossip.rs

index 3eb7ad4ae3537aae06c8e33edf59996614b4c740..4e66c0ce9756f016523bb9a9cfd2d65f973f5eca 100644 (file)
@@ -4,6 +4,7 @@
 
 use crate::{AsyncBlockSourceResult, BlockData, BlockSource};
 
+use bitcoin::blockdata::block::Block;
 use bitcoin::blockdata::transaction::{TxOut, OutPoint};
 use bitcoin::hash_types::BlockHash;
 
@@ -17,7 +18,8 @@ use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupErr
 
 use lightning::util::logger::Logger;
 
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
+use std::collections::VecDeque;
 use std::future::Future;
 use std::ops::Deref;
 
@@ -27,9 +29,6 @@ use std::ops::Deref;
 /// Note that while this is implementable for a [`BlockSource`] which returns filtered block data
 /// (i.e. [`BlockData::HeaderOnly`] for [`BlockSource::get_block`] requests), such an
 /// implementation will reject all gossip as it is not fully able to verify the UTXOs referenced.
-///
-/// For efficiency, an implementation may consider caching some set of blocks, as many redundant
-/// calls may be made.
 pub trait UtxoSource : BlockSource + 'static {
        /// Fetches the block hash of the block at the given height.
        ///
@@ -91,8 +90,11 @@ pub struct GossipVerifier<S: FutureSpawner,
        peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>,
        gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>,
        spawn: S,
+       block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>,
 }
 
+const BLOCK_CACHE_SIZE: usize = 5;
+
 impl<S: FutureSpawner,
        Blocks: Deref + Send + Sync + Clone,
        L: Deref + Send + Sync,
@@ -114,34 +116,76 @@ impl<S: FutureSpawner,
        /// This is expected to be given to a [`P2PGossipSync`] (initially constructed with `None` for
        /// the UTXO lookup) via [`P2PGossipSync::add_utxo_lookup`].
        pub fn new(source: Blocks, spawn: S, gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, peer_manager: Arc<PeerManager<Descriptor, CM, Arc<P2PGossipSync<Arc<NetworkGraph<L>>, Self, L>>, OM, L, CMH, NS>>) -> Self {
-               Self { source, spawn, gossiper, peer_manager }
+               Self {
+                       source, spawn, gossiper, peer_manager,
+                       block_cache: Arc::new(Mutex::new(VecDeque::with_capacity(BLOCK_CACHE_SIZE))),
+               }
        }
 
-       async fn retrieve_utxo(source: Blocks, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
+       async fn retrieve_utxo(
+               source: Blocks, block_cache: Arc<Mutex<VecDeque<(u32, Block)>>>, short_channel_id: u64
+       ) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
                let output_index = (short_channel_id & 0xffff) as u16;
 
-               let block_hash = source.get_block_hash_by_height(block_height).await
-                       .map_err(|_| UtxoLookupError::UnknownTx)?;
-               let block_data = source.get_block(&block_hash).await
-                       .map_err(|_| UtxoLookupError::UnknownTx)?;
-               let mut block = match block_data {
-                       BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
-                       BlockData::FullBlock(block) => block,
+               let (outpoint, output);
+
+               'tx_found: loop { // Used as a simple goto
+                       macro_rules! process_block {
+                               ($block: expr) => { {
+                                       if transaction_index as usize >= $block.txdata.len() {
+                                               return Err(UtxoLookupError::UnknownTx);
+                                       }
+                                       let transaction = &$block.txdata[transaction_index as usize];
+                                       if output_index as usize >= transaction.output.len() {
+                                               return Err(UtxoLookupError::UnknownTx);
+                                       }
+
+                                       outpoint = OutPoint::new(transaction.txid(), output_index.into());
+                                       output = transaction.output[output_index as usize].clone();
+                               } }
+                       }
+                       {
+                               let recent_blocks = block_cache.lock().unwrap();
+                               for (height, block) in recent_blocks.iter() {
+                                       if *height == block_height {
+                                               process_block!(block);
+                                               break 'tx_found;
+                                       }
+                               }
+                       }
+
+                       let block_hash = source.get_block_hash_by_height(block_height).await
+                               .map_err(|_| UtxoLookupError::UnknownTx)?;
+                       let block_data = source.get_block(&block_hash).await
+                               .map_err(|_| UtxoLookupError::UnknownTx)?;
+                       let block = match block_data {
+                               BlockData::HeaderOnly(_) => return Err(UtxoLookupError::UnknownTx),
+                               BlockData::FullBlock(block) => block,
+                       };
+                       process_block!(block);
+                       {
+                               let mut recent_blocks = block_cache.lock().unwrap();
+                               let mut insert = true;
+                               for (height, _) in recent_blocks.iter() {
+                                       if *height == block_height {
+                                               insert = false;
+                                       }
+                               }
+                               if insert {
+                                       if recent_blocks.len() >= BLOCK_CACHE_SIZE {
+                                               recent_blocks.pop_front();
+                                       }
+                                       recent_blocks.push_back((block_height, block));
+                               }
+                       }
+                       break 'tx_found;
                };
-               if transaction_index as usize >= block.txdata.len() {
-                       return Err(UtxoLookupError::UnknownTx);
-               }
-               let mut transaction = block.txdata.swap_remove(transaction_index as usize);
-               if output_index as usize >= transaction.output.len() {
-                       return Err(UtxoLookupError::UnknownTx);
-               }
                let outpoint_unspent =
-                       source.is_output_unspent(OutPoint::new(transaction.txid(), output_index.into())).await
-                               .map_err(|_| UtxoLookupError::UnknownTx)?;
+                       source.is_output_unspent(outpoint).await.map_err(|_| UtxoLookupError::UnknownTx)?;
                if outpoint_unspent {
-                       Ok(transaction.output.swap_remove(output_index as usize))
+                       Ok(output)
                } else {
                        Err(UtxoLookupError::UnknownTx)
                }
@@ -190,9 +234,10 @@ impl<S: FutureSpawner,
                let fut = res.clone();
                let source = self.source.clone();
                let gossiper = Arc::clone(&self.gossiper);
+               let block_cache = Arc::clone(&self.block_cache);
                let pm = Arc::clone(&self.peer_manager);
                self.spawn.spawn(async move {
-                       let res = Self::retrieve_utxo(source, short_channel_id).await;
+                       let res = Self::retrieve_utxo(source, block_cache, short_channel_id).await;
                        fut.resolve(gossiper.network_graph(), &*gossiper, res);
                        pm.process_events();
                });