Replace WatchEventProvider with chain::Filter
authorJeffrey Czyz <jkczyz@gmail.com>
Thu, 30 Jul 2020 17:27:41 +0000 (10:27 -0700)
committerJeffrey Czyz <jkczyz@gmail.com>
Thu, 1 Oct 2020 05:40:33 +0000 (22:40 -0700)
WatchEventProvider served as a means for replacing ChainWatchInterface.
However, it requires users to explicitly fetch WatchEvents, even if not
interested in them. Replace WatchEventProvider by chain::Filter, which
is an optional member of ChainMonitor. If set, interesting transactions
and output spends are registered such that blocks containing them can be
retrieved from a chain source in an efficient manner.

This is useful when the chain source is not a full node. For Electrum,
it allows for pre-filtered blocks. For BIP157/158, it serves as a means
to match against compact filters.

ARCH.md
fuzz/src/chanmon_consistency.rs
fuzz/src/full_stack.rs
lightning-net-tokio/src/lib.rs
lightning/src/chain/mod.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/util/test_utils.rs

diff --git a/ARCH.md b/ARCH.md
index b7276669a504ef3bfdd083cb82c9984676c2745b..c55e2ef74d368b7b944a93c513add5dfc3b5189d 100644 (file)
--- a/ARCH.md
+++ b/ARCH.md
@@ -54,9 +54,9 @@ At a high level, some of the common interfaces fit together as follows:
  |                    -----------------        \       _----------------   /           /
  |                    | chain::Access |         \     / | ChainMonitor |---------------
  |                    -----------------          \   /  ----------------
