Don't pause events for chainsync persistence
authorGursharan Singh <3442979+G8XSU@users.noreply.github.com>
Thu, 7 Mar 2024 19:00:41 +0000 (11:00 -0800)
committerGursharan Singh <3442979+G8XSU@users.noreply.github.com>
Fri, 26 Apr 2024 21:37:41 +0000 (14:37 -0700)
We used to wait on ChannelMonitor persistence to avoid
duplicate payment events. But this can still happen in cases where
ChannelMonitor handed the event to ChannelManager and we did not persist
ChannelManager after event handling.
It is expected to receive payment duplicate events and clients should handle these
events in an idempotent manner. Removing this hold-up of events simplifies
the logic and makes it easier to not persist ChannelMonitors on every block connect.

lightning/src/chain/chainmonitor.rs
lightning/src/ln/payment_tests.rs

index 81ba30ffb83ed964ca4fe13901be979073e2e6ad..278d882dd99a5cb393545b089783da08897b0afe 100644 (file)
@@ -29,7 +29,7 @@ use bitcoin::hash_types::{Txid, BlockHash};
 use crate::chain;
 use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
 use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor};
 use crate::chain::transaction::{OutPoint, TransactionData};
 use crate::ln::ChannelId;
 use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
@@ -209,17 +209,6 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
        /// update_persisted_channel, the user returns a
        /// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
        /// immediately, racing our insertion of the pending update into the contained Vec.
-       ///
-       /// Beyond the synchronization of updates themselves, we cannot handle user events until after
-       /// any chain updates have been stored on disk. Thus, we scan this list when returning updates
-       /// to the ChannelManager, refusing to return any updates for a ChannelMonitor which is still
-       /// being persisted fully to disk after a chain update.
-       ///
-       /// This avoids the possibility of handling, e.g. an on-chain claim, generating a claim monitor
-       /// event, resulting in the relevant ChannelManager generating a PaymentSent event and dropping
-       /// the pending payment entry, and then reloading before the monitor is persisted, resulting in
-       /// the ChannelManager re-adding the same payment entry, before the same block is replayed,
-       /// resulting in a duplicate PaymentSent event.
        pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
        /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
        /// in `pending_monitor_updates`.
@@ -227,6 +216,8 @@ struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
        /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
        /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
        /// forever, risking funds loss.
+       ///
+       /// [`LATENCY_GRACE_PERIOD_BLOCKS`]: crate::util::ser::Writeable::write
        last_chain_persist_height: AtomicUsize,
 }
 
