Implement pending claim rebroadcast on force-closed channels
authorWilmer Paulino <wilmer@wilmerpaulino.com>
Mon, 17 Apr 2023 21:17:53 +0000 (14:17 -0700)
committerWilmer Paulino <wilmer@wilmerpaulino.com>
Fri, 21 Apr 2023 21:34:41 +0000 (14:34 -0700)
This attempts to rebroadcast/fee-bump each pending claim a monitor is
tracking for a force-closed channel. This is crucial in preventing
certain classes of pinning attacks and ensures reliability if
broadcasting fails. For implementations of `FeeEstimator` that also
support mempool fee estimation, we may broadcast a fee-bumped claim
instead, ensuring we can also react to mempool fee spikes between
blocks.

lightning/src/chain/chainmonitor.rs
lightning/src/chain/channelmonitor.rs
lightning/src/chain/onchaintx.rs
lightning/src/chain/package.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/monitor_tests.rs

index fdaa25f69b8ea7df39dc6b246d2f062095071019..e7c2b0f18ec001d703eb0249964b90a8a3f1a5ed 100644 (file)
@@ -217,8 +217,15 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> Deref for LockedChannelMonitor<
 /// or used independently to monitor channels remotely. See the [module-level documentation] for
 /// details.
 ///
+/// Note that `ChainMonitor` should regularly trigger rebroadcasts/fee bumps of pending claims from
+/// a force-closed channel. This is crucial in preventing certain classes of pinning attacks,
+/// detecting substantial mempool feerate changes between blocks, and ensuring reliability if
+/// broadcasting fails. We recommend invoking this every 30 seconds, or lower if running in an
+/// environment with spotty connections, like on mobile.
+///
 /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 /// [module-level documentation]: crate::chain::chainmonitor
+/// [`rebroadcast_pending_claims`]: Self::rebroadcast_pending_claims
 pub struct ChainMonitor<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
        where C::Target: chain::Filter,
         T::Target: BroadcasterInterface,
@@ -533,6 +540,20 @@ where C::Target: chain::Filter,
        pub fn get_update_future(&self) -> Future {
                self.event_notifier.get_future()
        }
+
+       /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+       /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+       /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+       /// invoking this every 30 seconds, or lower if running in an environment with spotty
+       /// connections, like on mobile.
+       pub fn rebroadcast_pending_claims(&self) {
+               let monitors = self.monitors.read().unwrap();
+               for (_, monitor_holder) in &*monitors {
+                       monitor_holder.monitor.rebroadcast_pending_claims(
+                               &*self.broadcaster, &*self.fee_estimator, &*self.logger
+                       )
+               }
+       }
 }
 
 impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
index 5ff297b1af7477c4fee3cff8d8b5db6a7f428a48..cb2183f8509b78bf1b878066ff3289018665d891 100644 (file)
@@ -1467,6 +1467,27 @@ impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitor<Signer> {
        pub fn current_best_block(&self) -> BestBlock {
                self.inner.lock().unwrap().best_block.clone()
        }
+
+       /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+       /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+       /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+       /// invoking this every 30 seconds, or lower if running in an environment with spotty
+       /// connections, like on mobile.
+       pub fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
+               &self, broadcaster: B, fee_estimator: F, logger: L,
+       )
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               let fee_estimator = LowerBoundedFeeEstimator::new(fee_estimator);
+               let mut inner = self.inner.lock().unwrap();
+               let current_height = inner.best_block.height;
+               inner.onchain_tx_handler.rebroadcast_pending_claims(
+                       current_height, &broadcaster, &fee_estimator, &logger,
+               );
+       }
 }
 
 impl<Signer: WriteableEcdsaChannelSigner> ChannelMonitorImpl<Signer> {
index aece19c8371566bee10913d7f0404bbdba3ea62c..04776fbb0899038e1aef56ff2cf12361d7649c2c 100644 (file)
@@ -481,6 +481,59 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner> OnchainTxHandler<ChannelSigner>
                events.into_iter().map(|(_, event)| event).collect()
        }
 