- |                            |                   \ /
-(as RoutingMessageHandler)    v                    v
-  \                   --------------------     ---------
-   -----------------> | NetGraphMsgHandler |   | Event |
-                      --------------------     ---------
+ |                            |                   \ /          |
+(as RoutingMessageHandler)    v                    v           v
+  \                   --------------------     ---------   -----------------
+   -----------------> | NetGraphMsgHandler |   | Event |   | chain::Filter |
+                      --------------------     ---------   -----------------
 ```
index f12c2aa374b423e774283b4876271b682fc34af2..b16e9fac7dd92ee5ff7dbf10e3bc28164393cbd0 100644 (file)
@@ -83,7 +83,7 @@ impl Writer for VecWriter {
 
 struct TestChainMonitor {
        pub logger: Arc<dyn Logger>,
-       pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+       pub chain_monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
        pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
        // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
        // logic will automatically force-close our channels for us (as we don't have an up-to-date
@@ -96,7 +96,7 @@ struct TestChainMonitor {
 impl TestChainMonitor {
        pub fn new(broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
                Self {
-                       chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(broadcaster, logger.clone(), feeest)),
+                       chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)),
                        logger,
                        update_ret: Mutex::new(Ok(())),
                        latest_monitors: Mutex::new(HashMap::new()),
index 5980f9f172cb4b6e4097d16c60e2998b53b18583..6b878fa43a571c2edec857de50c346c0535a0001 100644 (file)
@@ -145,13 +145,13 @@ impl<'a> std::hash::Hash for Peer<'a> {
 
 type ChannelMan = ChannelManager<
        EnforcingChannelKeys,
-       Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+       Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
        Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
 type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
 
 struct MoneyLossDetector<'a> {
        manager: Arc<ChannelMan>,
-       monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+       monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
        handler: PeerMan<'a>,
 
        peers: &'a RefCell<[bool; 256]>,
@@ -165,7 +165,7 @@ struct MoneyLossDetector<'a> {
 impl<'a> MoneyLossDetector<'a> {
        pub fn new(peers: &'a RefCell<[bool; 256]>,
                   manager: Arc<ChannelMan>,
-                  monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+                  monitor: Arc<channelmonitor::ChainMonitor<EnforcingChannelKeys, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
                   handler: PeerMan<'a>) -> Self {
                MoneyLossDetector {
                        manager,
@@ -333,7 +333,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
        };
 
        let broadcast = Arc::new(TestBroadcaster{});
-       let monitor = Arc::new(channelmonitor::ChainMonitor::new(broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
+       let monitor = Arc::new(channelmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
 
        let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
        let mut config = UserConfig::default();
index 2e99b71df3094e027363ba1c21b341205dab2c25..beaafcac0f28eb4460102a52adbb53878c7af841 100644 (file)
@@ -35,7 +35,8 @@
 //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
 //! type Logger = dyn lightning::util::logger::Logger;
 //! type ChainAccess = dyn lightning::chain::Access;
-//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
+//! type ChainFilter = dyn lightning::chain::Filter;
+//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor<lightning::chain::keysinterface::InMemoryChannelKeys, Arc<ChainFilter>, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>>;
 //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor, TxBroadcaster, FeeEstimator, Logger>;
 //! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChainMonitor, TxBroadcaster, FeeEstimator, ChainAccess, Logger>;
 //!
index c77883edddf89dcb8afc404fa0ca7675e6428288..7721408c4390e035cb9836a50c9e3f277762b879 100644 (file)
@@ -95,32 +95,33 @@ pub trait Watch: Send + Sync {
        fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
 }
 
-/// An interface for providing [`WatchEvent`]s.
+/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
+/// channels.
 ///
-/// [`WatchEvent`]: enum.WatchEvent.html
-pub trait WatchEventProvider {
-       /// Releases events produced since the last call. Subsequent calls must only return new events.
-       fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
-}
-
-/// An event indicating on-chain activity to watch for pertaining to a channel.
-pub enum WatchEvent {
-       /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
-       /// condition.
-       WatchTransaction {
-               /// Identifier of the transaction.
-               txid: Txid,
-
-               /// Spending condition for an output of the transaction.
-               script_pubkey: Script,
-       },
-       /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
-       /// the spending condition.
-       WatchOutput {
-               /// Identifier for the output.
-               outpoint: OutPoint,
+/// This is useful in order to have a [`Watch`] implementation convey to a chain source which
+/// transactions to be notified of. Notification may take the form of pre-filtering blocks or, in
+/// the case of [BIP 157]/[BIP 158], only fetching a block if the compact filter matches. If
+/// receiving full blocks from a chain source, any further filtering is unnecessary.
+///
+/// After an output has been registered, subsequent block retrievals from the chain source must not
+/// exclude any transactions matching the new criteria nor any in-block descendants of such
+/// transactions.
+///
+/// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter`
+/// should not block on I/O. Implementations should instead queue the newly monitored data to be
+/// processed later. Then, in order to block until the data has been processed, any `Watch`
+/// invocation that has called the `Filter` must return [`TemporaryFailure`].
+///
+/// [`Watch`]: trait.Watch.html
+/// [`TemporaryFailure`]: ../ln/channelmonitor/enum.ChannelMonitorUpdateErr.html#variant.TemporaryFailure
+/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki
+/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki
+pub trait Filter: Send + Sync {
+       /// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as
+       /// a spending condition.
+       fn register_tx(&self, txid: Txid, script_pubkey: Script);
 
-               /// Spending condition for the output.
-               script_pubkey: Script,
-       }
+       /// Registers interest in spends of a transaction output identified by `outpoint` having
+       /// `script_pubkey` as the spending condition.
+       fn register_output(&self, outpoint: OutPoint, script_pubkey: Script);
 }
index e93d1a4d583f41f4fd13e3433aac2b86bb61d332..fd6334d7029e7ffccf180e3d4a21634c5a3882f1 100644 (file)
@@ -43,6 +43,7 @@ use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, Hold
 use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
 use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
 use chain;
+use chain::Filter;
 use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
@@ -196,25 +197,49 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source });
 ///
 /// [`chain::Watch`]: ../../chain/trait.Watch.html
 /// [`ChannelManager`]: ../channelmanager/struct.ChannelManager.html
-pub struct ChainMonitor<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref>
-       where T::Target: BroadcasterInterface,
+pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref>
+       where C::Target: chain::Filter,
+        T::Target: BroadcasterInterface,
         F::Target: FeeEstimator,
         L::Target: Logger,
 {
        /// The monitors
        pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
-       watch_events: Mutex<WatchEventQueue>,
+       watch_events: Mutex<WatchEventCache>,
+       chain_source: Option<C>,
        broadcaster: T,
        logger: L,
        fee_estimator: F
 }
 