@@ -393,7 +384,7 @@ where C::Target: chain::Filter,
                                                chain_sync_update_id
                                        ),
                                ChannelMonitorUpdateStatus::InProgress => {
-                                       log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                       log_debug!(logger, "Channel Monitor sync for channel {} in progress.", log_funding_info!(monitor));
                                        pending_monitor_updates.push(update_id);
                                },
                                ChannelMonitorUpdateStatus::UnrecoverableError => {
@@ -924,21 +915,12 @@ where C::Target: chain::Filter,
        fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
-                       let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
-                       let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-                       if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
-                               if is_pending_monitor_update {
-                                       log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
-                                       log_error!(logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
-                                       log_error!(logger, "   This may cause duplicate payment events to be generated.");
-                               }
-                               let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
-                               if monitor_events.len() > 0 {
-                                       let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
-                                       let monitor_channel_id = monitor_state.monitor.channel_id();
-                                       let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
-                                       pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
-                               }
+                       let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
+                       if monitor_events.len() > 0 {
+                               let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+                               let monitor_channel_id = monitor_state.monitor.channel_id();
+                               let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
+                               pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
                        }
                }
                pending_monitor_events
@@ -975,15 +957,12 @@ impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L
 #[cfg(test)]
 mod tests {
        use crate::check_added_monitors;
-       use crate::{expect_payment_claimed, expect_payment_path_successful, get_event_msg};
-       use crate::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
-       use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Watch};
-       use crate::chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
+       use crate::{expect_payment_path_successful, get_event_msg};
+       use crate::{get_htlc_update_msgs, get_revoke_commit_msgs};
+       use crate::chain::{ChannelMonitorUpdateStatus, Watch};
        use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
-       use crate::ln::channelmanager::{PaymentSendFailure, PaymentId, RecipientOnionFields};
        use crate::ln::functional_test_utils::*;
        use crate::ln::msgs::ChannelMessageHandler;
-       use crate::util::errors::APIError;
 
        #[test]
        fn test_async_ooo_offchain_updates() {
@@ -1090,76 +1069,6 @@ mod tests {
                check_added_monitors!(nodes[0], 1);
        }
 
-       fn do_chainsync_pauses_events(block_timeout: bool) {
-               // When a chainsync monitor update occurs, any MonitorUpdates should be held before being
-               // passed upstream to a `ChannelManager` via `Watch::release_pending_monitor_events`. This
-               // tests that behavior, as well as some ways it might go wrong.
-               let chanmon_cfgs = create_chanmon_cfgs(2);
-               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               let channel = create_announced_chan_between_nodes(&nodes, 0, 1);
-
-               // Get a route for later and rebalance the channel somewhat
-               send_payment(&nodes[0], &[&nodes[1]], 10_000_000);
-               let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
-
-               // First route a payment that we will claim on chain and give the recipient the preimage.
-               let (payment_preimage, payment_hash, ..) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
-               nodes[1].node.claim_funds(payment_preimage);
-               expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
-               nodes[1].node.get_and_clear_pending_msg_events();
-               check_added_monitors!(nodes[1], 1);
-               let remote_txn = get_local_commitment_txn!(nodes[1], channel.2);
-               assert_eq!(remote_txn.len(), 2);
-
-               // Temp-fail the block connection which will hold the channel-closed event
-               chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clear();
-               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::InProgress);
-
-               // Connect B's commitment transaction, but only to the ChainMonitor/ChannelMonitor. The
-               // channel is now closed, but the ChannelManager doesn't know that yet.
-               let new_header = create_dummy_header(nodes[0].best_block_info().0, 0);
-               nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
-                       &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
-               assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
-               nodes[0].chain_monitor.chain_monitor.best_block_updated(&new_header, nodes[0].best_block_info().1 + 1);
-               assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
-
-               // If the ChannelManager tries to update the channel, however, the ChainMonitor will pass
-               // the update through to the ChannelMonitor which will refuse it (as the channel is closed).
-               chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
-               unwrap_send_err!(nodes[0].node.send_payment_with_route(&route, second_payment_hash,
-                               RecipientOnionFields::secret_only(second_payment_secret), PaymentId(second_payment_hash.0)
-                       ), false, APIError::MonitorUpdateInProgress, {});
-               check_added_monitors!(nodes[0], 1);
-
-               // However, as the ChainMonitor is still waiting for the original persistence to complete,
-               // it won't yet release the MonitorEvents.
-               assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
-
-               if block_timeout {
-                       // After three blocks, pending MontiorEvents should be released either way.
-                       let latest_header = create_dummy_header(nodes[0].best_block_info().0, 0);
-                       nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
-               } else {
-                       let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();
-                       for (funding_outpoint, update_ids) in persistences {
-                               for update_id in update_ids {
-                                       nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_outpoint, update_id).unwrap();
-                               }
-                       }
-               }
-
-               expect_payment_sent(&nodes[0], payment_preimage, None, true, false);
-       }
-
-       #[test]
-       fn chainsync_pauses_events() {
-               do_chainsync_pauses_events(false);
-               do_chainsync_pauses_events(true);
-       }
-
        #[test]
        #[cfg(feature = "std")]
        fn update_during_chainsync_poisons_channel() {
@@ -1182,3 +1091,4 @@ mod tests {
                }).is_err());
        }
 }
+
index d1fa58372dd93dbe7e86c2af853d6b55f150d508..33cabbf25b0f9c509a7ac225570b1cc8a0c495be 100644 (file)
 //! serialization ordering between ChannelManager/ChannelMonitors and ensuring we can still retry
 //! payments thereafter.
 
-use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen, Watch};
+use crate::chain::{ChannelMonitorUpdateStatus, Confirm, Listen};
 use crate::chain::channelmonitor::{ANTI_REORG_DELAY, HTLC_FAIL_BACK_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS};
 use crate::sign::EntropySource;
-use crate::chain::transaction::OutPoint;
 use crate::events::{ClosureReason, Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentFailureReason, PaymentPurpose};
 use crate::ln::channel::{EXPIRE_PREV_CONFIG_TICKS, commit_tx_fee_msat, get_holder_selected_channel_reserve_satoshis, ANCHOR_OUTPUT_VALUE_SATOSHI};
 use crate::ln::channelmanager::{BREAKDOWN_TIMEOUT, MPP_TIMEOUT_TICKS, MIN_CLTV_EXPIRY_DELTA, PaymentId, PaymentSendFailure, RecentPaymentDetails, RecipientOnionFields, HTLCForwardInfo, PendingHTLCRouting, PendingAddHTLCInfo};
