]> git.bitcoin.ninja Git - rust-lightning/commitdiff
WIP: Remove ChainWatchedUtil
authorJeffrey Czyz <jkczyz@gmail.com>
Fri, 7 Aug 2020 22:13:57 +0000 (15:13 -0700)
committerJeffrey Czyz <jkczyz@gmail.com>
Sun, 9 Aug 2020 19:21:59 +0000 (12:21 -0700)
lightning/src/chain/chaininterface.rs
lightning/src/chain/channelmonitor.rs
lightning/src/chain/mod.rs
lightning/src/util/test_utils.rs

index f9f7254218e70d29197cfb4fc68a10e3744c3643..22904a017a9de94ed59a5a55b4da8cd89c748821 100644 (file)
@@ -5,10 +5,6 @@
 //! disconnections, transaction broadcasting, and feerate information requests.
 
 use bitcoin::blockdata::transaction::Transaction;
-use bitcoin::blockdata::script::Script;
-use bitcoin::hash_types::Txid;
-
-use std::collections::HashSet;
 
 /// An interface to send a transaction to the Bitcoin network.
 pub trait BroadcasterInterface: Sync + Send {
@@ -46,91 +42,3 @@ pub trait FeeEstimator: Sync + Send {
 
 /// Minimum relay fee as required by bitcoin network mempool policy.
 pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000;
-
-/// Utility for tracking registered txn/outpoints and checking for matches
-#[cfg_attr(test, derive(PartialEq))]
-pub struct ChainWatchedUtil {
-       watch_all: bool,
-
-       // We are more conservative in matching during testing to ensure everything matches *exactly*,
-       // even though during normal runtime we take more optimized match approaches...
-       #[cfg(test)]
-       watched_txn: HashSet<(Txid, Script)>,
-       #[cfg(not(test))]
-       watched_txn: HashSet<Script>,
-
-       watched_outpoints: HashSet<(Txid, u32)>,
-}
-
-impl ChainWatchedUtil {
-       /// Constructs an empty (watches nothing) ChainWatchedUtil
-       pub fn new() -> Self {
-               Self {
-                       watch_all: false,
-                       watched_txn: HashSet::new(),
-                       watched_outpoints: HashSet::new(),
-               }
-       }
-
-       /// Registers a tx for monitoring, returning true if it was a new tx and false if we'd already
-       /// been watching for it.
-       pub fn register_tx(&mut self, txid: &Txid, script_pub_key: &Script) -> bool {
-               if self.watch_all { return false; }
-               #[cfg(test)]
-               {
-                       self.watched_txn.insert((txid.clone(), script_pub_key.clone()))
-               }
-               #[cfg(not(test))]
-               {
-                       let _tx_unused = txid; // It's used in cfg(test), though
-                       self.watched_txn.insert(script_pub_key.clone())
-               }
-       }
-
-       /// Registers an outpoint for monitoring, returning true if it was a new outpoint and false if
-       /// we'd already been watching for it
-       pub fn register_outpoint(&mut self, outpoint: (Txid, u32), _script_pub_key: &Script) -> bool {
-               if self.watch_all { return false; }
-               self.watched_outpoints.insert(outpoint)
-       }
-
-       /// Sets us to match all transactions, returning true if this is a new setting and false if
-       /// we'd already been set to match everything.
-       pub fn watch_all(&mut self) -> bool {
-               if self.watch_all { return false; }
-               self.watch_all = true;
-               true
-       }
-
-       /// Checks if a given transaction matches the current filter.
-       pub fn does_match_tx(&self, tx: &Transaction) -> bool {
-               if self.watch_all {
-                       return true;
-               }
-               for out in tx.output.iter() {
-                       #[cfg(test)]
-                       for &(ref txid, ref script) in self.watched_txn.iter() {
-                               if *script == out.script_pubkey {
-                                       if tx.txid() == *txid {
-                                               return true;
-                                       }
-                               }
-                       }
-                       #[cfg(not(test))]
-                       for script in self.watched_txn.iter() {
-                               if *script == out.script_pubkey {
-                                       return true;
-                               }
-                       }
-               }
-               for input in tx.input.iter() {
-                       for outpoint in self.watched_outpoints.iter() {
-                               let &(outpoint_hash, outpoint_index) = outpoint;
-                               if outpoint_hash == input.previous_output.txid && outpoint_index == input.previous_output.vout {
-                                       return true;
-                               }
-                       }
-               }
-               false
-       }
-}
index f4b8e19796fd1a2f1b56e1178013ba30d56d2f53..852e86c938623f5845114d5db89eaa6b24cb0b6c 100644 (file)
@@ -36,7 +36,7 @@ use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
 use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
 use chain;
 use chain::Notify;
-use chain::chaininterface::{ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
+use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
 use util::logger::Logger;
@@ -171,91 +171,12 @@ pub struct ChainMonitor<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L
        pub monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
        #[cfg(not(test))]
        monitors: Mutex<HashMap<OutPoint, ChannelMonitor<ChanSigner>>>,
-       watch_events: Mutex<WatchEventCache>,
        chain_source: Option<C>,
        broadcaster: T,
        logger: L,
        fee_estimator: F
 }
 
-struct WatchEventCache {
-       watched: ChainWatchedUtil,
-       events: Vec<WatchEvent>,
-}
-
-/// An event indicating on-chain activity to watch for pertaining to a channel.
-enum WatchEvent {
-       /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
-       /// condition.
-       WatchTransaction {
-               /// Identifier of the transaction.
-               txid: Txid,
-
-               /// Spending condition for an output of the transaction.
-               script_pubkey: Script,
-       },
-       /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
-       /// the spending condition.
-       WatchOutput {
-               /// Identifier for the output.
-               outpoint: OutPoint,
-
-               /// Spending condition for the output.
-               script_pubkey: Script,
-       }
-}
-
-impl WatchEventCache {
-       fn new() -> Self {
-               Self {
-                       watched: ChainWatchedUtil::new(),
-                       events: Vec::new(),
-               }
-       }
-
-       fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
-               if self.watched.register_tx(txid, script_pubkey) {
-                       self.events.push(WatchEvent::WatchTransaction {
-                               txid: *txid,
-                               script_pubkey: script_pubkey.clone()
-                       });
-               }
-       }
-
-       fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
-               let (txid, index) = outpoint;
-               if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
-                       self.events.push(WatchEvent::WatchOutput {
-                               outpoint: OutPoint {
-                                       txid: *txid,
-                                       index: index as u16,
-                               },
-                               script_pubkey: script_pubkey.clone(),
-                       });
-               }
-       }
-
-       fn flush_events<C: Deref>(&mut self, chain_source: &Option<C>) -> bool where C::Target: chain::Notify {
-               let num_events = self.events.len();
-               match chain_source {
-                       &None => self.events.clear(),
-                       &Some(ref chain_source) => {
-                               for event in self.events.drain(..) {
-                                       match event {
-                                               WatchEvent::WatchTransaction { txid, script_pubkey } => {
-                                                       chain_source.register_tx(txid, script_pubkey)
-                                               },
-                                               WatchEvent::WatchOutput { outpoint, script_pubkey } => {
-                                                       chain_source.register_output(outpoint, script_pubkey)
-                                               },
-                                       }
-                               }
-                       }
-               }
-               num_events > 0
-       }
-}
-
 impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonitor<ChanSigner, C, T, F, L>
        where C::Target: chain::Notify,
              T::Target: BroadcasterInterface,