-struct WatchEventQueue {
+struct WatchEventCache {
        watched: ChainWatchedUtil,
-       events: Vec<chain::WatchEvent>,
+       events: Vec<WatchEvent>,
 }
 
-impl WatchEventQueue {
+/// An event indicating on-chain activity to watch for pertaining to a channel.
+enum WatchEvent {
+       /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
+       /// condition.
+       WatchTransaction {
+               /// Identifier of the transaction.
+               txid: Txid,
+
+               /// Spending condition for an output of the transaction.
+               script_pubkey: Script,
+       },
+       /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
+       /// the spending condition.
+       WatchOutput {
+               /// Identifier for the output.
+               outpoint: OutPoint,
+
+               /// Spending condition for the output.
+               script_pubkey: Script,
+       }
+}
+
+impl WatchEventCache {
        fn new() -> Self {
                Self {
                        watched: ChainWatchedUtil::new(),
@@ -224,7 +249,7 @@ impl WatchEventQueue {
 
        fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
                if self.watched.register_tx(txid, script_pubkey) {
-                       self.events.push(chain::WatchEvent::WatchTransaction {
+                       self.events.push(WatchEvent::WatchTransaction {
                                txid: *txid,
                                script_pubkey: script_pubkey.clone()
                        });
@@ -234,7 +259,7 @@ impl WatchEventQueue {
        fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
                let (txid, index) = outpoint;
                if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
-                       self.events.push(chain::WatchEvent::WatchOutput {
+                       self.events.push(WatchEvent::WatchOutput {
                                outpoint: OutPoint {
                                        txid: *txid,
                                        index: index as u16,
@@ -244,10 +269,24 @@ impl WatchEventQueue {
                }
        }
 
-       fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
-               let mut pending_events = Vec::with_capacity(self.events.len());
-               pending_events.append(&mut self.events);
-               pending_events
+       fn flush_events<C: Deref>(&mut self, chain_source: &Option<C>) -> bool where C::Target: chain::Filter {
+               let num_events = self.events.len();
+               match chain_source {
+                       &None => self.events.clear(),
+                       &Some(ref chain_source) => {
+                               for event in self.events.drain(..) {
+                                       match event {
+                                               WatchEvent::WatchTransaction { txid, script_pubkey } => {
+                                                       chain_source.register_tx(txid, script_pubkey)
+                                               },
+                                               WatchEvent::WatchOutput { outpoint, script_pubkey } => {
+                                                       chain_source.register_output(outpoint, script_pubkey)
+                                               },
+                                       }
+                               }
+                       }
+               }
+               num_events > 0
        }
 
        fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
@@ -270,8 +309,9 @@ impl WatchEventQueue {
        }
 }
 
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, T, F, L>
-       where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
+       where C::Target: chain::Filter,
+             T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
 {
@@ -280,9 +320,15 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
        /// [`ChannelMonitor::block_connected`] for details. Any HTLCs that were resolved on chain will
        /// be returned by [`chain::Watch::release_pending_monitor_events`].
        ///
+       /// Calls back to [`chain::Filter`] if any monitor indicated new outputs to watch, returning
+       /// `true` if so. Subsequent calls must not exclude any transactions matching the new outputs
+       /// nor any in-block descendants of such transactions. It is not necessary to re-fetch the block
+       /// to obtain updated `txdata`.
+       ///
        /// [`ChannelMonitor::block_connected`]: struct.ChannelMonitor.html#method.block_connected
        /// [`chain::Watch::release_pending_monitor_events`]: ../../chain/trait.Watch.html#tymethod.release_pending_monitor_events
-       pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
+       /// [`chain::Filter`]: ../../chain/trait.Filter.html
+       pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) -> bool {
                let mut watch_events = self.watch_events.lock().unwrap();
                let matched_txn = watch_events.filter_block(txdata);
                {
@@ -297,6 +343,7 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
                                }
                        }
                }
+               watch_events.flush_events(&self.chain_source)
        }
 
        /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
@@ -312,17 +359,26 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
        }
 }
 
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, T, F, L>
-       where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
+       where C::Target: chain::Filter,
+             T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
 {
-       /// Creates a new object which can be used to monitor several channels given the chain
-       /// interface with which to register to receive notifications.
-       pub fn new(broadcaster: T, logger: L, feeest: F) -> Self {
+       /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
+       ///
+       /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor
+       /// will call back to it indicating transactions and outputs of interest. This allows clients to
+       /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may
+       /// always need to fetch full blocks absent another means for determining which blocks contain
+       /// transactions relevant to the watched channels.
+       ///
+       /// [`chain::Filter`]: ../../chain/trait.Filter.html
+       pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
                Self {
                        monitors: Mutex::new(HashMap::new()),
-                       watch_events: Mutex::new(WatchEventQueue::new()),
+                       watch_events: Mutex::new(WatchEventCache::new()),
+                       chain_source,
                        broadcaster,
                        logger,
                        fee_estimator: feeest,
@@ -330,6 +386,10 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
        }
 
        /// Adds the monitor that watches the channel referred to by the given outpoint.
+       ///
+       /// Calls back to [`chain::Filter`] with the funding transaction and outputs to watch.
+       ///
+       /// [`chain::Filter`]: ../../chain/trait.Filter.html
        fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
                let mut watch_events = self.watch_events.lock().unwrap();
                let mut monitors = self.monitors.lock().unwrap();
@@ -349,6 +409,7 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
                        }
                }
                entry.insert(monitor);
+               watch_events.flush_events(&self.chain_source);
                Ok(())
        }
 
@@ -365,8 +426,9 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSig
        }
 }
 
-impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, T, F, L>
-       where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send> chain::Watch for ChainMonitor<ChanSigner, C, T, F, L>
+       where C::Target: chain::Filter,
+             T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
 {
@@ -395,8 +457,9 @@ impl<ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L:
        }
 }
 
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, T, F, L>
-       where T::Target: BroadcasterInterface,
+impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> events::EventsProvider for ChainMonitor<ChanSigner, C, T, F, L>
+       where C::Target: chain::Filter,
+             T::Target: BroadcasterInterface,
              F::Target: FeeEstimator,
              L::Target: Logger,
 {
@@ -409,16 +472,6 @@ impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> events::EventsProvid
        }
 }
 
-impl<ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref> chain::WatchEventProvider for ChainMonitor<ChanSigner, T, F, L>
-       where T::Target: BroadcasterInterface,
-             F::Target: FeeEstimator,
-             L::Target: Logger,
-{
-       fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
-               self.watch_events.lock().unwrap().dequeue_events()
-       }
-}
-
 /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
 /// instead claiming it in its own individual transaction.
 pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
index 44430a6453b4a3c52180218360f15aa57c666e94..aad56ff3cb4953e6b8c2610659748c466dcef81a 100644 (file)
@@ -10,7 +10,6 @@
 //! A bunch of useful utilities for building networks of nodes and exchanging messages between
 //! nodes for functional tests.
 
-use chain;
 use chain::Watch;
 use chain::transaction::OutPoint;
 use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
@@ -81,28 +80,11 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he
 }
 
 pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) {
-       use chain::WatchEventProvider;
-
-       let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events();
-       process_chain_watch_events(&watch_events);
-
        let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
-       loop {
-               node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height);
-
-               let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events();
-               process_chain_watch_events(&watch_events);
-
-               if watch_events.is_empty() {
-                       break;
-               }
-       }
-
+       while node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height) {}
        node.node.block_connected(&block.header, &txdata, height);
 }
 
