use chain;
use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::Sign;
use util::atomic_counter::AtomicCounter;
use prelude::*;
use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
/// processed the closure event, we set this to true and return PermanentFailure for any other
/// chain::Watch events.
channel_perm_failed: AtomicBool,
+ /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+ /// in `pending_monitor_updates`.
+ /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
+ /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
+ /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
+ /// forever, risking funds loss.
+ last_chain_persist_height: AtomicUsize,
}
impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+ /// The best block height seen, used as a proxy for the passage of time.
+ highest_chain_height: AtomicUsize,
}
impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
/// calls must not exclude any transactions matching the new outputs nor any in-block
/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
/// updated `txdata`.
- fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+ ///
+ /// Calls which represent a new blockchain tip height should set `best_height`.
+ fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let mut dependent_txdata = Vec::new();
{
let monitor_states = self.monitors.write().unwrap();
+ if let Some(height) = best_height {
+ // If the best block height is being updated, update highest_chain_height under the
+ // monitors write lock.
+ let old_height = self.highest_chain_height.load(Ordering::Acquire);
+ let new_height = height as usize;
+ if new_height > old_height {
+ self.highest_chain_height.store(new_height, Ordering::Release);
+ }
+ }
+
for (funding_outpoint, monitor_state) in monitor_states.iter() {
let monitor = &monitor_state.monitor;
let mut txn_outputs;
contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+ if let Some(height) = best_height {
+ if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+ // If there are not ChainSync persists awaiting completion, go ahead and
+ // set last_chain_persist_height here - we wouldn't want the first
+ // TemporaryFailure to always immediately be considered "overly delayed".
+ monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+ }
+ }
log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
dependent_txdata.dedup_by_key(|(index, _tx)| *index);
let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
- self.process_chain_data(header, &txdata, process);
+ self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
}
}
fee_estimator: feeest,
persister,
pending_monitor_events: Mutex::new(Vec::new()),
+ highest_chain_height: AtomicUsize::new(0),
}
}
});
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
- // We've already done everything we need to, the next time
- // release_pending_monitor_events is called, any events for this ChannelMonitor
- // will be returned if there's no more SyncPersistId events left.
+ if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+ monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+ // The next time release_pending_monitor_events is called, any events for this
+ // ChannelMonitor will be returned.
+ }
},
}
Ok(())
let header = &block.header;
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
- self.process_chain_data(header, &txdata, |monitor, txdata| {
+ self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
monitor.block_connected(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
});
{
fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
- self.process_chain_data(header, txdata, |monitor, txdata| {
+ self.process_chain_data(header, None, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
});
fn best_block_updated(&self, header: &BlockHeader, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
- self.process_chain_data(header, &[], |monitor, txdata| {
+ self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
// While in practice there shouldn't be any recursive calls when given empty txdata,
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor,
pending_monitor_updates: Mutex::new(pending_monitor_updates),
channel_perm_failed: AtomicBool::new(false),
+ last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
});
persist_res
}
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
- if is_pending_monitor_update {
+ if is_pending_monitor_update &&
+ monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+ > self.highest_chain_height.load(Ordering::Acquire)
+ {
log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
} else {
if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
// updated.
log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
}
+ if is_pending_monitor_update {
+ log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+ log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
+ log_error!(self.logger, " This may cause duplicate payment events to be generated.");
+ }
pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
}
}