X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fchain%2Fchainmonitor.rs;h=71b0b3e506456d36ec16345b70e10f8e1cf12746;hb=6e86776a71cd1e383707180571fc46118c926eca;hp=d99d6708a3c71e209dc03b229a5b36e49d3ddf97;hpb=5c2ff2cb30ef1639c80b275eea209a289dd91b77;p=rust-lightning

diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs
index d99d6708..71b0b3e5 100644
--- a/lightning/src/chain/chainmonitor.rs
+++ b/lightning/src/chain/chainmonitor.rs
@@ -29,7 +29,7 @@ use bitcoin::hash_types::Txid;
 use chain;
 use chain::{ChannelMonitorUpdateErr, Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::atomic_counter::AtomicCounter;
@@ -42,7 +42,7 @@ use ln::channelmanager::ChannelDetails;
 use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
-use core::sync::atomic::{AtomicBool, Ordering};
+use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -92,10 +92,12 @@ impl MonitorUpdateId {
 /// closed without broadcasting the latest state. See
 /// [`ChannelMonitorUpdateErr::PermanentFailure`] for more details.
 pub trait Persist<ChannelSigner: Sign> {
-	/// Persist a new channel's data. The data can be stored any way you want, but the identifier
-	/// provided by LDK is the channel's outpoint (and it is up to you to maintain a correct
-	/// mapping between the outpoint and the stored channel data). Note that you **must** persist
-	/// every new monitor to disk.
+	/// Persist a new channel's data in response to a [`chain::Watch::watch_channel`] call. This is
+	/// called by [`ChannelManager`] for new channels, or may be called directly, e.g. on startup.
+	///
+	/// The data can be stored any way you want, but the identifier provided by LDK is the
+	/// channel's outpoint (and it is up to you to maintain a correct mapping between the outpoint
+	/// and the stored channel data). Note that you **must** persist every new monitor to disk.
 	///
 	/// The `update_id` is used to identify this call to [`ChainMonitor::channel_monitor_updated`],
 	/// if you return [`ChannelMonitorUpdateErr::TemporaryFailure`].
@@ -103,6 +105,7 @@ pub trait Persist<ChannelSigner: Sign> {
 	/// See [`Writeable::write`] on [`ChannelMonitor`] for writing out a `ChannelMonitor`
 	/// and [`ChannelMonitorUpdateErr`] for requirements when returning errors.
 	///
+	/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
 	/// [`Writeable::write`]: crate::util::ser::Writeable::write
 	fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> Result<(), ChannelMonitorUpdateErr>;
 
@@ -168,6 +171,13 @@ struct MonitorHolder<ChannelSigner: Sign> {
 	/// processed the closure event, we set this to true and return PermanentFailure for any other
 	/// chain::Watch events.
 	channel_perm_failed: AtomicBool,
+	/// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
+	/// in `pending_monitor_updates`.
+	/// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] blocks since we started waiting on a
+	/// chain sync event, we let monitor events return to `ChannelManager` because we cannot hold
+	/// them up forever or we'll end up with HTLC preimages waiting to feed back into an upstream
+	/// channel forever, risking funds loss.
+	last_chain_persist_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign> MonitorHolder<ChannelSigner> {
@@ -226,6 +236,8 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
 	pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+	/// The best block height seen, used as a proxy for the passage of time.
+	highest_chain_height: AtomicUsize,
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainMonitor<ChannelSigner, C, T, F, L, P>
@@ -244,13 +256,25 @@ where C::Target: chain::Filter,
 	/// calls must not exclude any transactions matching the new outputs nor any in-block
 	/// descendants of such transactions. It is not necessary to re-fetch the block to obtain
 	/// updated `txdata`.
-	fn process_chain_data<FN>(&self, header: &BlockHeader, txdata: &TransactionData, process: FN)
+	///
+	/// Calls which represent a new blockchain tip height should set `best_height`.
+	fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
 		where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
 		let mut dependent_txdata = Vec::new();
 		{
 			let monitor_states = self.monitors.write().unwrap();
+			if let Some(height) = best_height {
+				// If the best block height is being updated, update highest_chain_height under the
+				// monitors write lock.
+				let old_height = self.highest_chain_height.load(Ordering::Acquire);
+				let new_height = height as usize;
+				if new_height > old_height {
+					self.highest_chain_height.store(new_height, Ordering::Release);
+				}
+			}
+
 			for (funding_outpoint, monitor_state) in monitor_states.iter() {
 				let monitor = &monitor_state.monitor;
 				let mut txn_outputs;
@@ -260,6 +284,14 @@ where C::Target: chain::Filter,
 					contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
 				};
 				let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+				if let Some(height) = best_height {
+					if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+						// If there are no ChainSync persists awaiting completion, go ahead and
+						// set last_chain_persist_height here - we wouldn't want the first
+						// TemporaryFailure to always immediately be considered "overly delayed".
+						monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
+					}
+				}
 
 				log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
 				match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
@@ -304,7 +336,7 @@ where C::Target: chain::Filter,
 			dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
 			dependent_txdata.dedup_by_key(|(index, _tx)| *index);
 			let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-			self.process_chain_data(header, &txdata, process);
+			self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
 		}
 	}
@@ -325,6 +357,7 @@ where C::Target: chain::Filter,
 			fee_estimator: feeest,
 			persister,
 			pending_monitor_events: Mutex::new(Vec::new()),
+			highest_chain_height: AtomicUsize::new(0),
 		}
 	}
@@ -428,9 +461,11 @@ where C::Target: chain::Filter,
 				});
 			},
 			MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
-				// We've already done everything we need to, the next time
-				// release_pending_monitor_events is called, any events for this ChannelMonitor
-				// will be returned if there's no more SyncPersistId events left.
+				if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
+					monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
+					// The next time release_pending_monitor_events is called, any events for this
+					// ChannelMonitor will be returned.
+				}
 			},
 		}
 		Ok(())
@@ -470,7 +505,7 @@ where
 		let header = &block.header;
 		let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
 		log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
-		self.process_chain_data(header, &txdata, |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
 			monitor.block_connected(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -497,7 +532,7 @@ where
 {
 	fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
 		log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
-		self.process_chain_data(header, txdata, |monitor, txdata| {
+		self.process_chain_data(header, None, txdata, |monitor, txdata| {
 			monitor.transactions_confirmed(
 				header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
 		});
@@ -513,7 +548,7 @@ where
 	fn best_block_updated(&self, header: &BlockHeader, height: u32) {
 		log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
-		self.process_chain_data(header, &[], |monitor, txdata| {
+		self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
 			// While in practice there shouldn't be any recursive calls when given empty txdata,
 			// it's still possible if a chain::Filter implementation returns a transaction.
 			debug_assert!(txdata.is_empty());
@@ -580,6 +615,7 @@ where C::Target: chain::Filter,
 			monitor,
 			pending_monitor_updates: Mutex::new(pending_monitor_updates),
 			channel_perm_failed: AtomicBool::new(false),
+			last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
 		});
 		persist_res
 	}
@@ -636,7 +672,10 @@ where C::Target: chain::Filter,
 		let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
 		for monitor_state in self.monitors.read().unwrap().values() {
 			let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
-			if is_pending_monitor_update {
+			if is_pending_monitor_update &&
+					monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
+						> self.highest_chain_height.load(Ordering::Acquire)
+			{
 				log_info!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
 			} else {
 				if monitor_state.channel_perm_failed.load(Ordering::Acquire) {
@@ -650,6 +689,11 @@ where C::Target: chain::Filter,
 					// updated.
 					log_info!(self.logger, "A Channel Monitor sync returned PermanentFailure. Returning monitor events but duplicate events may appear after reload!");
 				}
+				if is_pending_monitor_update {
+					log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+					log_error!(self.logger, "   To avoid funds-loss, we are allowing monitor updates to be released.");
+					log_error!(self.logger, "   This may cause duplicate payment events to be generated.");
+				}
 				pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
 			}
 		}
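
Editor's note (illustration only, not part of the diff): the release_pending_monitor_events change above withholds a channel's monitor events only while a ChainSync persist is still pending AND fewer than LATENCY_GRACE_PERIOD_BLOCKS blocks have passed since last_chain_persist_height; once the grace period elapses the events are released anyway, with the log_error lines shown in the last hunk. The standalone Rust sketch below isolates that gating predicate. The constant value of 3 and the function name are assumptions for illustration and are not LDK API.

    // Hypothetical, self-contained sketch of the gating predicate introduced in this diff.
    // LATENCY_GRACE_PERIOD_BLOCKS is assumed to be 3 here purely for illustration.
    const LATENCY_GRACE_PERIOD_BLOCKS: u32 = 3;

    /// Returns true if monitor events for a channel should still be withheld.
    fn withhold_monitor_events(
        has_pending_chainsync_update: bool,
        last_chain_persist_height: usize,
        highest_chain_height: usize,
    ) -> bool {
        // Withhold only while a ChainSync persist is in flight *and* the grace period
        // since the last fully persisted chain height has not yet elapsed.
        has_pending_chainsync_update
            && last_chain_persist_height + LATENCY_GRACE_PERIOD_BLOCKS as usize > highest_chain_height
    }

    fn main() {
        // Persist pending, still within the grace period: events are withheld.
        assert!(withhold_monitor_events(true, 100, 101));
        // Persist pending, but the grace period has elapsed: events are released
        // anyway so HTLC preimages can flow back to an upstream channel.
        assert!(!withhold_monitor_events(true, 100, 104));
        // No pending ChainSync persist: events are always released.
        assert!(!withhold_monitor_events(false, 100, 200));
    }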