-fn process_chain_watch_events(_events: &Vec<chain::WatchEvent>) {}
-
 pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) {
        node.chain_monitor.chain_monitor.block_disconnected(header, height);
        node.node.block_disconnected(header);
@@ -215,12 +197,15 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
                                }).unwrap();
                        }
 
-                       let channel_monitor = test_utils::TestChainMonitor::new(self.tx_broadcaster.clone(), &self.logger, &feeest);
+                       let chain_source = test_utils::TestChainSource::new(Network::Testnet);
+                       let chain_monitor = test_utils::TestChainMonitor::new(Some(&chain_source), self.tx_broadcaster.clone(), &self.logger, &feeest);
                        for deserialized_monitor in deserialized_monitors.drain(..) {
-                               if let Err(_) = channel_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) {
+                               if let Err(_) = chain_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) {
                                        panic!();
                                }
                        }
+                       assert_eq!(*chain_source.watched_txn.lock().unwrap(), *self.chain_source.watched_txn.lock().unwrap());
+                       assert_eq!(*chain_source.watched_outputs.lock().unwrap(), *self.chain_source.watched_outputs.lock().unwrap());
                }
        }
 }
@@ -1120,7 +1105,7 @@ pub fn create_node_cfgs<'a>(node_count: usize, chanmon_cfgs: &'a Vec<TestChanMon
        for i in 0..node_count {
                let seed = [i as u8; 32];
                let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet);