@@ -272,21 +193,23 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        /// [`chain::Watch::release_pending_htlc_updates`]: ../../chain/trait.Watch.html#tymethod.release_pending_htlc_updates
        /// [`chain::Notify`]: ../../chain/trait.Notify.html
        pub fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) -> bool {
-               let mut watch_events = self.watch_events.lock().unwrap();
-               let matched_txn: Vec<_> = txdata.iter().filter(|&&(_, tx)| watch_events.watched.does_match_tx(tx)).map(|e| *e).collect();
+               let mut new_outputs = false;
                {
                        let mut monitors = self.monitors.lock().unwrap();
                        for monitor in monitors.values_mut() {
-                               let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                               let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+                               new_outputs |= !txn_outputs.is_empty();
 
-                               for (ref txid, ref outputs) in txn_outputs {
-                                       for (idx, output) in outputs.iter().enumerate() {
-                                               watch_events.watch_output((txid, idx), &output.script_pubkey);
+                               if let Some(ref chain_source) = self.chain_source {
+                                       for (txid, outputs) in txn_outputs.drain(..) {
+                                               for (idx, output) in outputs.iter().enumerate() {
+                                                       chain_source.register_output(&OutPoint { txid, index: idx as u16 }, &output.script_pubkey);
+                                               }
                                        }
-                               }
+                               };
                        }
                }
-               watch_events.flush_events(&self.chain_source)
+               new_outputs
        }
 
        /// Delegates to [`ChannelMonitor::block_disconnected`] for each watched channel.
@@ -311,7 +234,6 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F) -> Self {
                Self {
                        monitors: Mutex::new(HashMap::new()),
-                       watch_events: Mutex::new(WatchEventCache::new()),
                        chain_source,
                        broadcaster,
                        logger,
@@ -325,7 +247,6 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
        ///
        /// [`chain::Notify`]: ../../chain/trait.Notify.html
        pub fn add_monitor(&self, outpoint: OutPoint, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
-               let mut watch_events = self.watch_events.lock().unwrap();
                let mut monitors = self.monitors.lock().unwrap();
                let entry = match monitors.entry(outpoint) {
                        hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given outpoint is already present")),
@@ -334,16 +255,17 @@ impl<ChanSigner: ChannelKeys, C: Deref, T: Deref, F: Deref, L: Deref> ChainMonit
                {
                        let funding_txo = monitor.get_funding_txo();
                        log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
-                       watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
-                       watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
-                       for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
-                               for (idx, script) in outputs.iter().enumerate() {
-                                       watch_events.watch_output((txid, idx), script);
+
+                       if let Some(ref chain_source) = self.chain_source {
+                               chain_source.register_tx(&funding_txo.0.txid, &funding_txo.1);
+                               for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
+                                       for (idx, script_pubkey) in outputs.iter().enumerate() {
+                                               chain_source.register_output(&OutPoint { txid: *txid, index: idx as u16 }, &script_pubkey);
+                                       }
                                }
-                       }
+                       };
                }
                entry.insert(monitor);
-               watch_events.flush_events(&self.chain_source);
                Ok(())
        }
 
@@ -2000,12 +1922,12 @@ impl<ChanSigner: ChannelKeys> ChannelMonitor<ChanSigner> {
                        }
                }
                self.onchain_tx_handler.block_connected(txn_matched, claimable_outpoints, height, &*broadcaster, &*fee_estimator, &*logger);
-
                self.last_block_hash = block_hash;
-               for &(ref txid, ref output_scripts) in watch_outputs.iter() {
-                       self.outputs_to_watch.insert(txid.clone(), output_scripts.iter().map(|o| o.script_pubkey.clone()).collect());
-               }
 
+               watch_outputs.retain(|&(ref txid, ref txouts)| {
+                       let output_scripts = txouts.iter().map(|o| o.script_pubkey.clone()).collect();
+                       self.outputs_to_watch.insert(txid.clone(), output_scripts).is_none()
+               });
                watch_outputs
        }
 
