use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use bitcoin::secp256k1::PublicKey;
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
- pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
+ pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
- self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
+ self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
- }]));
+ }], monitor_data.monitor.get_counterparty_node_id()));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
+ let monitors = self.monitors.read().unwrap();
+ let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
- }]));
+ }], counterparty_node_id));
}
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
- let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
+ let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
if update_res.is_err() {
log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
}
}
}
- fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
- pending_monitor_events.push((monitor_outpoint, monitor_events));
+ let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
+ pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
}
}
}