-               let chain_monitor = test_utils::TestChainMonitor::new(&chanmon_cfgs[i].tx_broadcaster, &chanmon_cfgs[i].logger, &chanmon_cfgs[i].fee_estimator);
+               let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[i].chain_source), &chanmon_cfgs[i].tx_broadcaster, &chanmon_cfgs[i].logger, &chanmon_cfgs[i].fee_estimator);
                nodes.push(NodeCfg { chain_source: &chanmon_cfgs[i].chain_source, logger: &chanmon_cfgs[i].logger, tx_broadcaster: &chanmon_cfgs[i].tx_broadcaster, fee_estimator: &chanmon_cfgs[i].fee_estimator, chain_monitor, keys_manager, node_seed: seed });
        }
 
index 03fedbf33f7dfcd5c660300a5d2a7b24adaf8346..93a09b079803f76bc9b8e4a0c74c08e4264d34ae 100644 (file)
@@ -4322,7 +4322,7 @@ fn test_no_txn_manager_serialize_deserialize() {
 
        logger = test_utils::TestLogger::new();
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
-       new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
        nodes[0].chain_monitor = &new_chain_monitor;
        let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
        let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
@@ -4429,7 +4429,7 @@ fn test_manager_serialize_deserialize_events() {
 
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
        logger = test_utils::TestLogger::new();
-       new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
        nodes[0].chain_monitor = &new_chain_monitor;
        let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
        let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
@@ -4519,7 +4519,7 @@ fn test_simple_manager_serialize_deserialize() {
 
        logger = test_utils::TestLogger::new();
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
-       new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
        nodes[0].chain_monitor = &new_chain_monitor;
        let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..];
        let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor<EnforcingChannelKeys>)>::read(&mut chan_0_monitor_read).unwrap();
@@ -4597,7 +4597,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() {
 
        logger = test_utils::TestLogger::new();
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
-       new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
+       new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator);
        nodes[0].chain_monitor = &new_chain_monitor;
 
        let mut node_0_stale_monitors = Vec::new();
@@ -5743,7 +5743,7 @@ fn test_key_derivation_params() {
        // We manually create the node configuration to backup the seed.
        let seed = [42; 32];
        let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet);
-       let chain_monitor = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator);
+       let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator);
        let node = NodeCfg { chain_source: &chanmon_cfgs[0].chain_source, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, chain_monitor, keys_manager, node_seed: seed };
        let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
        node_cfgs.remove(0);
@@ -7494,7 +7494,7 @@ fn test_data_loss_protect() {
        tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())};
        fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 };
        keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet);
-       monitor = test_utils::TestChainMonitor::new(&tx_broadcaster, &logger, &fee_estimator);
+       monitor = test_utils::TestChainMonitor::new(Some(&chain_source), &tx_broadcaster, &logger, &fee_estimator);
        node_state_0 = {
                let mut channel_monitors = HashMap::new();
                channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chain_monitor);
@@ -8353,6 +8353,7 @@ fn test_update_err_monitor_lockdown() {
        let preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0;
 
        // Copy ChainMonitor to simulate a watchtower and update block height of node 0 until its ChannelMonitor timeout HTLC onchain
+       let chain_source = test_utils::TestChainSource::new(Network::Testnet);
        let logger = test_utils::TestLogger::with_id(format!("node {}", 0));
        let watchtower = {
                let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
@@ -8362,7 +8363,7 @@ fn test_update_err_monitor_lockdown() {
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
                                &mut ::std::io::Cursor::new(&w.0)).unwrap().1;
                assert!(new_monitor == *monitor);
-               let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+               let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
                watchtower
        };
@@ -8410,6 +8411,7 @@ fn test_concurrent_monitor_claim() {
        route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0;
 
        // Copy ChainMonitor to simulate watchtower Alice and update block height her ChannelMonitor timeout HTLC onchain
+       let chain_source = test_utils::TestChainSource::new(Network::Testnet);
        let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice"));
        let watchtower_alice = {
                let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
@@ -8419,7 +8421,7 @@ fn test_concurrent_monitor_claim() {
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
                                &mut ::std::io::Cursor::new(&w.0)).unwrap().1;
                assert!(new_monitor == *monitor);
-               let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+               let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
                watchtower
        };
@@ -8434,6 +8436,7 @@ fn test_concurrent_monitor_claim() {
        }
 
        // Copy ChainMonitor to simulate watchtower Bob and make it receive a commitment update first.
+       let chain_source = test_utils::TestChainSource::new(Network::Testnet);
        let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob"));
        let watchtower_bob = {
                let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap();
@@ -8443,7 +8446,7 @@ fn test_concurrent_monitor_claim() {
                let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>::read(
                                &mut ::std::io::Cursor::new(&w.0)).unwrap().1;
                assert!(new_monitor == *monitor);
-               let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
+               let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator);
                assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok());
                watchtower
        };
index bc084b77ea07f6b946ef0c7ad14e0720a9ce24d9..6cdfb7f9d63cb5708f414472189ec23af5ef63ad 100644 (file)
@@ -27,7 +27,7 @@ use bitcoin::blockdata::transaction::{Transaction, TxOut};
 use bitcoin::blockdata::script::{Builder, Script};
 use bitcoin::blockdata::opcodes;
 use bitcoin::network::constants::Network;
-use bitcoin::hash_types::BlockHash;
+use bitcoin::hash_types::{BlockHash, Txid};
 
 use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1, Signature};
 
