From: Jeffrey Czyz Date: Thu, 30 Jul 2020 17:27:41 +0000 (-0700) Subject: Replace WatchEventProvider with chain::Filter X-Git-Tag: v0.0.12~21^2~8 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=98bc46beb9d364915c7c43c96b154b0d5efa0ba3;p=rust-lightning Replace WatchEventProvider with chain::Filter WatchEventProvider served as a means for replacing ChainWatchInterface. However, it requires users to explicitly fetch WatchEvents, even if not interested in them. Replace WatchEventProvider by chain::Filter, which is an optional member of ChainMonitor. If set, interesting transactions and output spends are registered such that blocks containing them can be retrieved from a chain source in an efficient manner. This is useful when the chain source is not a full node. For Electrum, it allows for pre-filtered blocks. For BIP157/158, it serves as a means to match against compact filters. --- diff --git a/ARCH.md b/ARCH.md index b7276669a..c55e2ef74 100644 --- a/ARCH.md +++ b/ARCH.md @@ -54,9 +54,9 @@ At a high level, some of the common interfaces fit together as follows: | ----------------- \ _---------------- / / | | chain::Access | \ / | ChainMonitor |--------------- | ----------------- \ / ---------------- - | | \ / -(as RoutingMessageHandler) v v - \ -------------------- --------- - -----------------> | NetGraphMsgHandler | | Event | - -------------------- --------- + | | \ / | +(as RoutingMessageHandler) v v v + \ -------------------- --------- ----------------- + -----------------> | NetGraphMsgHandler | | Event | | chain::Filter | + -------------------- --------- ----------------- ``` diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index f12c2aa37..b16e9fac7 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -83,7 +83,7 @@ impl Writer for VecWriter { struct TestChainMonitor { pub logger: Arc, - pub chain_monitor: Arc, Arc, Arc>>, + pub chain_monitor: Arc, Arc, Arc, Arc>>, pub update_ret: Mutex>, // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization // logic will automatically force-close our channels for us (as we don't have an up-to-date @@ -96,7 +96,7 @@ struct TestChainMonitor { impl TestChainMonitor { pub fn new(broadcaster: Arc, logger: Arc, feeest: Arc) -> Self { Self { - chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(broadcaster, logger.clone(), feeest)), + chain_monitor: Arc::new(channelmonitor::ChainMonitor::new(None, broadcaster, logger.clone(), feeest)), logger, update_ret: Mutex::new(Ok(())), latest_monitors: Mutex::new(HashMap::new()), diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 5980f9f17..6b878fa43 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -145,13 +145,13 @@ impl<'a> std::hash::Hash for Peer<'a> { type ChannelMan = ChannelManager< EnforcingChannelKeys, - Arc, Arc, Arc>>, + Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>; type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; struct MoneyLossDetector<'a> { manager: Arc, - monitor: Arc, Arc, Arc>>, + monitor: Arc, Arc, Arc, Arc>>, handler: PeerMan<'a>, peers: &'a RefCell<[bool; 256]>, @@ -165,7 +165,7 @@ struct MoneyLossDetector<'a> { impl<'a> MoneyLossDetector<'a> { pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc, - monitor: Arc, Arc, Arc>>, + monitor: Arc, Arc, Arc, Arc>>, handler: PeerMan<'a>) -> Self { MoneyLossDetector { manager, @@ -333,7 +333,7 @@ pub fn do_test(data: &[u8], logger: &Arc) { }; let broadcast = Arc::new(TestBroadcaster{}); - let monitor = Arc::new(channelmonitor::ChainMonitor::new(broadcast.clone(), Arc::clone(&logger), fee_est.clone())); + let monitor = Arc::new(channelmonitor::ChainMonitor::new(None, broadcast.clone(), Arc::clone(&logger), fee_est.clone())); let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) }); let mut config = UserConfig::default(); diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 2e99b71df..beaafcac0 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -35,7 +35,8 @@ //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator; //! type Logger = dyn lightning::util::logger::Logger; //! type ChainAccess = dyn lightning::chain::Access; -//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor, Arc, Arc>; +//! type ChainFilter = dyn lightning::chain::Filter; +//! type ChainMonitor = lightning::ln::channelmonitor::ChainMonitor, Arc, Arc, Arc>; //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; //! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; //! diff --git a/lightning/src/chain/mod.rs b/lightning/src/chain/mod.rs index c77883edd..7721408c4 100644 --- a/lightning/src/chain/mod.rs +++ b/lightning/src/chain/mod.rs @@ -95,32 +95,33 @@ pub trait Watch: Send + Sync { fn release_pending_monitor_events(&self) -> Vec; } -/// An interface for providing [`WatchEvent`]s. +/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to +/// channels. /// -/// [`WatchEvent`]: enum.WatchEvent.html -pub trait WatchEventProvider { - /// Releases events produced since the last call. Subsequent calls must only return new events. - fn release_pending_watch_events(&self) -> Vec; -} - -/// An event indicating on-chain activity to watch for pertaining to a channel. -pub enum WatchEvent { - /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending - /// condition. - WatchTransaction { - /// Identifier of the transaction. - txid: Txid, - - /// Spending condition for an output of the transaction. - script_pubkey: Script, - }, - /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as - /// the spending condition. - WatchOutput { - /// Identifier for the output. - outpoint: OutPoint, +/// This is useful in order to have a [`Watch`] implementation convey to a chain source which +/// transactions to be notified of. Notification may take the form of pre-filtering blocks or, in +/// the case of [BIP 157]/[BIP 158], only fetching a block if the compact filter matches. If +/// receiving full blocks from a chain source, any further filtering is unnecessary. +/// +/// After an output has been registered, subsequent block retrievals from the chain source must not +/// exclude any transactions matching the new criteria nor any in-block descendants of such +/// transactions. +/// +/// Note that use as part of a [`Watch`] implementation involves reentrancy. Therefore, the `Filter` +/// should not block on I/O. Implementations should instead queue the newly monitored data to be +/// processed later. Then, in order to block until the data has been processed, any `Watch` +/// invocation that has called the `Filter` must return [`TemporaryFailure`]. +/// +/// [`Watch`]: trait.Watch.html +/// [`TemporaryFailure`]: ../ln/channelmonitor/enum.ChannelMonitorUpdateErr.html#variant.TemporaryFailure +/// [BIP 157]: https://github.com/bitcoin/bips/blob/master/bip-0157.mediawiki +/// [BIP 158]: https://github.com/bitcoin/bips/blob/master/bip-0158.mediawiki +pub trait Filter: Send + Sync { + /// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as + /// a spending condition. + fn register_tx(&self, txid: Txid, script_pubkey: Script); - /// Spending condition for the output. - script_pubkey: Script, - } + /// Registers interest in spends of a transaction output identified by `outpoint` having + /// `script_pubkey` as the spending condition. + fn register_output(&self, outpoint: OutPoint, script_pubkey: Script); } diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index e93d1a4d5..fd6334d70 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -43,6 +43,7 @@ use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, Hold use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; use chain; +use chain::Filter; use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; @@ -196,25 +197,49 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source }); /// /// [`chain::Watch`]: ../../chain/trait.Watch.html /// [`ChannelManager`]: ../channelmanager/struct.ChannelManager.html -pub struct ChainMonitor - where T::Target: BroadcasterInterface, +pub struct ChainMonitor + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { /// The monitors pub monitors: Mutex>>, - watch_events: Mutex, + watch_events: Mutex, + chain_source: Option, broadcaster: T, logger: L, fee_estimator: F } -struct WatchEventQueue { +struct WatchEventCache { watched: ChainWatchedUtil, - events: Vec, + events: Vec, } -impl WatchEventQueue { +/// An event indicating on-chain activity to watch for pertaining to a channel. +enum WatchEvent { + /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending + /// condition. + WatchTransaction { + /// Identifier of the transaction. + txid: Txid, + + /// Spending condition for an output of the transaction. + script_pubkey: Script, + }, + /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as + /// the spending condition. + WatchOutput { + /// Identifier for the output. + outpoint: OutPoint, + + /// Spending condition for the output. + script_pubkey: Script, + } +} + +impl WatchEventCache { fn new() -> Self { Self { watched: ChainWatchedUtil::new(), @@ -224,7 +249,7 @@ impl WatchEventQueue { fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) { if self.watched.register_tx(txid, script_pubkey) { - self.events.push(chain::WatchEvent::WatchTransaction { + self.events.push(WatchEvent::WatchTransaction { txid: *txid, script_pubkey: script_pubkey.clone() }); @@ -234,7 +259,7 @@ impl WatchEventQueue { fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) { let (txid, index) = outpoint; if self.watched.register_outpoint((*txid, index as u32), script_pubkey) { - self.events.push(chain::WatchEvent::WatchOutput { + self.events.push(WatchEvent::WatchOutput { outpoint: OutPoint { txid: *txid, index: index as u16, @@ -244,10 +269,24 @@ impl WatchEventQueue { } } - fn dequeue_events(&mut self) -> Vec { - let mut pending_events = Vec::with_capacity(self.events.len()); - pending_events.append(&mut self.events); - pending_events + fn flush_events(&mut self, chain_source: &Option) -> bool where C::Target: chain::Filter { + let num_events = self.events.len(); + match chain_source { + &None => self.events.clear(), + &Some(ref chain_source) => { + for event in self.events.drain(..) { + match event { + WatchEvent::WatchTransaction { txid, script_pubkey } => { + chain_source.register_tx(txid, script_pubkey) + }, + WatchEvent::WatchOutput { outpoint, script_pubkey } => { + chain_source.register_output(outpoint, script_pubkey) + }, + } + } + } + } + num_events > 0 } fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> { @@ -270,8 +309,9 @@ impl WatchEventQueue { } } -impl ChainMonitor - where T::Target: BroadcasterInterface, +impl ChainMonitor + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -280,9 +320,15 @@ impl ChainMonitor bool { let mut watch_events = self.watch_events.lock().unwrap(); let matched_txn = watch_events.filter_block(txdata); { @@ -297,6 +343,7 @@ impl ChainMonitor ChainMonitor ChainMonitor - where T::Target: BroadcasterInterface, +impl ChainMonitor + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { - /// Creates a new object which can be used to monitor several channels given the chain - /// interface with which to register to receive notifications. - pub fn new(broadcaster: T, logger: L, feeest: F) -> Self { + /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels. + /// + /// When an optional chain source implementing [`chain::Filter`] is provided, the chain monitor + /// will call back to it indicating transactions and outputs of interest. This allows clients to + /// pre-filter blocks or only fetch blocks matching a compact filter. Otherwise, clients may + /// always need to fetch full blocks absent another means for determining which blocks contain + /// transactions relevant to the watched channels. + /// + /// [`chain::Filter`]: ../../chain/trait.Filter.html + pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F) -> Self { Self { monitors: Mutex::new(HashMap::new()), - watch_events: Mutex::new(WatchEventQueue::new()), + watch_events: Mutex::new(WatchEventCache::new()), + chain_source, broadcaster, logger, fee_estimator: feeest, @@ -330,6 +386,10 @@ impl ChainMonitor) -> Result<(), MonitorUpdateError> { let mut watch_events = self.watch_events.lock().unwrap(); let mut monitors = self.monitors.lock().unwrap(); @@ -349,6 +409,7 @@ impl ChainMonitor ChainMonitor chain::Watch for ChainMonitor - where T::Target: BroadcasterInterface, +impl chain::Watch for ChainMonitor + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -395,8 +457,9 @@ impl events::EventsProvider for ChainMonitor - where T::Target: BroadcasterInterface, +impl events::EventsProvider for ChainMonitor + where C::Target: chain::Filter, + T::Target: BroadcasterInterface, F::Target: FeeEstimator, L::Target: Logger, { @@ -409,16 +472,6 @@ impl events::EventsProvid } } -impl chain::WatchEventProvider for ChainMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, -{ - fn release_pending_watch_events(&self) -> Vec { - self.watch_events.lock().unwrap().dequeue_events() - } -} - /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction, /// instead claiming it in its own individual transaction. pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 44430a645..aad56ff3c 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -10,7 +10,6 @@ //! A bunch of useful utilities for building networks of nodes and exchanging messages between //! nodes for functional tests. -use chain; use chain::Watch; use chain::transaction::OutPoint; use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure}; @@ -81,28 +80,11 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he } pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) { - use chain::WatchEventProvider; - - let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events(); - process_chain_watch_events(&watch_events); - let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); - loop { - node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height); - - let watch_events = node.chain_monitor.chain_monitor.release_pending_watch_events(); - process_chain_watch_events(&watch_events); - - if watch_events.is_empty() { - break; - } - } - + while node.chain_monitor.chain_monitor.block_connected(&block.header, &txdata, height) {} node.node.block_connected(&block.header, &txdata, height); } -fn process_chain_watch_events(_events: &Vec) {} - pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) { node.chain_monitor.chain_monitor.block_disconnected(header, height); node.node.block_disconnected(header); @@ -215,12 +197,15 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { }).unwrap(); } - let channel_monitor = test_utils::TestChainMonitor::new(self.tx_broadcaster.clone(), &self.logger, &feeest); + let chain_source = test_utils::TestChainSource::new(Network::Testnet); + let chain_monitor = test_utils::TestChainMonitor::new(Some(&chain_source), self.tx_broadcaster.clone(), &self.logger, &feeest); for deserialized_monitor in deserialized_monitors.drain(..) { - if let Err(_) = channel_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) { + if let Err(_) = chain_monitor.watch_channel(deserialized_monitor.get_funding_txo().0, deserialized_monitor) { panic!(); } } + assert_eq!(*chain_source.watched_txn.lock().unwrap(), *self.chain_source.watched_txn.lock().unwrap()); + assert_eq!(*chain_source.watched_outputs.lock().unwrap(), *self.chain_source.watched_outputs.lock().unwrap()); } } } @@ -1120,7 +1105,7 @@ pub fn create_node_cfgs<'a>(node_count: usize, chanmon_cfgs: &'a Vec)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4429,7 +4429,7 @@ fn test_manager_serialize_deserialize_events() { fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; logger = test_utils::TestLogger::new(); - new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chain_monitor = &new_chain_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4519,7 +4519,7 @@ fn test_simple_manager_serialize_deserialize() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chain_monitor = &new_chain_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4597,7 +4597,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chain_monitor = test_utils::TestChainMonitor::new(nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + new_chain_monitor = test_utils::TestChainMonitor::new(Some(nodes[0].chain_source), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); nodes[0].chain_monitor = &new_chain_monitor; let mut node_0_stale_monitors = Vec::new(); @@ -5743,7 +5743,7 @@ fn test_key_derivation_params() { // We manually create the node configuration to backup the seed. let seed = [42; 32]; let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet); - let chain_monitor = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator); + let chain_monitor = test_utils::TestChainMonitor::new(Some(&chanmon_cfgs[0].chain_source), &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator); let node = NodeCfg { chain_source: &chanmon_cfgs[0].chain_source, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, chain_monitor, keys_manager, node_seed: seed }; let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs); node_cfgs.remove(0); @@ -7494,7 +7494,7 @@ fn test_data_loss_protect() { tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}; fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet); - monitor = test_utils::TestChainMonitor::new(&tx_broadcaster, &logger, &fee_estimator); + monitor = test_utils::TestChainMonitor::new(Some(&chain_source), &tx_broadcaster, &logger, &fee_estimator); node_state_0 = { let mut channel_monitors = HashMap::new(); channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chain_monitor); @@ -8353,6 +8353,7 @@ fn test_update_err_monitor_lockdown() { let preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0; // Copy ChainMonitor to simulate a watchtower and update block height of node 0 until its ChannelMonitor timeout HTLC onchain + let chain_source = test_utils::TestChainSource::new(Network::Testnet); let logger = test_utils::TestLogger::with_id(format!("node {}", 0)); let watchtower = { let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); @@ -8362,7 +8363,7 @@ fn test_update_err_monitor_lockdown() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); watchtower }; @@ -8410,6 +8411,7 @@ fn test_concurrent_monitor_claim() { route_payment(&nodes[0], &vec!(&nodes[1])[..], 9_000_000).0; // Copy ChainMonitor to simulate watchtower Alice and update block height her ChannelMonitor timeout HTLC onchain + let chain_source = test_utils::TestChainSource::new(Network::Testnet); let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice")); let watchtower_alice = { let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); @@ -8419,7 +8421,7 @@ fn test_concurrent_monitor_claim() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); watchtower }; @@ -8434,6 +8436,7 @@ fn test_concurrent_monitor_claim() { } // Copy ChainMonitor to simulate watchtower Bob and make it receive a commitment update first. + let chain_source = test_utils::TestChainSource::new(Network::Testnet); let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob")); let watchtower_bob = { let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); @@ -8443,7 +8446,7 @@ fn test_concurrent_monitor_claim() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChainMonitor::new(&chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChainMonitor::new(Some(&chain_source), &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); assert!(watchtower.watch_channel(outpoint, new_monitor).is_ok()); watchtower }; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index bc084b77e..6cdfb7f9d 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -27,7 +27,7 @@ use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::blockdata::script::{Builder, Script}; use bitcoin::blockdata::opcodes; use bitcoin::network::constants::Network; -use bitcoin::hash_types::BlockHash; +use bitcoin::hash_types::{BlockHash, Txid}; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1, Signature}; @@ -37,7 +37,7 @@ use std::time::Duration; use std::sync::Mutex; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::{cmp, mem}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; pub struct TestVecWriter(pub Vec); impl Writer for TestVecWriter { @@ -62,18 +62,18 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestChainMonitor<'a> { pub added_monitors: Mutex)>>, pub latest_monitor_update_id: Mutex>, - pub chain_monitor: channelmonitor::ChainMonitor, + pub chain_monitor: channelmonitor::ChainMonitor, pub update_ret: Mutex>, // If this is set to Some(), after the next return, we'll always return this until update_ret // is changed: pub next_update_ret: Mutex>>, } impl<'a> TestChainMonitor<'a> { - pub fn new(broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self { + pub fn new(chain_source: Option<&'a TestChainSource>, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self { Self { added_monitors: Mutex::new(Vec::new()), latest_monitor_update_id: Mutex::new(HashMap::new()), - chain_monitor: channelmonitor::ChainMonitor::new(broadcaster, logger, fee_estimator), + chain_monitor: channelmonitor::ChainMonitor::new(chain_source, broadcaster, logger, fee_estimator), update_ret: Mutex::new(Ok(())), next_update_ret: Mutex::new(None), } @@ -396,6 +396,8 @@ impl TestKeysInterface { pub struct TestChainSource { pub genesis_hash: BlockHash, pub utxo_ret: Mutex>, + pub watched_txn: Mutex>, + pub watched_outputs: Mutex>, } impl TestChainSource { @@ -404,6 +406,8 @@ impl TestChainSource { Self { genesis_hash: genesis_block(network).block_hash(), utxo_ret: Mutex::new(Ok(TxOut { value: u64::max_value(), script_pubkey })), + watched_txn: Mutex::new(HashSet::new()), + watched_outputs: Mutex::new(HashSet::new()), } } } @@ -417,3 +421,13 @@ impl chain::Access for TestChainSource { self.utxo_ret.lock().unwrap().clone() } } + +impl chain::Filter for TestChainSource { + fn register_tx(&self, txid: Txid, script_pubkey: Script) { + self.watched_txn.lock().unwrap().insert((txid, script_pubkey)); + } + + fn register_output(&self, outpoint: OutPoint, script_pubkey: Script) { + self.watched_outputs.lock().unwrap().insert((outpoint, script_pubkey)); + } +}