@@ -1030,16 +1029,15 @@ fn test_completed_payment_not_retryable_on_reload() {
        do_test_completed_payment_not_retryable_on_reload(false);
 }
 
-
-fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, confirm_commitment_tx: bool, payment_timeout: bool) {
+fn do_test_dup_htlc_onchain_doesnt_fail_on_reload(persist_manager_post_event: bool, confirm_commitment_tx: bool, payment_timeout: bool) {
        // When a Channel is closed, any outbound HTLCs which were relayed through it are simply
-       // dropped when the Channel is. From there, the ChannelManager relies on the ChannelMonitor
-       // having a copy of the relevant fail-/claim-back data and processes the HTLC fail/claim when
-       // the ChannelMonitor tells it to.
+       // dropped. From there, the ChannelManager relies on the ChannelMonitor having a copy of the
+       // relevant fail-/claim-back data and processes the HTLC fail/claim when the ChannelMonitor tells
+       // it to.
        //
-       // If, due to an on-chain event, an HTLC is failed/claimed, we should avoid providing the
-       // ChannelManager the HTLC event until after the monitor is re-persisted. This should prevent a
-       // duplicate HTLC fail/claim (e.g. via a PaymentPathFailed event).
+       // If, due to an on-chain event, an HTLC is failed/claimed, we provide the
+       // ChannelManager with the HTLC event without waiting for ChannelMonitor persistence.
+       // This might generate duplicate HTLC fail/claim (e.g. via a PaymentPathFailed event) on reload.
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let persister;
@@ -1112,14 +1110,9 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
                connect_block(&nodes[0], &claim_block);
        }
 
-       let funding_txo = OutPoint { txid: funding_tx.txid(), index: 0 };
-       let mon_updates: Vec<_> = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap()
-               .get_mut(&funding_txo).unwrap().drain().collect();
-       // If we are using chain::Confirm instead of chain::Listen, we will get the same update twice.
-       // If we're testing connection idempotency we may get substantially more.
-       assert!(mon_updates.len() >= 1);
-       assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
-       assert!(nodes[0].node.get_and_clear_pending_events().is_empty());
+       // Note that we skip persisting ChannelMonitors. We should still be generating the payment sent
+       // event without ChannelMonitor persistence. If we reset to a previous state on reload, the block
+       // should be replayed and we'll regenerate the event.
 
        // If we persist the ChannelManager here, we should get the PaymentSent event after
        // deserialization.
@@ -1128,13 +1121,7 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
                chan_manager_serialized = nodes[0].node.encode();
        }
 
-       // Now persist the ChannelMonitor and inform the ChainMonitor that we're done, generating the
-       // payment sent event.
-       chanmon_cfgs[0].persister.set_update_ret(ChannelMonitorUpdateStatus::Completed);
        let chan_0_monitor_serialized = get_monitor!(nodes[0], chan_id).encode();
-       for update in mon_updates {
-               nodes[0].chain_monitor.chain_monitor.channel_monitor_updated(funding_txo, update).unwrap();
-       }
        if payment_timeout {
                expect_payment_failed!(nodes[0], payment_hash, false);
        } else {
@@ -1168,13 +1155,13 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co
 }
 
 #[test]
-fn test_dup_htlc_onchain_fails_on_reload() {
-       do_test_dup_htlc_onchain_fails_on_reload(true, true, true);
-       do_test_dup_htlc_onchain_fails_on_reload(true, true, false);
-       do_test_dup_htlc_onchain_fails_on_reload(true, false, false);
-       do_test_dup_htlc_onchain_fails_on_reload(false, true, true);
-       do_test_dup_htlc_onchain_fails_on_reload(false, true, false);
-       do_test_dup_htlc_onchain_fails_on_reload(false, false, false);
+fn test_dup_htlc_onchain_doesnt_fail_on_reload() {
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, true);
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, true, false);
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(true, false, false);
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, true, true);
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, true, false);
+       do_test_dup_htlc_onchain_doesnt_fail_on_reload(false, false, false);
 }
 
 #[test]