+       /// Triggers rebroadcasts/fee-bumps of pending claims from a force-closed channel. This is
+       /// crucial in preventing certain classes of pinning attacks, detecting substantial mempool
+       /// feerate changes between blocks, and ensuring reliability if broadcasting fails. We recommend
+       /// invoking this every 30 seconds, or lower if running in an environment with spotty
+       /// connections, like on mobile.
+       pub(crate) fn rebroadcast_pending_claims<B: Deref, F: Deref, L: Deref>(
+               &mut self, current_height: u32, broadcaster: &B, fee_estimator: &LowerBoundedFeeEstimator<F>,
+               logger: &L,
+       )
+       where
+               B::Target: BroadcasterInterface,
+               F::Target: FeeEstimator,
+               L::Target: Logger,
+       {
+               let mut bump_requests = Vec::with_capacity(self.pending_claim_requests.len());
+               for (package_id, request) in self.pending_claim_requests.iter() {
+                       let inputs = request.outpoints();
+                       log_info!(logger, "Triggering rebroadcast/fee-bump for request with inputs {:?}", inputs);
+                       bump_requests.push((*package_id, request.clone()));
+               }
+               for (package_id, request) in bump_requests {
+                       self.generate_claim(current_height, &request, false /* force_feerate_bump */, fee_estimator, logger)
+                               .map(|(_, new_feerate, claim)| {
+                                       let mut bumped_feerate = false;
+                                       if let Some(mut_request) = self.pending_claim_requests.get_mut(&package_id) {
+                                               bumped_feerate = request.previous_feerate() > new_feerate;
+                                               mut_request.set_feerate(new_feerate);
+                                       }
+                                       match claim {
+                                               OnchainClaim::Tx(tx) => {
+                                                       let log_start = if bumped_feerate { "Broadcasting RBF-bumped" } else { "Rebroadcasting" };
+                                                       log_info!(logger, "{} onchain {}", log_start, log_tx!(tx));
+                                                       broadcaster.broadcast_transaction(&tx);
+                                               },
+                                               #[cfg(anchors)]
+                                               OnchainClaim::Event(event) => {
+                                                       let log_start = if bumped_feerate { "Yielding fee-bumped" } else { "Replaying" };
+                                                       log_info!(logger, "{} onchain event to spend inputs {:?}", log_start,
+                                                               request.outpoints());
+                                                       #[cfg(debug_assertions)] {
+                                                               debug_assert!(request.requires_external_funding());
+                                                               let num_existing = self.pending_claim_events.iter()
+                                                                       .filter(|entry| entry.0 == package_id).count();
+                                                               assert!(num_existing == 0 || num_existing == 1);
+                                                       }
+                                                       self.pending_claim_events.retain(|event| event.0 != package_id);
+                                                       self.pending_claim_events.push((package_id, event));
+                                               }
+                                       }
+                               });
+               }
+       }
+
        /// Lightning security model (i.e being able to redeem/timeout HTLC or penalize counterparty
        /// onchain) lays on the assumption of claim transactions getting confirmed before timelock
        /// expiration (CSV or CLTV following cases). In case of high-fee spikes, claim tx may get stuck
index 5974a65d2a3e7527102c57cd8781a66d4a4b21e6..b3f6f50ed1d17fb390c04a2995eb98e9ef3fd558 100644 (file)
@@ -554,6 +554,9 @@ impl PackageTemplate {
        pub(crate) fn aggregable(&self) -> bool {
                self.aggregable
        }
+       pub(crate) fn previous_feerate(&self) -> u64 {
+               self.feerate_previous
+       }
        pub(crate) fn set_feerate(&mut self, new_feerate: u64) {
                self.feerate_previous = new_feerate;
        }
index 6d96877625ccb4c74d1b41c0d56ee1ab5c46c1d0..b695e20a8dd22ba589f2e3cf0a0bd8f4dbad0ddc 100644 (file)
@@ -151,6 +151,20 @@ impl ConnectStyle {
                }
        }
 
