Replace WatchEvent usage with get_outputs_to_watch
authorJeffrey Czyz <jkczyz@gmail.com>
Fri, 7 Aug 2020 22:13:57 +0000 (15:13 -0700)
committerJeffrey Czyz <jkczyz@gmail.com>
Thu, 1 Oct 2020 05:41:23 +0000 (22:41 -0700)
Outputs to watch are tracked by ChannelMonitor as of
73dce207dd0ea6c3ac57af3ebb8b87ee03e82c9e. Instead of determining new
outputs to watch independently using ChainWatchedUtil, do so by
comparing against outputs already tracked. Thus, ChainWatchedUtil and
WatchEvent are no longer needed.

lightning/src/chain/chaininterface.rs
lightning/src/chain/mod.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/onchaintx.rs
lightning/src/util/test_utils.rs

index a63e0bf1069475f4362ae69c17265519fadc8381..91e604838ecbe0b4c6867184bf3146f6a18dfb65 100644 (file)
 //! disconnections, transaction broadcasting, and feerate information requests.
 
 use bitcoin::blockdata::transaction::Transaction;
-use bitcoin::blockdata::script::Script;
-use bitcoin::hash_types::Txid;
-
-use std::collections::HashSet;
 
 /// An interface to send a transaction to the Bitcoin network.
 pub trait BroadcasterInterface: Sync + Send {
@@ -55,91 +51,3 @@ pub trait FeeEstimator: Sync + Send {
 
 /// Minimum relay fee as required by bitcoin network mempool policy.
 pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000;
-
-/// Utility for tracking registered txn/outpoints and checking for matches
-#[cfg_attr(test, derive(PartialEq))]
-pub struct ChainWatchedUtil {
-       watch_all: bool,
-
-       // We are more conservative in matching during testing to ensure everything matches *exactly*,
-       // even though during normal runtime we take more optimized match approaches...
-       #[cfg(test)]
-       watched_txn: HashSet<(Txid, Script)>,
-       #[cfg(not(test))]
-       watched_txn: HashSet<Script>,
-
-       watched_outpoints: HashSet<(Txid, u32)>,
-}
-
-impl ChainWatchedUtil {
-       /// Constructs an empty (watches nothing) ChainWatchedUtil
-       pub fn new() -> Self {
-               Self {
-                       watch_all: false,
-                       watched_txn: HashSet::new(),
-                       watched_outpoints: HashSet::new(),
-               }
-       }
-
-       /// Registers a tx for monitoring, returning true if it was a new tx and false if we'd already
-       /// been watching for it.
-       pub fn register_tx(&mut self, txid: &Txid, script_pub_key: &Script) -> bool {
-               if self.watch_all { return false; }
-               #[cfg(test)]
-               {
-                       self.watched_txn.insert((txid.clone(), script_pub_key.clone()))
-               }
-               #[cfg(not(test))]
-               {
-                       let _tx_unused = txid; // It's used in cfg(test), though
-                       self.watched_txn.insert(script_pub_key.clone())
-               }
-       }
-
-       /// Registers an outpoint for monitoring, returning true if it was a new outpoint and false if
-       /// we'd already been watching for it
-       pub fn register_outpoint(&mut self, outpoint: (Txid, u32), _script_pub_key: &Script) -> bool {
-               if self.watch_all { return false; }
-               self.watched_outpoints.insert(outpoint)
-       }
-
-       /// Sets us to match all transactions, returning true if this is a new setting and false if
-       /// we'd already been set to match everything.
-       pub fn watch_all(&mut self) -> bool {
-               if self.watch_all { return false; }
-               self.watch_all = true;
-               true
-       }
-
-       /// Checks if a given transaction matches the current filter.
-       pub fn does_match_tx(&self, tx: &Transaction) -> bool {
-               if self.watch_all {
-                       return true;
-               }
-               for out in tx.output.iter() {
-                       #[cfg(test)]
-                       for &(ref txid, ref script) in self.watched_txn.iter() {
-                               if *script == out.script_pubkey {
-                                       if tx.txid() == *txid {
-                                               return true;
-                                       }
-                               }
-                       }
-                       #[cfg(not(test))]
-                       for script in self.watched_txn.iter() {
-                               if *script == out.script_pubkey {
-                                       return true;
-                               }
-                       }
-               }
-               for input in tx.input.iter() {
-                       for outpoint in self.watched_outpoints.iter() {
-                               let &(outpoint_hash, outpoint_index) = outpoint;
-                               if outpoint_hash == input.previous_output.txid && outpoint_index == input.previous_output.vout {
-                                       return true;
-                               }
-                       }
-               }
-               false
-       }
-}
index 0f1394352a847cc5454644580dd6de178468d2cc..9333c4ef745330ec807aef106a61fbabe526b94d 100644 (file)
@@ -73,8 +73,7 @@ pub trait Watch: Send + Sync {
        ///
        /// Implementations are responsible for watching the chain for the funding transaction along
        /// with any spends of outputs returned by [`get_outputs_to_watch`]. In practice, this means
-       /// calling [`block_connected`] and [`block_disconnected`] on the monitor and including all such
-       /// transactions that meet this criteria.
+       /// calling [`block_connected`] and [`block_disconnected`] on the monitor.
        ///
        /// [`get_outputs_to_watch`]: ../ln/channelmonitor/struct.ChannelMonitor.html#method.get_outputs_to_watch
        /// [`block_connected`]: ../ln/channelmonitor/struct.ChannelMonitor.html#method.block_connected
@@ -119,9 +118,9 @@ pub trait Watch: Send + Sync {
 pub trait Filter: Send + Sync {
        /// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as
        /// a spending condition.
-       fn register_tx(&self, txid: Txid, script_pubkey: Script);
+       fn register_tx(&self, txid: &Txid, script_pubkey: &Script);
 
        /// Registers interest in spends of a transaction output identified by `outpoint` having
        /// `script_pubkey` as the spending condition.
-       fn register_output(&self, outpoint: OutPoint, script_pubkey: Script);
+       fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script);
 }
index 86799c7b6230434b7b054f6bd54dd0f811db3b86..803a20ce3ab8042cc7d330060b1699c692ebf737 100644 (file)
@@ -44,7 +44,7 @@ use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
 use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
 use chain;
 use chain::Filter;
-use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
+use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
 use util::logger::Logger;
@@ -205,110 +205,12 @@ pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L
 {
        /// The monitors
        pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
-       watch_events: Mutex<WatchEventCache>,
        chain_source: Option<C>,
        broadcaster: T,
        logger: L,
        fee_estimator: F
 }
 
-struct WatchEventCache {
-       watched: ChainWatchedUtil,
-       events: Vec<WatchEvent>,
-}
-
-/// An event indicating on-chain activity to watch for pertaining to a channel.
-enum WatchEvent {
-       /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
-       /// condition.
-       WatchTransaction {
-               /// Identifier of the transaction.
-               txid: Txid,
-
-               /// Spending condition for an output of the transaction.
-               script_pubkey: Script,
-       },
-       /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
-       /// the spending condition.
-       WatchOutput {
-               /// Identifier for the output.
-               outpoint: OutPoint,
-
-               /// Spending condition for the output.
-               script_pubkey: Script,
-       }
-}
-
-impl WatchEventCache {
-       fn new() -> Self {
-               Self {
-                       watched: ChainWatchedUtil::new(),
-                       events: Vec::new(),
-               }
-       }
-
-       fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
-               if self.watched.register_tx(txid, script_pubkey) {
-                       self.events.push(WatchEvent::WatchTransaction {
-                               txid: *txid,
-                               script_pubkey: script_pubkey.clone()
-                       });
-               }
-       }
-
-       fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
-               let (txid, index) = outpoint;
-               if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
-                       self.events.push(WatchEvent::WatchOutput {
-                               outpoint: OutPoint {
-                                       txid: *txid,
-                                       index: index as u16,
-                               },
-                               script_pubkey: script_pubkey.clone(),
-                       });
-               }
-       }
-
-       fn flush_events<C: Deref>(&mut self, chain_source: &Option<C>) -> bool where C::Target: chain::Filter {
-               let num_events = self.events.len();
-               match chain_source {
-                       &None => self.events.clear(),
-                       &Some(ref chain_source) => {
-                               for event in self.events.drain(..) {
-                                       match event {
-                                               WatchEvent::WatchTransaction { txid, script_pubkey } => {
-                                                       chain_source.register_tx(txid, script_pubkey)
-                                               },
-                                               WatchEvent::WatchOutput { outpoint, script_pubkey } => {
-                                                       chain_source.register_output(outpoint, script_pubkey)
-                                               },
-                                       }
-                               }
-                       }
-               }
-               num_events > 0
-       }
-
-       fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<(usize, &'a Transaction)> {
-               let mut matched_txids = HashSet::new();
-               txdata.iter().filter(|&&(_, tx)| {
-                       // A tx matches the filter if it either matches the filter directly (via does_match_tx)
-                       // or if it is a descendant of another matched transaction within the same block.
-                       let mut matched = self.watched.does_match_tx(tx);
-                       for input in tx.input.iter() {
-                               if matched || matched_txids.contains(&input.previous_output.txid) {
-                                       matched = true;
-                                       break;
-                               }
-                       }
-                       if matched {
-                               matched_txids.insert(tx.txid());
-                       }
-                       matched
-               }).map(|e| *e).collect()
-       }
-}
-
 impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
        where C::Target: chain::Filter,
              T::Target: BroadcasterInterface,
