Always release `MonitorEvent`s to `ChannelManager` after 3 blocks
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index d99d6708a3c71e209dc03b229a5b36e49d3ddf97..13a4ac3786cee28f682d2be049abf1b049a388ee 100644 (file)
@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -168,6 +168,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
        /// processed the closure event, we set this to true and return PermanentFailure for any other
        /// chain::Watch events.
        channel_perm_failed: AtomicBool,
+       /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+       /// in `pending_monitor_updates`.
+       /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+       /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+       /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+       /// forever, risking funds loss.
+       last_chain_persist_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
@@ -226,6 +233,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
        /// from the user and not from a [`ChannelMonitor`].
        pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+       /// The best block height seen, used as a proxy for the passage of time.
+       highest_chain_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -244,13 +253,25 @@ where C::Target: chain::Filter,
        /// calls must not exclude any transactions matching the new outputs nor any in-block
        /// descendants of such transactions. It is not necessary to re-fetch the block to obtain
        /// updated `txdata`.
-       fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+       ///
+       /// Calls which represent a new blockchain tip height should set `best_height`.
+       fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
                let mut dependent_txdata = Vec::new();
                {
                        let monitor_states = self.monitors.write().unwrap();
+                       if let Some(height) = best_height {
+                               // If the best block height is being updated, update highest_chain_height under the
+                               // monitors write lock.
+                               let old_height = self.highest_chain_height.load(Ordering::Acquire);
+                               let new_height = height as usize;
+                               if new_height > old_height {
+                                       self.highest_chain_height.store(new_height, Ordering::Release);
+                               }
+                       }
+
                        for (funding_outpoint, monitor_state) in monitor_states.iter() {
                                let monitor = &monitor_state.monitor;
                                let mut txn_outputs;
@@ -260,6 +281,14 @@ where C::Target: chain::Filter,
                                                contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
                                        };
                                        let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+                                       if let Some(height) = best_height {
+                                               if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                                       // If there are not ChainSync persists awaiting completion, go ahead and
+                                                       // set last_chain_persist_height here - we wouldn't want the first
+                                                       // TemporaryFailure to always immediately be considered "overly delayed".
+                                                       monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+                                               }
+                                       }
 
                                        log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
                                        match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
@@ -304,7 +333,7 @@ where C::Target: chain::Filter,
                        dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
                        dependent_txdata.dedup_by_key(|(index, _tx)| *index);
                        let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-                       self.process_chain_data(header, &txdata, process);
+                       self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
                }
        }
 
@@ -325,6 +354,7 @@ where C::Target: chain::Filter,
                        fee_estimator: feeest,
                        persister,
                        pending_monitor_events: Mutex::new(Vec::new()),
+                       highest_chain_height: AtomicUsize::new(0),
                }
        }
 
@@ -428,9 +458,11 @@ where C::Target: chain::Filter,
                                });
                        },
                        MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-                               // We've already done everything we need to, the next time
-                               // release_pending_monitor_events is called, any events for this ChannelMonitor
-                               // will be returned if there's no more SyncPersistId events left.
+                               if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                       monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+                                       // The next time release_pending_monitor_events is called, any events for this
+                                       // ChannelMonitor will be returned.
+                               }
                        },
                }
                Ok(())
@@ -470,7 +502,7 @@ where
                let header = &block.header;
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
                log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-               self.process_chain_data(header, &txdata, |monitor, txdata| {
+               self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
                        monitor.block_connected(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
                });
@@ -497,7 +529,7 @@ where
 {
        fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
                log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-               self.process_chain_data(header, txdata, |monitor, txdata| {
+               self.process_chain_data(header, None, txdata, |monitor, txdata| {
                        monitor.transactions_confirmed(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
                });
@@ -513,7 +545,7 @@ where
 
        fn best_block_updated(&self, header: &BlockHeader, height: u32) {
                log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-               self.process_chain_data(header, &[], |monitor, txdata| {
+               self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
                        // While in practice there shouldn't be any recursive calls when given empty txdata,
                        // it's still possible if a chain::Filter implementation returns a transaction.
                        debug_assert!(txdata.is_empty());
@@ -580,6 +612,7 @@ where C::Target: chain::Filter,
                        monitor,
                        pending_monitor_updates: Mutex::new(pending_monitor_updates),
                        channel_perm_failed: AtomicBool::new(false),
+                       last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
                });
                persist_res
        }
@@ -636,7 +669,10 @@ where C::Target: chain::Filter,
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
                        let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-                       if is_pending_monitor_update {
+                       if is_pending_monitor_update &&
+                                       monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+                                               > self.highest_chain_height.load(Ordering::Acquire)
+                       {
                                log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
                        } else {
                                if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -650,6 +686,11 @@ where C::Target: chain::Filter,
                                        // updated.
                                        log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
                                }
+                               if is_pending_monitor_update {
+                                       log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+                                       log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
+                                       log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
+                               }
                                pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
                        }
                }