Replace use of ChainWatchInterface with WatchEvent
authorJeffrey Czyz <jkczyz@gmail.com>
Thu, 9 Jul 2020 06:37:13 +0000 (23:37 -0700)
committerJeffrey Czyz <jkczyz@gmail.com>
Tue, 4 Aug 2020 23:39:35 +0000 (16:39 -0700)
SimpleManyChannelMonitor is parameterized by ChainWatchInterface to
signal what transactions and outputs to watch for on chain. The
interface has grown to cover chain access (via get_chain_utxo) and block
block filtering (via filter_block and reentered), which has added
complexity for implementations and user (see ChainWatchInterfaceUtil).

Pull the watch functionality out as a first step to eliminating
ChainWatchInterface entirely.

lightning/src/chain/mod.rs
lightning/src/ln/channelmonitor.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs

index ffa5ed968be4d62e391f613a337f464b37d31b27..6acca8a5513f9be0fbb1e28d27f34f0053174da9 100644 (file)
@@ -1,5 +1,40 @@
 //! Structs and traits which allow other parts of rust-lightning to interact with the blockchain.
 
+use bitcoin::blockdata::script::Script;
+use bitcoin::hash_types::Txid;
+
+use chain::transaction::OutPoint;
+
 pub mod chaininterface;
 pub mod transaction;
 pub mod keysinterface;
+
+/// An interface for providing [`WatchEvent`]s.
+///
+/// [`WatchEvent`]: enum.WatchEvent.html
+pub trait WatchEventProvider {
+       /// Releases events produced since the last call. Subsequent calls must only return new events.
+       fn release_pending_watch_events(&self) -> Vec<WatchEvent>;
+}
+
+/// An event indicating on-chain activity to watch for pertaining to a channel.
+pub enum WatchEvent {
+       /// Watch for a transaction with `txid` and having an output with `script_pubkey` as a spending
+       /// condition.
+       WatchTransaction {
+               /// Identifier of the transaction.
+               txid: Txid,
+
+               /// Spending condition for an output of the transaction.
+               script_pubkey: Script,
+       },
+       /// Watch for spends of a transaction output identified by `outpoint` having `script_pubkey` as
+       /// the spending condition.
+       WatchOutput {
+               /// Identifier for the output.
+               outpoint: OutPoint,
+
+               /// Spending condition for the output.
+               script_pubkey: Script,
+       }
+}
index 33cc869efe8aeed5168acec5aaf56d6f5ddee8d5..1980eb5c28fc26cce8a08702c224568f6b31555d 100644 (file)
@@ -32,7 +32,8 @@ use ln::chan_utils;
 use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, LocalCommitmentTransaction, HTLCType};
 use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
 use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
-use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator};
+use chain;
+use chain::chaininterface::{ChainListener, ChainWatchInterface, ChainWatchedUtil, BroadcasterInterface, FeeEstimator};
 use chain::transaction::OutPoint;
 use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys};
 use util::logger::Logger;
@@ -170,12 +171,55 @@ pub struct SimpleManyChannelMonitor<Key, ChanSigner: ChannelKeys, T: Deref, F: D
        pub monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
        #[cfg(not(test))]
        monitors: Mutex<HashMap<Key, ChannelMonitor<ChanSigner>>>,
+       watch_events: Mutex<WatchEventQueue>,
        chain_monitor: C,
        broadcaster: T,
        logger: L,
        fee_estimator: F
 }
 
+struct WatchEventQueue {
+       watched: ChainWatchedUtil,
+       events: Vec<chain::WatchEvent>,
+}
+
+impl WatchEventQueue {
+       fn new() -> Self {
+               Self {
+                       watched: ChainWatchedUtil::new(),
+                       events: Vec::new(),
+               }
+       }
+
+       fn watch_tx(&mut self, txid: &Txid, script_pubkey: &Script) {
+               if self.watched.register_tx(txid, script_pubkey) {
+                       self.events.push(chain::WatchEvent::WatchTransaction {
+                               txid: *txid,
+                               script_pubkey: script_pubkey.clone()
+                       });
+               }
+       }
+
+       fn watch_output(&mut self, outpoint: (&Txid, usize), script_pubkey: &Script) {
+               let (txid, index) = outpoint;
+               if self.watched.register_outpoint((*txid, index as u32), script_pubkey) {
+                       self.events.push(chain::WatchEvent::WatchOutput {
+                               outpoint: OutPoint {
+                                       txid: *txid,
+                                       index: index as u16,
+                               },
+                               script_pubkey: script_pubkey.clone(),
+                       });
+               }
+       }
+
+       fn dequeue_events(&mut self) -> Vec<chain::WatchEvent> {
+               let mut pending_events = Vec::with_capacity(self.events.len());
+               pending_events.append(&mut self.events);
+               pending_events
+       }
+}
+
 impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, C: Deref + Sync + Send>
        ChainListener for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
        where T::Target: BroadcasterInterface,
