Ensure `transactions_confirmed` is idempotent
author    Matt Corallo <git@bluematt.me>
          Fri, 18 Nov 2022 18:54:16 +0000 (18:54 +0000)
committer Matt Corallo <git@bluematt.me>
          Thu, 24 Nov 2022 03:40:48 +0000 (03:40 +0000)
In many complexity-reduced implementations of chain syncing using
Esplora, `transactions_confirmed` may be called redundantly for
transactions which were already confirmed. To ensure such calls are
idempotent, we add two new `ConnectStyle`s in our tests which
(a) call `transactions_confirmed` twice for each call, testing
simple idempotency, and (b) call `transactions_confirmed` once for
each historical block every time we connect a new block, testing
that we remain fully idempotent even when every call is repeated
constantly.
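
For context, the deployment pattern these styles emulate looks
roughly like the following sketch. This is hypothetical standalone
code, not part of this change (`SimpleSync` and `sync_to_tip` are
illustrative names; it assumes the `bitcoin` and `lightning` crates
of this era): on each poll it simply re-feeds every transaction it
has ever seen confirmed, relying entirely on
`transactions_confirmed` being idempotent.

    use bitcoin::blockdata::block::BlockHeader;
    use bitcoin::Transaction;
    use lightning::chain::Confirm;

    /// Minimal state kept by a complexity-reduced syncer: every confirmation
    /// ever reported, as (header, (index, tx) pairs, height).
    struct SimpleSync {
        confirmed: Vec<(BlockHeader, Vec<(usize, Transaction)>, u32)>,
    }

    impl SimpleSync {
        /// On each poll, redundantly re-confirm everything, then update the tip.
        fn sync_to_tip<C: Confirm>(&self, listener: &C, tip: &BlockHeader, tip_height: u32) {
            for (header, txdata, height) in self.confirmed.iter() {
                let txdata_refs: Vec<(usize, &Transaction)> =
                    txdata.iter().map(|(i, tx)| (*i, tx)).collect();
                // Re-confirming an already-confirmed transaction must be a no-op.
                listener.transactions_confirmed(header, &txdata_refs, *height);
            }
            listener.best_block_updated(tip, tip_height);
        }
    }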

In order to actually behave correctly under such redundant calls,
`ChannelMonitor` needs a simple already-confirmed check, which is
included here.
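
Conceptually that check reduces to a set-membership test before any
processing, along the lines of this simplified sketch (illustrative
only; the real check in the diff below consults several existing
`ChannelMonitorImpl` fields rather than a single set):

    use std::collections::HashSet;
    use bitcoin::Txid;

    /// Simplified idempotency guard: each confirmed txid is processed at most once.
    struct ConfirmedTxTracker {
        seen: HashSet<Txid>,
    }

    impl ConfirmedTxTracker {
        /// Returns true only the first time a txid is seen, making redundant
        /// re-confirmations of the same transaction no-ops.
        fn should_process(&mut self, txid: Txid) -> bool {
            self.seen.insert(txid)
        }
    }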

lightning/src/chain/channelmonitor.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/payment_tests.rs

diff --git a/lightning/src/chain/channelmonitor.rs b/lightning/src/chain/channelmonitor.rs
index 1cbc85350302ab27d86758609b4632fa8b67722c..bee2bfd012e0ff3efc628b4954883804f5d6a0c7 100644
@@ -2869,7 +2869,37 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
 
                let mut watch_outputs = Vec::new();
                let mut claimable_outpoints = Vec::new();
-               for tx in &txn_matched {
+               'tx_iter: for tx in &txn_matched {
+                       let txid = tx.txid();
+                       // If a transaction has already been confirmed, ensure we don't bother processing it duplicatively.
+                       if Some(txid) == self.funding_spend_confirmed {
+                               log_debug!(logger, "Skipping redundant processing of funding-spend tx {} as it was previously confirmed", txid);
+                               continue 'tx_iter;
+                       }
+                       for ev in self.onchain_events_awaiting_threshold_conf.iter() {
+                               if ev.txid == txid {
+                                       if let Some(conf_hash) = ev.block_hash {
+                                               assert_eq!(header.block_hash(), conf_hash,
+                                                       "Transaction {} was already confirmed and is being re-confirmed in a different block.\n\
+                                                       This indicates a severe bug in the transaction connection logic - a reorg should have been processed first!", ev.txid);
+                                       }
+                                       log_debug!(logger, "Skipping redundant processing of confirming tx {} as it was previously confirmed", txid);
+                                       continue 'tx_iter;
+                               }
+                       }
+                       for htlc in self.htlcs_resolved_on_chain.iter() {
+                               if Some(txid) == htlc.resolving_txid {
+                                       log_debug!(logger, "Skipping redundant processing of HTLC resolution tx {} as it was previously confirmed", txid);
+                                       continue 'tx_iter;
+                               }
+                       }
+                       for spendable_txid in self.spendable_txids_confirmed.iter() {
+                               if txid == *spendable_txid {
+                                       log_debug!(logger, "Skipping redundant processing of spendable tx {} as it was previously confirmed", txid);
+                                       continue 'tx_iter;
+                               }
+                       }
+
                        if tx.input.len() == 1 {
                                // Assuming our keys were not leaked (in which case we're screwed no matter what),
                                // commitment transactions and HTLC transactions will all only ever have one input,
@@ -2879,7 +2909,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                if prevout.txid == self.funding_info.0.txid && prevout.vout == self.funding_info.0.index as u32 {
                                        let mut balance_spendable_csv = None;
                                        log_info!(logger, "Channel {} closed by funding output spend in txid {}.",
-                                               log_bytes!(self.funding_info.0.to_channel_id()), tx.txid());
+                                               log_bytes!(self.funding_info.0.to_channel_id()), txid);
                                        self.funding_spend_seen = true;
                                        let mut commitment_tx_to_counterparty_output = None;
                                        if (tx.input[0].sequence.0 >> 8*3) as u8 == 0x80 && (tx.lock_time.0 >> 8*3) as u8 == 0x20 {
@@ -2902,7 +2932,6 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                                        }
                                                }
                                        }
-                                       let txid = tx.txid();
                                        self.onchain_events_awaiting_threshold_conf.push(OnchainEventEntry {
                                                txid,
                                                transaction: Some((*tx).clone()),
diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs
index 677fdccfd371819b7692cf90176716fd0b20d409..64699dadcdd14580acae96b45e98089fcd96e1a0 100644
@@ -107,6 +107,14 @@ pub enum ConnectStyle {
        /// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
        /// make a single `best_block_updated` call.
        TransactionsFirstSkippingBlocks,
+       /// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
+       /// make a single `best_block_updated` call. Further, we call `transactions_confirmed` twice
+       /// for each block, testing that simply-redundant calls are idempotent.
+       TransactionsDuplicativelyFirstSkippingBlocks,
+       /// The same as `TransactionsFirst`, however when we have multiple blocks to connect, we only
+       /// make a single `best_block_updated` call. Further, on each new block we re-confirm every
+       /// transaction we've ever seen confirmed, testing full idempotency of repeated calls.
+       HighlyRedundantTransactionsFirstSkippingBlocks,
        /// The same as `TransactionsFirst` when connecting blocks. During disconnection only
        /// `transaction_unconfirmed` is called.
        TransactionsFirstReorgsOnlyTip,
@@ -121,14 +129,16 @@ impl ConnectStyle {
                        use core::hash::{BuildHasher, Hasher};
                        // Get a random value using the only std API to do so - the DefaultHasher
                        let rand_val = std::collections::hash_map::RandomState::new().build_hasher().finish();
-                       let res = match rand_val % 7 {
+                       let res = match rand_val % 9 {
                                0 => ConnectStyle::BestBlockFirst,
                                1 => ConnectStyle::BestBlockFirstSkippingBlocks,
                                2 => ConnectStyle::BestBlockFirstReorgsOnlyTip,
                                3 => ConnectStyle::TransactionsFirst,
                                4 => ConnectStyle::TransactionsFirstSkippingBlocks,
-                               5 => ConnectStyle::TransactionsFirstReorgsOnlyTip,
-                               6 => ConnectStyle::FullBlockViaListen,
+                               5 => ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks,
+                               6 => ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks,
+                               7 => ConnectStyle::TransactionsFirstReorgsOnlyTip,
+                               8 => ConnectStyle::FullBlockViaListen,
                                _ => unreachable!(),
                        };
                        eprintln!("Using Block Connection Style: {:?}", res);
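
As an aside, the std-only randomness trick used above works as a
standalone program (illustrative snippet, no external crates):

    use core::hash::{BuildHasher, Hasher};
    use std::collections::hash_map::RandomState;

    fn main() {
        // RandomState is freshly (and randomly) seeded per instance, so hashing
        // nothing and calling finish() yields a pseudo-random u64.
        let rand_val: u64 = RandomState::new().build_hasher().finish();
        println!("{}", rand_val % 9);
    }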
@@ -143,6 +153,7 @@ impl ConnectStyle {
 pub fn connect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, depth: u32) -> BlockHash {
        let skip_intermediaries = match *node.connect_style.borrow() {
                ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks|
+                       ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks|ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|
                        ConnectStyle::BestBlockFirstReorgsOnlyTip|ConnectStyle::TransactionsFirstReorgsOnlyTip => true,
                _ => false,
        };
@@ -193,8 +204,32 @@ fn do_connect_block<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, block: Block, sk
                                node.node.best_block_updated(&block.header, height);
                                node.node.transactions_confirmed(&block.header, &txdata, height);
                        },
-                       ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks|ConnectStyle::TransactionsFirstReorgsOnlyTip => {
+                       ConnectStyle::TransactionsFirst|ConnectStyle::TransactionsFirstSkippingBlocks|
+                       ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks|ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|
+                       ConnectStyle::TransactionsFirstReorgsOnlyTip => {
+                               if *node.connect_style.borrow() == ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks {
+                                       let mut connections = Vec::new();
+                                       for (block, height) in node.blocks.lock().unwrap().iter() {
+                                               if !block.txdata.is_empty() {
+                                                       // Reconnect all transactions we've ever seen to ensure transaction connection
+                                                       // is *really* idempotent. This is a somewhat likely deployment for some
+                                                       // esplora implementations of chain sync which try to reduce state and
+                                                       // complexity as much as possible.
+                                                       //
+                                                       // Sadly we have to clone the block here to maintain lockorder. In the
+                                                       // future we should consider Arc'ing the blocks to avoid this.
+                                                       connections.push((block.clone(), *height));
+                                               }
+                                       }
+                                       for (old_block, height) in connections {
+                                               node.chain_monitor.chain_monitor.transactions_confirmed(&old_block.header,
+                                                       &old_block.txdata.iter().enumerate().collect::<Vec<_>>(), height);
+                                       }
+                               }
                                node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
+                               if *node.connect_style.borrow() == ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks {
+                                       node.chain_monitor.chain_monitor.transactions_confirmed(&block.header, &txdata, height);
+                               }
                                call_claimable_balances(node);
                                node.chain_monitor.chain_monitor.best_block_updated(&block.header, height);
                                node.node.transactions_confirmed(&block.header, &txdata, height);
@@ -226,7 +261,8 @@ pub fn disconnect_blocks<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, count: u32)
                                node.chain_monitor.chain_monitor.block_disconnected(&orig.0.header, orig.1);
                                Listen::block_disconnected(node.node, &orig.0.header, orig.1);
                        },
-                       ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks => {
+                       ConnectStyle::BestBlockFirstSkippingBlocks|ConnectStyle::TransactionsFirstSkippingBlocks|
+                       ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks|ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks => {
                                if i == count - 1 {
                                        node.chain_monitor.chain_monitor.best_block_updated(&prev.0.header, prev.1);
                                        node.node.best_block_updated(&prev.0.header, prev.1);
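
While the random selection above exercises the new styles
probabilistically, an individual test can also pin a style explicitly
before connecting blocks. A sketch of that existing test-utils
pattern (a fragment from within a functional test; the depth of 6 is
arbitrary):

    // Force the duplicative style, then connect six blocks; every
    // `transactions_confirmed` call will now be made twice.
    *nodes[0].connect_style.borrow_mut() = ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks;
    connect_blocks(&nodes[0], 6);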
diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs
index 33eec98c046aea0f6026d530c6ac0c957dd9a817..d240a155fad770099f3240b365a568d74c12bafc 100644
@@ -2814,12 +2814,17 @@ fn test_htlc_on_chain_success() {
        check_added_monitors!(nodes[1], 1);
        check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
        let node_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clone();
-       assert_eq!(node_txn.len(), 6); // ChannelManager : 3 (commitment tx + HTLC-Sucess * 2), ChannelMonitor : 3 (HTLC-Success, 2* RBF bumps of above HTLC txn)
+       assert!(node_txn.len() == 4 || node_txn.len() == 6); // ChannelManager : 3 (commitment tx + HTLC-Success * 2), ChannelMonitor : 3 (HTLC-Success, 2* RBF bumps of above HTLC txn)
        let commitment_spend =
                if node_txn[0].input[0].previous_output.txid == node_a_commitment_tx[0].txid() {
-                       check_spends!(node_txn[1], commitment_tx[0]);
-                       check_spends!(node_txn[2], commitment_tx[0]);
-                       assert_ne!(node_txn[1].input[0].previous_output.vout, node_txn[2].input[0].previous_output.vout);
+                       if node_txn.len() == 6 {
+                               // In some block `ConnectStyle`s we may avoid broadcasting the double-spending
+                               // transactions spending the HTLC outputs of C's commitment transaction. Otherwise,
+                               // check that the extra broadcasts (double-)spend those here.
+                               check_spends!(node_txn[1], commitment_tx[0]);
+                               check_spends!(node_txn[2], commitment_tx[0]);
+                               assert_ne!(node_txn[1].input[0].previous_output.vout, node_txn[2].input[0].previous_output.vout);
+                       }
                        &node_txn[0]
                } else {
                        check_spends!(node_txn[0], commitment_tx[0]);
@@ -2834,10 +2839,11 @@ fn test_htlc_on_chain_success() {
        assert_eq!(commitment_spend.input[1].witness.last().unwrap().len(), OFFERED_HTLC_SCRIPT_WEIGHT);
        assert_eq!(commitment_spend.lock_time.0, 0);
        assert!(commitment_spend.output[0].script_pubkey.is_v0_p2wpkh()); // direct payment
-       check_spends!(node_txn[3], chan_1.3);
-       assert_eq!(node_txn[3].input[0].witness.clone().last().unwrap().len(), 71);
-       check_spends!(node_txn[4], node_txn[3]);
-       check_spends!(node_txn[5], node_txn[3]);
+       let funding_spend_offset = if node_txn.len() == 6 { 3 } else { 1 };
+       check_spends!(node_txn[funding_spend_offset], chan_1.3);
+       assert_eq!(node_txn[funding_spend_offset].input[0].witness.clone().last().unwrap().len(), 71);
+       check_spends!(node_txn[funding_spend_offset + 1], node_txn[funding_spend_offset]);
+       check_spends!(node_txn[funding_spend_offset + 2], node_txn[funding_spend_offset]);
        // We don't bother to check that B can claim the HTLC output on its commitment tx here as
        // we already checked the same situation with A.
 
diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs
index 27dc67a0dadf738c46f9cd5fda36db554b639a73..57c3140dd3ef246e20c230e44b003388c536469c 100644
@@ -785,8 +785,9 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
        let funding_txo = OutPoint { txid: funding_tx.txid(), index: 0 };
        let mon_updates: Vec<_> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap()
                .get_mut(&funding_txo).unwrap().drain().collect();
-       // If we are using chain::Confirm instead of chain::Listen, we will get the same update twice
-       assert!(mon_updates.len() == 1 || mon_updates.len() == 2);
+       // If we are using chain::Confirm instead of chain::Listen, we will get the same update twice.
+       // If we're testing connection idempotency we may get substantially more.
+       assert!(mon_updates.len() >= 1);
        assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
        assert!(nodes[0].node.get_and_clear_pending_events().is_empty());