use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::transaction::{OutPoint, TransactionData};
+use crate::ln::ChannelId;
use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
use crate::events;
use crate::events::{Event, EventHandler};
use crate::util::atomic_counter::AtomicCounter;
-use crate::util::logger::Logger;
+use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channelmanager::ChannelDetails;
use crate::prelude::*;
use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
-use core::iter::FromIterator;
use core::ops::Deref;
use core::sync::atomic::{AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;
///
/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn persist_new_channel(&self, channel_id: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+ fn persist_new_channel(&self, channel_funding_outpoint: OutPoint, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
/// Update one channel's data. The provided [`ChannelMonitor`] has already applied the given
/// update.
/// [`ChannelMonitorUpdateStatus`] for requirements when returning errors.
///
/// [`Writeable::write`]: crate::util::ser::Writeable::write
- fn update_persisted_channel(&self, channel_id: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
+ fn update_persisted_channel(&self, channel_funding_outpoint: OutPoint, update: Option<&ChannelMonitorUpdate>, data: &ChannelMonitor<ChannelSigner>, update_id: MonitorUpdateId) -> ChannelMonitorUpdateStatus;
}
struct MonitorHolder<ChannelSigner: WriteableEcdsaChannelSigner> {
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
- pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
+ pending_monitor_events: Mutex<Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
- let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
+ let funding_outpoints = hash_set_from_iter(self.monitors.read().unwrap().keys().cloned());
for funding_outpoint in funding_outpoints.iter() {
let monitor_lock = self.monitors.read().unwrap();
if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
- chain_source.register_output(output)
+ log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
+ chain_source.register_output(output);
}
}
}
/// transactions relevant to the watched channels.
pub fn new(chain_source: Option<C>, broadcaster: T, logger: L, feeest: F, persister: P) -> Self {
Self {
- monitors: RwLock::new(HashMap::new()),
+ monitors: RwLock::new(new_hash_map()),
sync_persistence_id: AtomicCounter::new(),
chain_source,
broadcaster,
}
}
- /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored.
+ /// Lists the funding outpoint and channel ID of each [`ChannelMonitor`] being monitored.
///
/// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always
/// monitoring for on-chain state resolutions.
- pub fn list_monitors(&self) -> Vec<OutPoint> {
- self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect()
+ pub fn list_monitors(&self) -> Vec<(OutPoint, ChannelId)> {
+ self.monitors.read().unwrap().iter().map(|(outpoint, monitor_holder)| {
+ let channel_id = monitor_holder.monitor.channel_id();
+ (*outpoint, channel_id)
+ }).collect()
}
#[cfg(not(c_bindings))]
/// Lists the pending updates for each [`ChannelMonitor`] (by `OutPoint` being monitored).
pub fn list_pending_monitor_updates(&self) -> HashMap<OutPoint, Vec<MonitorUpdateId>> {
- self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
+ hash_map_from_iter(self.monitors.read().unwrap().iter().map(|(outpoint, holder)| {
(*outpoint, holder.pending_monitor_updates.lock().unwrap().clone())
- }).collect()
+ }))
}
#[cfg(c_bindings)]
// Completed event.
return Ok(());
}
- self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
- funding_txo,
+ let channel_id = monitor_data.monitor.channel_id();
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
+ funding_txo, channel_id,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
}], monitor_data.monitor.get_counterparty_node_id()));
},
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
let monitors = self.monitors.read().unwrap();
- let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
- self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::Completed {
+ let (counterparty_node_id, channel_id) = if let Some(m) = monitors.get(&funding_txo) {
+ (m.monitor.get_counterparty_node_id(), m.monitor.channel_id())
+ } else {
+ (None, ChannelId::v1_from_funding_outpoint(funding_txo))
+ };
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, channel_id, vec![MonitorEvent::Completed {
funding_txo,
+ channel_id,
monitor_update_id,
}], counterparty_node_id));
self.event_notifier.notify();
)
}
}
+
+ /// Triggers rebroadcasts of pending claims from force-closed channels after a transaction
+ /// signature generation failure.
+ ///
+ /// `monitor_opt` can be used as a filter to only trigger them for a specific channel monitor.
+ pub fn signer_unblocked(&self, monitor_opt: Option<OutPoint>) {
+ let monitors = self.monitors.read().unwrap();
+ if let Some(funding_txo) = monitor_opt {
+ if let Some(monitor_holder) = monitors.get(&funding_txo) {
+ monitor_holder.monitor.signer_unblocked(
+ &*self.broadcaster, &*self.fee_estimator, &self.logger
+ )
+ }
+ } else {
+ for (_, monitor_holder) in &*monitors {
+ monitor_holder.monitor.signer_unblocked(
+ &*self.broadcaster, &*self.fee_estimator, &self.logger
+ )
+ }
+ }
+ }
}
impl<ChannelSigner: WriteableEcdsaChannelSigner, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
},
}
if let Some(ref chain_source) = self.chain_source {
- monitor.load_outputs_to_watch(chain_source);
+ monitor.load_outputs_to_watch(chain_source , &self.logger);
}
entry.insert(MonitorHolder {
monitor,
}
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
+ // `ChannelMonitorUpdate`'s `channel_id` is `None` prior to 0.0.121 and all channels in those
+ // versions are V1-established. For 0.0.121+ the `channel_id` fields is always `Some`.
+ let channel_id = update.channel_id.unwrap_or(ChannelId::v1_from_funding_outpoint(funding_txo));
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
match monitors.get(&funding_txo) {
None => {
- log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
+ let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(channel_id));
+ log_error!(logger, "Failed to update channel monitor: no such monitor registered");
// We should never ever trigger this from within ChannelManager. Technically a
// user could use this object with some proxying in between which makes this
}
}
- fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, ChannelId, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+ let monitor_channel_id = monitor_state.monitor.channel_id();
let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
- pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
+ pending_monitor_events.push((monitor_outpoint, monitor_channel_id, monitor_events, counterparty_node_id));
}
}
}