+       pub fn updates_best_block_first(&self) -> bool {
+               match self {
+                       ConnectStyle::BestBlockFirst => true,
+                       ConnectStyle::BestBlockFirstSkippingBlocks => true,
+                       ConnectStyle::BestBlockFirstReorgsOnlyTip => true,
+                       ConnectStyle::TransactionsFirst => false,
+                       ConnectStyle::TransactionsFirstSkippingBlocks => false,
+                       ConnectStyle::TransactionsDuplicativelyFirstSkippingBlocks => false,
+                       ConnectStyle::HighlyRedundantTransactionsFirstSkippingBlocks => false,
+                       ConnectStyle::TransactionsFirstReorgsOnlyTip => false,
+                       ConnectStyle::FullBlockViaListen => false,
+               }
+       }
+
        fn random_style() -> ConnectStyle {
                #[cfg(feature = "std")] {
                        use core::hash::{BuildHasher, Hasher};
index d09f4229c8cd292a149e670071ecc6e701c1c6b6..5b2b29dbb76f11675a2269cc3ebafb527eda063d 100644 (file)
@@ -1778,6 +1778,157 @@ fn test_restored_packages_retry() {
        do_test_restored_packages_retry(true);
 }
 
+fn do_test_monitor_rebroadcast_pending_claims(anchors: bool) {
+       // Test that we will retry broadcasting pending claims for a force-closed channel on every
+       // `ChainMonitor::rebroadcast_pending_claims` call.
+       if anchors {
+               assert!(cfg!(anchors));
+       }
+       let secp = Secp256k1::new();
+       let mut chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let mut config = test_default_channel_config();
+       if anchors {
+               #[cfg(anchors)] {
+                       config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
+               }
+       }
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[Some(config), Some(config)]);
+       let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       let (_, _, _, chan_id, funding_tx) = create_chan_between_nodes_with_value(
+               &nodes[0], &nodes[1], 1_000_000, 500_000_000
+       );
+       const HTLC_AMT_MSAT: u64 = 1_000_000;
+       const HTLC_AMT_SAT: u64 = HTLC_AMT_MSAT / 1000;
+       route_payment(&nodes[0], &[&nodes[1]], HTLC_AMT_MSAT);
+
+       let htlc_expiry = nodes[0].best_block_info().1 + TEST_FINAL_CLTV + 1;
+
+       let commitment_txn = get_local_commitment_txn!(&nodes[0], &chan_id);
+       assert_eq!(commitment_txn.len(), if anchors { 1 /* commitment tx only */} else { 2 /* commitment and htlc timeout tx */ });
+       check_spends!(&commitment_txn[0], &funding_tx);
+       mine_transaction(&nodes[0], &commitment_txn[0]);
+       check_closed_broadcast!(&nodes[0], true);
+       check_closed_event(&nodes[0], 1, ClosureReason::CommitmentTxConfirmed, false);
+       check_added_monitors(&nodes[0], 1);
+
+       // Set up a helper closure we'll use throughout our test. We should only expect retries without
+       // bumps if fees have not increased after a block has been connected (assuming the height timer
+       // re-evaluates at every block) or after `ChainMonitor::rebroadcast_pending_claims` is called.
+       let mut prev_htlc_tx_feerate = None;
+       let mut check_htlc_retry = |should_retry: bool, should_bump: bool| -> Option<Transaction> {
+               let (htlc_tx, htlc_tx_feerate) = if anchors {
+                       assert!(nodes[0].tx_broadcaster.txn_broadcast().is_empty());
+                       let mut events = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events();
+                       assert_eq!(events.len(), if should_retry { 1 } else { 0 });
+                       if !should_retry {
+                               return None;
+                       }
+                       #[allow(unused_assignments)]
+                       let mut tx = Transaction {
+                               version: 2,
+                               lock_time: bitcoin::PackedLockTime::ZERO,
+                               input: vec![],
+                               output: vec![],
+                       };
+                       #[allow(unused_assignments)]
+                       let mut feerate = 0;
+                       #[cfg(anchors)] {
+                               feerate = if let Event::BumpTransaction(BumpTransactionEvent::HTLCResolution {
+                                       target_feerate_sat_per_1000_weight, mut htlc_descriptors, tx_lock_time,
+                               }) = events.pop().unwrap() {
+                                       assert_eq!(htlc_descriptors.len(), 1);
+                                       let descriptor = htlc_descriptors.pop().unwrap();
+                                       assert_eq!(descriptor.commitment_txid, commitment_txn[0].txid());
+                                       let htlc_output_idx = descriptor.htlc.transaction_output_index.unwrap() as usize;
+                                       assert!(htlc_output_idx < commitment_txn[0].output.len());
+                                       tx.lock_time = tx_lock_time;
+                                       // Note that we don't care about actually making the HTLC transaction meet the
+                                       // feerate for the test, we just want to make sure the feerates we receive from
+                                       // the events never decrease.
+                                       tx.input.push(descriptor.unsigned_tx_input());
+                                       let signer = nodes[0].keys_manager.derive_channel_keys(
+                                               descriptor.channel_value_satoshis, &descriptor.channel_keys_id,
+                                       );
+                                       let per_commitment_point = signer.get_per_commitment_point(
+                                               descriptor.per_commitment_number, &secp
+                                       );
+                                       tx.output.push(descriptor.tx_output(&per_commitment_point, &secp));
+                                       let our_sig = signer.sign_holder_htlc_transaction(&mut tx, 0, &descriptor, &secp).unwrap();
+                                       let witness_script = descriptor.witness_script(&per_commitment_point, &secp);
+                                       tx.input[0].witness = descriptor.tx_input_witness(&our_sig, &witness_script);
+                                       target_feerate_sat_per_1000_weight as u64
+                               } else { panic!("unexpected event"); };
+                       }
+                       (tx, feerate)
+               } else {
+                       assert!(nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events().is_empty());
+                       let mut txn = nodes[0].tx_broadcaster.txn_broadcast();
+                       assert_eq!(txn.len(), if should_retry { 1 } else { 0 });
+                       if !should_retry {
+                               return None;
+                       }
+                       let htlc_tx = txn.pop().unwrap();
+                       check_spends!(htlc_tx, commitment_txn[0]);
+                       let htlc_tx_fee = HTLC_AMT_SAT - htlc_tx.output[0].value;
+                       let htlc_tx_feerate = htlc_tx_fee * 1000 / htlc_tx.weight() as u64;
+                       (htlc_tx, htlc_tx_feerate)
+               };
+               if should_bump {
+                       assert!(htlc_tx_feerate > prev_htlc_tx_feerate.take().unwrap());
+               } else if let Some(prev_feerate) = prev_htlc_tx_feerate.take() {
+                       assert_eq!(htlc_tx_feerate, prev_feerate);
+               }
+               prev_htlc_tx_feerate = Some(htlc_tx_feerate);
+               Some(htlc_tx)
+       };
+
+       // Connect blocks up to one before the HTLC expires. This should not result in a claim/retry.
+       connect_blocks(&nodes[0], htlc_expiry - nodes[0].best_block_info().1 - 2);
+       check_htlc_retry(false, false);
+
+       // Connect one more block, producing our first claim.
+       connect_blocks(&nodes[0], 1);
+       check_htlc_retry(true, false);
+
+       // Connect one more block, expecting a retry with a fee bump. Unfortunately, we cannot bump HTLC
+       // transactions pre-anchors.
+       connect_blocks(&nodes[0], 1);
+       check_htlc_retry(true, anchors);
+
+       // Trigger a call and we should have another retry, but without a bump.
+       nodes[0].chain_monitor.chain_monitor.rebroadcast_pending_claims();
+       check_htlc_retry(true, false);
+
+       // Double the feerate and trigger a call, expecting a fee-bumped retry.
+       *nodes[0].fee_estimator.sat_per_kw.lock().unwrap() *= 2;
+       nodes[0].chain_monitor.chain_monitor.rebroadcast_pending_claims();
+       check_htlc_retry(true, anchors);
+
+       // Connect one more block, expecting a retry with a fee bump. Unfortunately, we cannot bump HTLC
+       // transactions pre-anchors.
+       connect_blocks(&nodes[0], 1);
+       let htlc_tx = check_htlc_retry(true, anchors).unwrap();
+
+       // Mine the HTLC transaction to ensure we don't retry claims while they're confirmed.
+       mine_transaction(&nodes[0], &htlc_tx);
+       // If we have a `ConnectStyle` that advertises the new block first without the transasctions,
+       // we'll receive an extra bumped claim.
+       if nodes[0].connect_style.borrow().updates_best_block_first() {
+               check_htlc_retry(true, anchors);
+       }
+       nodes[0].chain_monitor.chain_monitor.rebroadcast_pending_claims();
+       check_htlc_retry(false, false);
+}
+
+#[test]
+fn test_monitor_timer_based_claim() {
+       do_test_monitor_rebroadcast_pending_claims(false);
+       #[cfg(anchors)]
+       do_test_monitor_rebroadcast_pending_claims(true);
+}
+
 #[cfg(anchors)]
 #[test]
 fn test_yield_anchors_events() {