@@ -329,21 +231,23 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        /// [`chain::Watch::release_pending_monitor_events`]: ../../chain/trait.Watch.html#tymethod.release_pending_monitor_events
        /// [`chain::Filter`]: ../../chain/trait.Filter.html
        pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) -> bool {
-               let mut watch_events = self.watch_events.lock().unwrap();
-               let matched_txn = watch_events.filter_block(txdata);
+               let mut has_new_outputs_to_watch = false;
                {
                        let mut monitors = self.monitors.lock().unwrap();
                        for monitor in monitors.values_mut() {
-                               let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                               let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                               has_new_outputs_to_watch |= !txn_outputs.is_empty();
 
-                               for (ref txid, ref outputs) in txn_outputs {
-                                       for (idx, output) in outputs.iter().enumerate() {
-                                               watch_events.watch_output((txid, idx), &output.script_pubkey);
+                               if let Some(ref chain_source) = self.chain_source {
+                                       for (txid, outputs) in txn_outputs.drain(..) {
+                                               for (idx, output) in outputs.iter().enumerate() {
+                                                       chain_source.register_output(&OutPoint { txid, index: idx as u16 }, &output.script_pubkey);
+                                               }
                                        }
                                }
                        }
                }
-               watch_events.flush_events(&self.chain_source)
+               has_new_outputs_to_watch
        }
 
        /// Dispatches to per-channel monitors, which are responsible for updating their on-chain view
@@ -377,7 +281,6 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
                Self {
                        monitors: Mutex::new(HashMap::new()),
-                       watch_events: Mutex::new(WatchEventCache::new()),
                        chain_source,
                        broadcaster,
                        logger,
@@ -391,7 +294,6 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        ///
        /// [`chain::Filter`]: ../../chain/trait.Filter.html
        fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
-               let mut watch_events = self.watch_events.lock().unwrap();
                let mut monitors = self.monitors.lock().unwrap();
                let entry = match monitors.entry(outpoint) {
                        hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given outpoint is already present")),
@@ -400,16 +302,17 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
                {
                        let funding_txo = monitor.get_funding_txo();
                        log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
-                       watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
-                       watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
-                       for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
-                               for (idx, script) in outputs.iter().enumerate() {
-                                       watch_events.watch_output((txid, idx), script);
+
+                       if let Some(ref chain_source) = self.chain_source {
+                               chain_source.register_tx(&funding_txo.0.txid, &funding_txo.1);
+                               for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
+                                       for (idx, script_pubkey) in outputs.iter().enumerate() {
+                                               chain_source.register_output(&OutPoint { txid: *txid, index: idx as u16 }, &script_pubkey);
+                                       }
                                }
                        }
                }
                entry.insert(monitor);
-               watch_events.flush_events(&self.chain_source);
                Ok(())
        }
 
@@ -1958,16 +1861,17 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
        /// - detect settled outputs for later spending
        /// - schedule and bump any in-flight claims
        ///
-       /// Returns any transaction outputs from `txn_matched` that spends of should be watched for.
-       /// After called these are also available via [`get_outputs_to_watch`].
+       /// Returns any new outputs to watch from `txdata`; after called, these are also included in
+       /// [`get_outputs_to_watch`].
        ///
        /// [`get_outputs_to_watch`]: #method.get_outputs_to_watch
-       pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txn_matched: &[(usize, &Transaction)], height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<TxOut>)>
+       pub fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32, broadcaster: B, fee_estimator: F, logger: L)-> Vec<(Txid, Vec<TxOut>)>
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator,
                                        L::Target: Logger,
        {
-               for &(_, tx) in txn_matched {
+               let txn_matched = self.filter_block(txdata);
+               for tx in &txn_matched {
                        let mut output_val = 0;
                        for out in tx.output.iter() {
                                if out.value > 21_000_000_0000_0000 { panic!("Value-overflowing transaction provided to block connected"); }
@@ -1981,7 +1885,7 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
 
                let mut watch_outputs = Vec::new();
                let mut claimable_outpoints = Vec::new();
-               for &(_, tx) in txn_matched {
+               for tx in &txn_matched {
                        if tx.input.len() == 1 {
                                // Assuming our keys were not leaked (in which case we're screwed no matter what),
                                // commitment transactions and HTLC transactions will all only ever have one input,
@@ -2056,13 +1960,15 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        }
                }
 
-               self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
-
+               self.onchain_tx_handler.block_connected(&txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
                self.last_block_hash = block_hash;
-               for &(ref txid, ref output_scripts) in watch_outputs.iter() {
-                       self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
-               }
 
+               // Determine new outputs to watch by comparing against previously known outputs to watch,
+               // updating the latter in the process.
+               watch_outputs.retain(|&(ref txid, ref txouts)| {
+                       let output_scripts = txouts.iter().map(|o| o.script_pubkey.clone()).collect();
+                       self.outputs_to_watch.insert(txid.clone(), output_scripts).is_none()
+               });
                watch_outputs
        }
 
@@ -2087,6 +1993,40 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                self.last_block_hash = block_hash;
        }
 
+       /// Filters a block's `txdata` for transactions spending watched outputs or for any child
+       /// transactions thereof.
+       fn filter_block<'a>(&self, txdata: &[(usize, &'a Transaction)]) -> Vec<&'a Transaction> {
+               let mut matched_txn = HashSet::new();
+               txdata.iter().filter(|&&(_, tx)| {
+                       let mut matches = self.spends_watched_output(tx);
+                       for input in tx.input.iter() {
+                               if matches { break; }
+                               if matched_txn.contains(&input.previous_output.txid) {
+                                       matches = true;
+                               }
+                       }
+                       if matches {
+                               matched_txn.insert(tx.txid());
+                       }
+                       matches
+               }).map(|(_, tx)| *tx).collect()
+       }
+
+       /// Checks if a given transaction spends any watched outputs.
+       fn spends_watched_output(&self, tx: &Transaction) -> bool {
+               for input in tx.input.iter() {
+                       if let Some(outputs) = self.get_outputs_to_watch().get(&input.previous_output.txid) {
+                               for (idx, _script_pubkey) in outputs.iter().enumerate() {
+                                       if idx == input.previous_output.vout as usize {
+                                               return true;
+                                       }
+                               }
+                       }
+               }
+
+               false
+       }
+
        fn would_broadcast_at_height<L: Deref>(&self, height: u32, logger: &L) -> bool where L::Target: Logger {
                // We need to consider all HTLCs which are:
                //  * in any unrevoked counterparty commitment transaction, as they could broadcast said
index 8d5ed6cbc3c0771789020ad2fd73930b8f69c0cc..dbe86c869ce4ddfa720a4f8c94d43e405305661c 100644 (file)
@@ -657,7 +657,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
                None
        }
 
-       pub(super) fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[(usize, &Transaction)], claimable_outpoints: Vec<ClaimRequest>, height: u32, broadcaster: B, fee_estimator: F, logger: L)
+       pub(crate) fn block_connected<B: Deref, F: Deref, L: Deref>(&mut self, txn_matched: &[&Transaction], claimable_outpoints: Vec<ClaimRequest>, height: u32, broadcaster: B, fee_estimator: F, logger: L)
                where B::Target: BroadcasterInterface,
                      F::Target: FeeEstimator,
                                        L::Target: Logger,
