Always release `MonitorEvent`s to `ChannelManager` after 3 blocks
authorMatt Corallo <git@bluematt.me>
Thu, 14 Oct 2021 23:38:08 +0000 (23:38 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 20 Oct 2021 00:06:18 +0000 (00:06 +0000)
If we have a `ChannelMonitor` update from an on-chain event which
returns a `TemporaryFailure`, we block `MonitorEvent`s from that
`ChannelMonitor` until the update is persisted. This prevents
duplicate payment send events to the user after payments get
reloaded from monitors on restart.

However, if the event being avoided isn't going to generate a
PaymentSent, but instead result in us claiming an HTLC from an
upstream channel (ie the HTLC was forwarded), then the result of a
user delaying the event is that we delay getting our money, not a
duplicate event.

Because user persistence may take an arbitrary amount of time, we
need to bound the amount of time we can possibly wait to return
events, which we do here by bounding it to 3 blocks.

Thanks to Val for catching this in review.

lightning/src/chain/chainmonitor.rs

index d99d6708a3c71e209dc03b229a5b36e49d3ddf97..13a4ac3786cee28f682d2be049abf1b049a388ee 100644 (file)
@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -168,6 +168,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
        /// processed the closure event, we set this to true and return PermanentFailure for any other
        /// chain::Watch events.
        channel_perm_failed: AtomicBool,
+       /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+       /// in `pending_monitor_updates`.
+       /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+       /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+       /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+       /// forever, risking funds loss.
+       last_chain_persist_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
@@ -226,6 +233,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
        /// from the user and not from a [`ChannelMonitor`].
        pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+       /// The best block height seen, used as a proxy for the passage of time.
+       highest_chain_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -244,13 +253,25 @@ where C::Target: chain::Filter,
        /// calls must not exclude any transactions matching the new outputs nor any in-block
        /// descendants of such transactions. It is not necessary to re-fetch the block to obtain
        /// updated `txdata`.
-       fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+       ///
+       /// Calls which represent a new blockchain tip height should set `best_height`.
+       fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
                let mut dependent_txdata = Vec::new();
                {
                        let monitor_states = self.monitors.write().unwrap();
+                       if let Some(height) = best_height {
+                               // If the best block height is being updated, update highest_chain_height under the
+                               // monitors write lock.
+                               let old_height = self.highest_chain_height.load(Ordering::Acquire);
+                               let new_height = height as usize;
+                               if new_height > old_height {
+                                       self.highest_chain_height.store(new_height, Ordering::Release);
+                               }
+                       }
+
                        for (funding_outpoint, monitor_state) in monitor_states.iter() {
                                let monitor = &monitor_state.monitor;
                                let mut txn_outputs;
@@ -260,6 +281,14 @@ where C::Target: chain::Filter,
                                                contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
                                        };
                                        let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+                                       if let Some(height) = best_height {
+                                               if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                                       // If there are not ChainSync persists awaiting completion, go ahead and
+                                                       // set last_chain_persist_height here - we wouldn't want the first
+                                                       // TemporaryFailure to always immediately be considered "overly delayed".
+                                                       monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+                                               }
+                                       }
 
                                        log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
                                        match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
@@ -304,7 +333,7 @@ where C::Target: chain::Filter,
                        dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
                        dependent_txdata.dedup_by_key(|(index, _tx)| *index);
                        let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-                       self.process_chain_data(header, &txdata, process);
+                       self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
                }
        }
 
@@ -325,6 +354,7 @@ where C::Target: chain::Filter,
                        fee_estimator: feeest,
                        persister,
                        pending_monitor_events: Mutex::new(Vec::new()),
+                       highest_chain_height: AtomicUsize::new(0),
                }
        }
 
@@ -428,9 +458,11 @@ where C::Target: chain::Filter,
                                });
                        },
                        MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-                               // We've already done everything we need to, the next time
-                               // release_pending_monitor_events is called, any events for this ChannelMonitor
-                               // will be returned if there's no more SyncPersistId events left.
+                               if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                       monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+                                       // The next time release_pending_monitor_events is called, any events for this
+                                       // ChannelMonitor will be returned.
+                               }
                        },
                }
                Ok(())
@@ -470,7 +502,7 @@ where
                let header = &block.header;
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
                log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-               self.process_chain_data(header, &txdata, |monitor, txdata| {
+               self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
                        monitor.block_connected(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
                });
@@ -497,7 +529,7 @@ where
 {
        fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
                log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-               self.process_chain_data(header, txdata, |monitor, txdata| {
+               self.process_chain_data(header, None, txdata, |monitor, txdata| {
                        monitor.transactions_confirmed(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
                });
@@ -513,7 +545,7 @@ where
 
        fn best_block_updated(&self, header: &BlockHeader, height: u32) {
                log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-               self.process_chain_data(header, &[], |monitor, txdata| {
+               self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
                        // While in practice there shouldn't be any recursive calls when given empty txdata,
                        // it's still possible if a chain::Filter implementation returns a transaction.
                        debug_assert!(txdata.is_empty());
@@ -580,6 +612,7 @@ where C::Target: chain::Filter,
                        monitor,
                        pending_monitor_updates: Mutex::new(pending_monitor_updates),
                        channel_perm_failed: AtomicBool::new(false),
+                       last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
                });
                persist_res
        }
@@ -636,7 +669,10 @@ where C::Target: chain::Filter,
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
                        let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-                       if is_pending_monitor_update {
+                       if is_pending_monitor_update &&
+                                       monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+                                               > self.highest_chain_height.load(Ordering::Acquire)
+                       {
                                log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
                        } else {
                                if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -650,6 +686,11 @@ where C::Target: chain::Filter,
                                        // updated.
                                        log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
                                }
+                               if is_pending_monitor_update {
+                                       log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+                                       log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
+                                       log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
+                               }
                                pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
                        }
                }