From e6bc2b541e8bccab7d72c8fa6828c5675a1a7431 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 14 Oct 2021 23:38:08 +0000 Subject: [PATCH] Always release `MonitorEvent`s to `ChannelManager` after 3 blocks If we have a `ChannelMonitor` update from an on-chain event which returns a `TemporaryFailure`, we block `MonitorEvent`s from that `ChannelMonitor` until the update is persisted. This prevents duplicate payment send events to the user after payments get reloaded from monitors on restart. However, if the event being avoided isn't going to generate a PaymentSent, but instead result in us claiming an HTLC from an upstream channel (ie the HTLC was forwarded), then the result of a user delaying the event is that we delay getting our money, not a duplicate event. Because user persistence may take an arbitrary amount of time, we need to bound the amount of time we can possibly wait to return events, which we do here by bounding it to 3 blocks. Thanks to Val for catching this in review. --- lightning/src/chain/chainmonitor.rs | 63 ++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 11 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index d99d6708a..13a4ac378 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid; use chain; use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput}; use chain::chaininterface::{BroadcasterInterface, FeeEstimator}; -use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs}; +use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS}; use chain::transaction::{OutPoint, TransactionData}; use chain::keysinterface::Sign; use util::atomic_counter::AtomicCounter; @@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails; use prelude::*; use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard}; use core::ops::Deref; -use core::sync::atomic::{AtomicBool, Ordering}; +use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; #[derive(Clone, Copy, Hash, PartialEq, Eq)] /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents @@ -168,6 +168,13 @@ struct MonitorHolder { /// processed the closure event, we set this to true and return PermanentFailure for any other /// chain::Watch events. channel_perm_failed: AtomicBool, + /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present + /// in `pending_monitor_updates`. + /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain + /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up + /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel + /// forever, risking funds loss. + last_chain_persist_height: AtomicUsize, } impl MonitorHolder { @@ -226,6 +233,8 @@ pub struct ChainMonitor>, + /// The best block height seen, used as a proxy for the passage of time. + highest_chain_height: AtomicUsize, } impl ChainMonitor @@ -244,13 +253,25 @@ where C::Target: chain::Filter, /// calls must not exclude any transactions matching the new outputs nor any in-block /// descendants of such transactions. It is not necessary to re-fetch the block to obtain /// updated `txdata`. - fn process_chain_data(&self, header: &BlockHeader, txdata: &TransactionData, process: FN) + /// + /// Calls which represent a new blockchain tip height should set `best_height`. + fn process_chain_data(&self, header: &BlockHeader, best_height: Option, txdata: &TransactionData, process: FN) where FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let mut dependent_txdata = Vec::new(); { let monitor_states = self.monitors.write().unwrap(); + if let Some(height) = best_height { + // If the best block height is being updated, update highest_chain_height under the + // monitors write lock. + let old_height = self.highest_chain_height.load(Ordering::Acquire); + let new_height = height as usize; + if new_height > old_height { + self.highest_chain_height.store(new_height, Ordering::Release); + } + } + for (funding_outpoint, monitor_state) in monitor_states.iter() { let monitor = &monitor_state.monitor; let mut txn_outputs; @@ -260,6 +281,14 @@ where C::Target: chain::Filter, contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()), }; let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap(); + if let Some(height) = best_height { + if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) { + // If there are not ChainSync persists awaiting completion, go ahead and + // set last_chain_persist_height here - we wouldn't want the first + // TemporaryFailure to always immediately be considered "overly delayed". + monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release); + } + } log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor)); match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) { @@ -304,7 +333,7 @@ where C::Target: chain::Filter, dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index); dependent_txdata.dedup_by_key(|(index, _tx)| *index); let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect(); - self.process_chain_data(header, &txdata, process); + self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around } } @@ -325,6 +354,7 @@ where C::Target: chain::Filter, fee_estimator: feeest, persister, pending_monitor_events: Mutex::new(Vec::new()), + highest_chain_height: AtomicUsize::new(0), } } @@ -428,9 +458,11 @@ where C::Target: chain::Filter, }); }, MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => { - // We've already done everything we need to, the next time - // release_pending_monitor_events is called, any events for this ChannelMonitor - // will be returned if there's no more SyncPersistId events left. + if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) { + monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release); + // The next time release_pending_monitor_events is called, any events for this + // ChannelMonitor will be returned. + } }, } Ok(()) @@ -470,7 +502,7 @@ where let header = &block.header; let txdata: Vec<_> = block.txdata.iter().enumerate().collect(); log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height); - self.process_chain_data(header, &txdata, |monitor, txdata| { + self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| { monitor.block_connected( header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) }); @@ -497,7 +529,7 @@ where { fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash()); - self.process_chain_data(header, txdata, |monitor, txdata| { + self.process_chain_data(header, None, txdata, |monitor, txdata| { monitor.transactions_confirmed( header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger) }); @@ -513,7 +545,7 @@ where fn best_block_updated(&self, header: &BlockHeader, height: u32) { log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height); - self.process_chain_data(header, &[], |monitor, txdata| { + self.process_chain_data(header, Some(height), &[], |monitor, txdata| { // While in practice there shouldn't be any recursive calls when given empty txdata, // it's still possible if a chain::Filter implementation returns a transaction. debug_assert!(txdata.is_empty()); @@ -580,6 +612,7 @@ where C::Target: chain::Filter, monitor, pending_monitor_updates: Mutex::new(pending_monitor_updates), channel_perm_failed: AtomicBool::new(false), + last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)), }); persist_res } @@ -636,7 +669,10 @@ where C::Target: chain::Filter, let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0); for monitor_state in self.monitors.read().unwrap().values() { let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap()); - if is_pending_monitor_update { + if is_pending_monitor_update && + monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize + > self.highest_chain_height.load(Ordering::Acquire) + { log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!"); } else { if monitor_state.channel_perm_failed.load(Ordering::Acquire) { @@ -650,6 +686,11 @@ where C::Target: chain::Filter, // updated. log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!"); } + if is_pending_monitor_update { + log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS); + log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released."); + log_error!(self.logger, " This may cause duplicate payment events to be generated."); + } pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } } -- 2.39.5