@@ -37,7 +37,7 @@ use std::time::Duration;
 use std::sync::Mutex;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use std::{cmp, mem};
-use std::collections::HashMap;
+use std::collections::{HashMap, HashSet};
 
 pub struct TestVecWriter(pub Vec<u8>);
 impl Writer for TestVecWriter {
@@ -62,18 +62,18 @@ impl chaininterface::FeeEstimator for TestFeeEstimator {
 pub struct TestChainMonitor<'a> {
        pub added_monitors: Mutex<Vec<(OutPoint, channelmonitor::ChannelMonitor<EnforcingChannelKeys>)>>,
        pub latest_monitor_update_id: Mutex<HashMap<[u8; 32], (OutPoint, u64)>>,
-       pub chain_monitor: channelmonitor::ChainMonitor<EnforcingChannelKeys, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger>,
+       pub chain_monitor: channelmonitor::ChainMonitor<EnforcingChannelKeys, &'a TestChainSource, &'a chaininterface::BroadcasterInterface, &'a TestFeeEstimator, &'a TestLogger>,
        pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
        // If this is set to Some(), after the next return, we'll always return this until update_ret
        // is changed:
        pub next_update_ret: Mutex<Option<Result<(), channelmonitor::ChannelMonitorUpdateErr>>>,
 }
 impl<'a> TestChainMonitor<'a> {
-       pub fn new(broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self {
+       pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self {
                Self {
                        added_monitors: Mutex::new(Vec::new()),
                        latest_monitor_update_id: Mutex::new(HashMap::new()),
-                       chain_monitor: channelmonitor::ChainMonitor::new(broadcaster, logger, fee_estimator),
+                       chain_monitor: channelmonitor::ChainMonitor::new(chain_source, broadcaster, logger, fee_estimator),
                        update_ret: Mutex::new(Ok(())),
                        next_update_ret: Mutex::new(None),
                }
@@ -396,6 +396,8 @@ impl TestKeysInterface {
 pub struct TestChainSource {
        pub genesis_hash: BlockHash,
        pub utxo_ret: Mutex<Result<TxOut, chain::AccessError>>,
+       pub watched_txn: Mutex<HashSet<(Txid, Script)>>,
+       pub watched_outputs: Mutex<HashSet<(OutPoint, Script)>>,
 }
 
 impl TestChainSource {
@@ -404,6 +406,8 @@ impl TestChainSource {
                Self {
                        genesis_hash: genesis_block(network).block_hash(),
                        utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })),
+                       watched_txn: Mutex::new(HashSet::new()),
+                       watched_outputs: Mutex::new(HashSet::new()),
                }
        }
 }
@@ -417,3 +421,13 @@ impl chain::Access for TestChainSource {
                self.utxo_ret.lock().unwrap().clone()
        }
 }
+
+impl chain::Filter for TestChainSource {
+       fn register_tx(&self, txid: Txid, script_pubkey: Script) {
+               self.watched_txn.lock().unwrap().insert((txid, script_pubkey));
+       }
+
+       fn register_output(&self, outpoint: OutPoint, script_pubkey: Script) {
+               self.watched_outputs.lock().unwrap().insert((outpoint, script_pubkey));
+       }
+}