//! events. The remote server would make use of [`ChainMonitor`] for block processing and for
//! servicing [`ChannelMonitor`] updates from the client.
-use bitcoin::blockdata::block::BlockHeader;
+use bitcoin::blockdata::block::Header;
use bitcoin::hash_types::{Txid, BlockHash};
use crate::chain;
use crate::chain::{ChannelMonitorUpdateStatus, Filter, WatchedOutput};
use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
-use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, LATENCY_GRACE_PERIOD_BLOCKS};
+use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, Balance, MonitorEvent, TransactionOutputs, WithChannelMonitor, LATENCY_GRACE_PERIOD_BLOCKS};
use crate::chain::transaction::{OutPoint, TransactionData};
-use crate::sign::WriteableEcdsaChannelSigner;
+use crate::sign::ecdsa::WriteableEcdsaChannelSigner;
use crate::events;
use crate::events::{Event, EventHandler};
use crate::util::atomic_counter::AtomicCounter;
-use crate::util::logger::Logger;
+use crate::util::logger::{Logger, WithContext};
use crate::util::errors::APIError;
use crate::util::wakers::{Future, Notifier};
use crate::ln::channelmanager::ChannelDetails;
/// updated `txdata`.
///
/// Calls which represent a new blockchain tip height should set `best_height`.
- fn process_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN)
+ fn process_chain_data<FN>(&self, header: &Header, best_height: Option<u32>, txdata: &TransactionData, process: FN)
where
FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
{
}
fn update_monitor_with_chain_data<FN>(
- &self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData,
+ &self, header: &Header, best_height: Option<u32>, txdata: &TransactionData,
process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>
) -> Result<(), ()> where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
let monitor = &monitor_state.monitor;
+ let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut txn_outputs;
{
txn_outputs = process(monitor, txdata);
}
}
- log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+ log_trace!(logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
ChannelMonitorUpdateStatus::Completed =>
- log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+ log_trace!(logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
ChannelMonitorUpdateStatus::InProgress => {
- log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+ log_debug!(logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
outpoint: OutPoint { txid, index: idx as u16 },
script_pubkey: output.script_pubkey,
};
- chain_source.register_output(output)
+ log_trace!(logger, "Adding monitoring for spends of outpoint {} to the filter", output.outpoint);
+ chain_source.register_output(output);
}
}
}
let monitors = self.monitors.read().unwrap();
for (_, monitor_holder) in &*monitors {
monitor_holder.monitor.rebroadcast_pending_claims(
- &*self.broadcaster, &*self.fee_estimator, &*self.logger
+ &*self.broadcaster, &*self.fee_estimator, &self.logger
)
}
}
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
- fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+ fn filtered_block_connected(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
monitor.block_connected(
- header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+ header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
});
}
- fn block_disconnected(&self, header: &BlockHeader, height: u32) {
+ fn block_disconnected(&self, header: &Header, height: u32) {
let monitor_states = self.monitors.read().unwrap();
log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
for monitor_state in monitor_states.values() {
monitor_state.monitor.block_disconnected(
- header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+ header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger);
}
}
}
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
- fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+ fn transactions_confirmed(&self, header: &Header, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
self.process_chain_data(header, None, txdata, |monitor, txdata| {
monitor.transactions_confirmed(
- header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+ header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &self.logger)
});
}
log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
- monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
+ monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &self.logger);
}
}
- fn best_block_updated(&self, header: &BlockHeader, height: u32) {
+ fn best_block_updated(&self, header: &Header, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
self.process_chain_data(header, Some(height), &[], |monitor, txdata| {
// While in practice there shouldn't be any recursive calls when given empty txdata,
// it's still possible if a chain::Filter implementation returns a transaction.
debug_assert!(txdata.is_empty());
monitor.best_block_updated(
- header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
+ header, height, &*self.broadcaster, &*self.fee_estimator, &self.logger
+ )
});
}
- fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
+ fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
let mut txids = Vec::new();
let monitor_states = self.monitors.read().unwrap();
for monitor_state in monitor_states.values() {
txids.append(&mut monitor_state.monitor.get_relevant_txids());
}
- txids.sort_unstable();
- txids.dedup();
+ txids.sort_unstable_by(|a, b| a.0.cmp(&b.0).then(b.1.cmp(&a.1)));
+ txids.dedup_by_key(|(txid, _, _)| *txid);
txids
}
}
P::Target: Persist<ChannelSigner>,
{
fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor<ChannelSigner>) -> Result<ChannelMonitorUpdateStatus, ()> {
+ let logger = WithChannelMonitor::from(&self.logger, &monitor);
let mut monitors = self.monitors.write().unwrap();
let entry = match monitors.entry(funding_outpoint) {
hash_map::Entry::Occupied(_) => {
- log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
+ log_error!(logger, "Failed to add new channel data: channel monitor for given outpoint is already present");
return Err(());
},
hash_map::Entry::Vacant(e) => e,
};
- log_trace!(self.logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
+ log_trace!(logger, "Got new ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_id = MonitorUpdateId::from_new_monitor(&monitor);
let mut pending_monitor_updates = Vec::new();
let persist_res = self.persister.persist_new_channel(funding_outpoint, &monitor, update_id);
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
- log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
+ log_info!(logger, "Persistence of new ChannelMonitor for channel {} in progress", log_funding_info!(monitor));
pending_monitor_updates.push(update_id);
},
ChannelMonitorUpdateStatus::Completed => {
- log_info!(self.logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
+ log_info!(logger, "Persistence of new ChannelMonitor for channel {} completed", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::UnrecoverableError => {
let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
- log_error!(self.logger, "{}", err_str);
+ log_error!(logger, "{}", err_str);
panic!("{}", err_str);
},
}
if let Some(ref chain_source) = self.chain_source {
- monitor.load_outputs_to_watch(chain_source);
+ monitor.load_outputs_to_watch(chain_source , &self.logger);
}
entry.insert(MonitorHolder {
monitor,
fn update_channel(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate) -> ChannelMonitorUpdateStatus {
// Update the monitor that watches the channel referred to by the given outpoint.
let monitors = self.monitors.read().unwrap();
- let ret = match monitors.get(&funding_txo) {
+ match monitors.get(&funding_txo) {
None => {
- log_error!(self.logger, "Failed to update channel monitor: no such monitor registered");
+ let logger = WithContext::from(&self.logger, update.counterparty_node_id, Some(funding_txo.to_channel_id()));
+ log_error!(logger, "Failed to update channel monitor: no such monitor registered");
// We should never ever trigger this from within ChannelManager. Technically a
// user could use this object with some proxying in between which makes this
},
Some(monitor_state) => {
let monitor = &monitor_state.monitor;
- log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
+ let logger = WithChannelMonitor::from(&self.logger, &monitor);
+ log_trace!(logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
let update_res = monitor.update_monitor(update, &self.broadcaster, &self.fee_estimator, &self.logger);
let update_id = MonitorUpdateId::from_monitor_update(update);
// We don't want to persist a `monitor_update` which results in a failure to apply later
// while reading `channel_monitor` with updates from storage. Instead, we should persist
// the entire `channel_monitor` here.
- log_warn!(self.logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
+ log_warn!(logger, "Failed to update ChannelMonitor for channel {}. Going ahead and persisting the entire ChannelMonitor", log_funding_info!(monitor));
self.persister.update_persisted_channel(funding_txo, None, monitor, update_id)
} else {
self.persister.update_persisted_channel(funding_txo, Some(update), monitor, update_id)
match persist_res {
ChannelMonitorUpdateStatus::InProgress => {
pending_monitor_updates.push(update_id);
- log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
+ log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} in progress", log_funding_info!(monitor));
},
ChannelMonitorUpdateStatus::Completed => {
- log_debug!(self.logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+ log_debug!(logger, "Persistence of ChannelMonitorUpdate for channel {} completed", log_funding_info!(monitor));
+ },
+ ChannelMonitorUpdateStatus::UnrecoverableError => {
+ // Take the monitors lock for writing so that we poison it and any future
+ // operations going forward fail immediately.
+ core::mem::drop(pending_monitor_updates);
+ core::mem::drop(monitors);
+ let _poison = self.monitors.write().unwrap();
+ let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
+ log_error!(logger, "{}", err_str);
+ panic!("{}", err_str);
},
- ChannelMonitorUpdateStatus::UnrecoverableError => { /* we'll panic in a moment */ },
}
if update_res.is_err() {
ChannelMonitorUpdateStatus::InProgress
persist_res
}
}
- };
- if let ChannelMonitorUpdateStatus::UnrecoverableError = ret {
- // Take the monitors lock for writing so that we poison it and any future
- // operations going forward fail immediately.
- core::mem::drop(monitors);
- let _poison = self.monitors.write().unwrap();
- let err_str = "ChannelMonitor[Update] persistence failed unrecoverably. This indicates we cannot continue normal operation and must shut down.";
- log_error!(self.logger, "{}", err_str);
- panic!("{}", err_str);
}
- ret
}
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
+ let logger = WithChannelMonitor::from(&self.logger, &monitor_state.monitor);
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
- if is_pending_monitor_update &&
- monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize
- > self.highest_chain_height.load(Ordering::Acquire)
- {
- log_debug!(self.logger, "A Channel Monitor sync is still in progress, refusing to provide monitor events!");
- } else {
+ if !is_pending_monitor_update || monitor_state.last_chain_persist_height.load(Ordering::Acquire) + LATENCY_GRACE_PERIOD_BLOCKS as usize <= self.highest_chain_height.load(Ordering::Acquire) {
if is_pending_monitor_update {
- log_error!(self.logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
- log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
- log_error!(self.logger, " This may cause duplicate payment events to be generated.");
+ log_error!(logger, "A ChannelMonitor sync took longer than {} blocks to complete.", LATENCY_GRACE_PERIOD_BLOCKS);
+ log_error!(logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
+ log_error!(logger, " This may cause duplicate payment events to be generated.");
}
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {