/// [`ChannelMonitorUpdateStatus::InProgress`], and then calls channel_monitor_updated
/// immediately, racing our insertion of the pending update into the contained Vec.
pending_monitor_updates: Mutex<Vec<MonitorUpdateId>>,
- /// The last block height at which no [`UpdateOrigin::ChainSync`] monitor updates were present
- /// in `pending_monitor_updates`.
- /// If it's been more than [`LATENCY_GRACE_PERIOD_BLOCKS`] since we started waiting on a chain
- /// sync event, we let monitor events return to `ChannelManager` because we cannot hold them up
- /// forever or we'll end up with HTLC preimages waiting to feed back into an upstream channel
- /// forever, risking funds loss.
- ///
- /// [`LATENCY_GRACE_PERIOD_BLOCKS`]: crate::util::ser::Writeable::write
- last_chain_persist_height: AtomicUsize,
}
impl<ChannelSigner: WriteableEcdsaChannelSigner> MonitorHolder<ChannelSigner> {
pending_monitor_updates_lock.iter().any(|update_id|
if let UpdateOrigin::OffChain(_) = update_id.contents { true } else { false })
}
- fn has_pending_chainsync_updates(&self, pending_monitor_updates_lock: &MutexGuard<Vec<MonitorUpdateId>>) -> bool {
- pending_monitor_updates_lock.iter().any(|update_id|
- if let UpdateOrigin::ChainSync(_) = update_id.contents { true } else { false })
- }
}
/// A read-only reference to a current ChannelMonitor.
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
- if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+ if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
// Take the monitors lock for writing so that we poison it and any future
// operations going forward fail immediately.
core::mem::drop(monitor_lock);
let monitor_states = self.monitors.write().unwrap();
for (funding_outpoint, monitor_state) in monitor_states.iter() {
if !funding_outpoints.contains(funding_outpoint) {
- if self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state).is_err() {
+ if self.update_monitor_with_chain_data(header, txdata, &process, funding_outpoint, &monitor_state).is_err() {
log_error!(self.logger, "{}", err_str);
panic!("{}", err_str);
}
}
fn update_monitor_with_chain_data<FN>(
- &self, header: &Header, best_height: Option<u32>, txdata: &TransactionData,
- process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
+ &self, header: &Header, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint,
+ monitor_state: &MonitorHolder<ChannelSigner>
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
let logger = WithChannelMonitor::from(&self.logger, &monitor);
contents: UpdateOrigin::ChainSync(chain_sync_update_id),
};
let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
- if let Some(height) = best_height {
- if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
- // If there are not ChainSync persists awaiting completion, go ahead and
- // set last_chain_persist_height here - we wouldn't want the first
- // InProgress to always immediately be considered "overly delayed".
- monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
- }
- }
log_trace!(logger, "Syncing Channel Monitor for channel {} for block-data update_id {}",
log_funding_info!(monitor),
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
}], monitor_data.monitor.get_counterparty_node_id()));
},
- MonitorUpdateId { contents: UpdateOrigin::ChainSync(completed_update_id) } => {
- let monitor_has_pending_updates =
- monitor_data.has_pending_chainsync_updates(&pending_monitor_updates);
- log_debug!(self.logger, "Completed chain sync monitor update {} for channel with funding outpoint {:?}, {}",
- completed_update_id,
- funding_txo,
- if monitor_has_pending_updates {
- "still have pending chain sync updates"
- } else {
- "all chain sync updates complete, releasing pending MonitorEvents"
- });
- if !monitor_has_pending_updates {
- monitor_data.last_chain_persist_height.store(self.highest_chain_height.load(Ordering::Acquire), Ordering::Release);
- // The next time release_pending_monitor_events is called, any events for this
- // ChannelMonitor will be returned.
- }
- },
+ MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {},
}
self.event_notifier.notify();
Ok(())
entry.insert(MonitorHolder {
monitor,
pending_monitor_updates: Mutex::new(pending_monitor_updates),
- last_chain_persist_height: AtomicUsize::new(self.highest_chain_height.load(Ordering::Acquire)),
});
Ok(persist_res)
}