index 6365aac2dfe2badb83064808ea1c16f62d884ece..620238c26830fe7fc5356928c1c9bead5b8f3dcd 100644 (file)
@@ -93,9 +93,9 @@ pub trait Watch: Send + Sync {
 pub trait Notify: Send + Sync {
        /// Registers interest in a transaction with `txid` and having an output with `script_pubkey` as
        /// a spending condition.
-       fn register_tx(&self, txid: Txid, script_pubkey: Script);
+       fn register_tx(&self, txid: &Txid, script_pubkey: &Script);
 
        /// Registers interest in spends of a transaction output identified by `outpoint` having
        /// `script_pubkey` as the spending condition.
-       fn register_output(&self, outpoint: OutPoint, script_pubkey: Script);
+       fn register_output(&self, outpoint: &OutPoint, script_pubkey: &Script);
 }
index fb0d532e1d1b9ed0c3df4d1f22f874566324dace..55fb0728f93adf586a038628febdb50bcc293e48 100644 (file)
@@ -410,6 +410,6 @@ impl chain::Access for TestChainSource {
 }
 
 impl chain::Notify for TestChainSource {
-       fn register_tx(&self, _txid: Txid, _script_pubkey: Script) {}
-       fn register_output(&self, _outpoint: OutPoint, _script_pubkey: Script) {}
+       fn register_tx(&self, _txid: &Txid, _script_pubkey: &Script) {}
+       fn register_output(&self, _outpoint: &OutPoint, _script_pubkey: &Script) {}
 }