@@ -706,7 +706,7 @@ impl<ChanSigner: ChannelKeys> OnchainTxHandler<ChanSigner> {
                }
 
                let mut bump_candidates = HashMap::new();
-               for &(_, tx) in txn_matched {
+               for tx in txn_matched {
                        // Scan all input to verify is one of the outpoint spent is of interest for us
                        let mut claimed_outputs_material = Vec::new();
                        for inp in &tx.input {
index 6cdfb7f9d63cb5708f414472189ec23af5ef63ad..210d62485acf4131d7904a18477b9e82d9d655c2 100644 (file)
@@ -423,11 +423,11 @@ impl chain::Access for TestChainSource {
 }
 
 impl chain::Filter for TestChainSource {
-       fn register_tx(&self, txid: Txid, script_pubkey: Script) {
-               self.watched_txn.lock().unwrap().insert((txid, script_pubkey));
+       fn register_tx(&self, txid: &Txid, script_pubkey: &Script) {
+               self.watched_txn.lock().unwrap().insert((*txid, script_pubkey.clone()));
        }
 
-       fn register_output(&self, outpoint: OutPoint, script_pubkey: Script) {
-               self.watched_outputs.lock().unwrap().insert((outpoint, script_pubkey));
+       fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script) {
+               self.watched_outputs.lock().unwrap().insert((*outpoint, script_pubkey.clone()));
        }
 }