@@ -184,24 +228,19 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref + Sync
         C::Target: ChainWatchInterface,
 {
        fn block_connected(&self, header: &BlockHeader, txdata: &[(usize, &Transaction)], height: u32) {
-               let mut reentered = true;
-               while reentered {
-                       let matched_indexes = self.chain_monitor.filter_block(header, txdata);
-                       let matched_txn: Vec<_> = matched_indexes.iter().map(|index| txdata[*index]).collect();
-                       let last_seen = self.chain_monitor.reentered();
-                       {
-                               let mut monitors = self.monitors.lock().unwrap();
-                               for monitor in monitors.values_mut() {
-                                       let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
-
-                                       for (ref txid, ref outputs) in txn_outputs {
-                                               for (idx, output) in outputs.iter().enumerate() {
-                                                       self.chain_monitor.install_watch_outpoint((txid.clone(), idx as u32), &output.script_pubkey);
-                                               }
+               let mut watch_events = self.watch_events.lock().unwrap();
+               let matched_txn: Vec<_> = txdata.iter().filter(|&&(_, tx)| watch_events.watched.does_match_tx(tx)).map(|e| *e).collect();
+               {
+                       let mut monitors = self.monitors.lock().unwrap();
+                       for monitor in monitors.values_mut() {
+                               let txn_outputs = monitor.block_connected(header, &matched_txn, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+
+                               for (ref txid, ref outputs) in txn_outputs {
+                                       for (idx, output) in outputs.iter().enumerate() {
+                                               watch_events.watch_output((txid, idx), &output.script_pubkey);
                                        }
                                }
                        }
-                       reentered = last_seen != self.chain_monitor.reentered();
                }
        }
 
@@ -224,6 +263,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
        pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C> {
                let res = SimpleManyChannelMonitor {
                        monitors: Mutex::new(HashMap::new()),
+                       watch_events: Mutex::new(WatchEventQueue::new()),
                        chain_monitor,
                        broadcaster,
                        logger,
@@ -235,6 +275,7 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
 
        /// Adds or updates the monitor which monitors the channel referred to by the given key.
        pub fn add_monitor_by_key(&self, key: Key, monitor: ChannelMonitor<ChanSigner>) -> Result<(), MonitorUpdateError> {
+               let mut watch_events = self.watch_events.lock().unwrap();
                let mut monitors = self.monitors.lock().unwrap();
                let entry = match monitors.entry(key) {
                        hash_map::Entry::Occupied(_) => return Err(MonitorUpdateError("Channel monitor for given key is already present")),
@@ -243,11 +284,11 @@ impl<Key : Send + cmp::Eq + hash::Hash + 'static, ChanSigner: ChannelKeys, T: De
                {
                        let funding_txo = monitor.get_funding_txo();
                        log_trace!(self.logger, "Got new Channel Monitor for channel {}", log_bytes!(funding_txo.0.to_channel_id()[..]));
-                       self.chain_monitor.install_watch_tx(&funding_txo.0.txid, &funding_txo.1);
-                       self.chain_monitor.install_watch_outpoint((funding_txo.0.txid, funding_txo.0.index as u32), &funding_txo.1);
+                       watch_events.watch_tx(&funding_txo.0.txid, &funding_txo.1);
+                       watch_events.watch_output((&funding_txo.0.txid, funding_txo.0.index as usize), &funding_txo.1);
                        for (txid, outputs) in monitor.get_outputs_to_watch().iter() {
                                for (idx, script) in outputs.iter().enumerate() {
-                                       self.chain_monitor.install_watch_outpoint((*txid, idx as u32), script);
+                                       watch_events.watch_output((txid, idx), script);
                                }
                        }
                }
@@ -314,6 +355,17 @@ impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: De
        }
 }
 
+impl<Key : Send + cmp::Eq + hash::Hash, ChanSigner: ChannelKeys, T: Deref, F: Deref, L: Deref, C: Deref> chain::WatchEventProvider for SimpleManyChannelMonitor<Key, ChanSigner, T, F, L, C>
+       where T::Target: BroadcasterInterface,
+             F::Target: FeeEstimator,
+             L::Target: Logger,
+        C::Target: ChainWatchInterface,
+{
+       fn release_pending_watch_events(&self) -> Vec<chain::WatchEvent> {
+               self.watch_events.lock().unwrap().dequeue_events()
+       }
+}
+
 /// If an HTLC expires within this many blocks, don't try to claim it in a shared transaction,
 /// instead claiming it in its own individual transaction.
 pub(crate) const CLTV_SHARED_CLAIM_BUFFER: u32 = 12;
@@ -849,30 +901,12 @@ pub trait ManyChannelMonitor: Send + Sync {
 
        /// Adds a monitor for the given `funding_txo`.
        ///
-       /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
-       /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
-       /// callbacks with the funding transaction, or any spends of it.
-       ///
-       /// Further, the implementer must also ensure that each output returned in
-       /// monitor.get_outputs_to_watch() is registered to ensure that the provided monitor learns about
-       /// any spends of any of the outputs.
-       ///
-       /// Any spends of outputs which should have been registered which aren't passed to
-       /// ChannelMonitors via block_connected may result in FUNDS LOSS.
+       /// Implementations must ensure that `monitor` receives block_connected calls for blocks with
+       /// the funding transaction or any spends of it, as well as any spends of outputs returned by
+       /// get_outputs_to_watch. Not doing so may result in LOST FUNDS.
        fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr>;
 
        /// Updates a monitor for the given `funding_txo`.
-       ///
-       /// Implementer must also ensure that the funding_txo txid *and* outpoint are registered with
-       /// any relevant ChainWatchInterfaces such that the provided monitor receives block_connected
-       /// callbacks with the funding transaction, or any spends of it.
-       ///
-       /// Further, the implementer must also ensure that each output returned in
-       /// monitor.get_watch_outputs() is registered to ensure that the provided monitor learns about
-       /// any spends of any of the outputs.
-       ///
-       /// Any spends of outputs which should have been registered which aren't passed to
-       /// ChannelMonitors via block_connected may result in FUNDS LOSS.
        fn update_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr>;
 
        /// Used by ChannelManager to get list of HTLC resolved onchain and which needed to be updated
index 10938cf31e8a46ec72b522dc3c7a9f890152048b..607eb0de0f8d72f7986391aa6c76058fa9eaac3a 100644 (file)
@@ -1,6 +1,7 @@
 //! A bunch of useful utilities for building networks of nodes and exchanging messages between
 //! nodes for functional tests.
 
+use chain;
 use chain::chaininterface;
 use chain::transaction::OutPoint;
 use ln::channelmanager::{ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentPreimage, PaymentHash, PaymentSecret, PaymentSendFailure};
@@ -72,9 +73,29 @@ pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32, he
 }
 
 pub fn connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: &Block, height: u32) {
-       node.block_notifier.block_connected(block, height)
+       use chain::WatchEventProvider;
+       use chain::chaininterface::ChainListener;
+
+       let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
+       process_chain_watch_events(&watch_events);
+
+       let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+       loop {
+               node.chan_monitor.simple_monitor.block_connected(&block.header, &txdata, height);
+
+               let watch_events = node.chan_monitor.simple_monitor.release_pending_watch_events();
+               process_chain_watch_events(&watch_events);
+
+               if watch_events.is_empty() {
+                       break;
+               }
+       }
+
+       node.node.block_connected(&block.header, &txdata, height);
 }
 
+fn process_chain_watch_events(_events: &Vec<chain::WatchEvent>) {}
+
 pub fn disconnect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, header: &BlockHeader, height: u32) {
        node.block_notifier.block_disconnected(header, height)
 }
index 5b29d80d4c74a611c75a8d93485628a24ba82bcc..32162c5d77463b74f37b16390f5d11e3b8cc4ab6 100644 (file)
@@ -408,9 +408,6 @@ fn test_1_conf_open() {
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
 
        let tx = create_chan_between_nodes_with_value_init(&nodes[0], &nodes[1], 100000, 10001, InitFeatures::known(), InitFeatures::known());
-       assert!(nodes[0].chain_monitor.does_match_tx(&tx));
-       assert!(nodes[1].chain_monitor.does_match_tx(&tx));
-
        let block = Block {
                header: BlockHeader { version: 0x20000000, prev_blockhash: Default::default(), merkle_root: Default::default(), time: 42, bits: 42, nonce: 42 },
                txdata: vec![tx],
@@ -2739,8 +2736,8 @@ fn claim_htlc_outputs_single_tx() {
 
 #[test]
 fn test_htlc_on_chain_success() {
-       // Test that in case of a unilateral close onchain, we detect the state of output thanks to
-       // ChainWatchInterface and pass the preimage backward accordingly. So here we test that ChannelManager is
+       // Test that in case of a unilateral close onchain, we detect the state of output and pass
+       // the preimage backward accordingly. So here we test that ChannelManager is
        // broadcasting the right event to other nodes in payment path.
        // We test with two HTLCs simultaneously as that was not handled correctly in the past.
        // A --------------------> B ----------------------> C (preimage)
@@ -2918,8 +2915,8 @@ fn test_htlc_on_chain_success() {
 
 #[test]
 fn test_htlc_on_chain_timeout() {
-       // Test that in case of a unilateral close onchain, we detect the state of output thanks to
-       // ChainWatchInterface and timeout the HTLC backward accordingly. So here we test that ChannelManager is
+       // Test that in case of a unilateral close onchain, we detect the state of output and
+       // timeout the HTLC backward accordingly. So here we test that ChannelManager is
        // broadcasting the right event to other nodes in payment path.
        // A ------------------> B ----------------------> C (timeout)
        //    B's commitment tx                 C's commitment tx
@@ -5092,8 +5089,8 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_success_tx() {
 
 #[test]
 fn test_onchain_to_onchain_claim() {
-       // Test that in case of channel closure, we detect the state of output thanks to
-       // ChainWatchInterface and claim HTLC on downstream peer's remote commitment tx.
+       // Test that in case of channel closure, we detect the state of output and claim HTLC
+       // on downstream peer's remote commitment tx.
        // First, have C claim an HTLC against its own latest commitment transaction.
        // Then, broadcast these to B, which should update the monitor downstream on the A<->B
        // channel.