//! events. The remote server would make use of [`ChainMonitor`] for block processing and for
//! servicing [`ChannelMonitor`] updates from the client.
-use bitcoin::blockdata::block::{Block, BlockHeader};
+use bitcoin::blockdata::block::BlockHeader;
use bitcoin::hash_types::Txid;
use chain;
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
- pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+ pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+ self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
// UpdateCompleted event.
return Ok(());
}
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
- });
+ }]));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
- });
+ }]));
}
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
L::Target: Logger,
P::Target: Persist<ChannelSigner>,
{
- fn block_connected(&self, block: &Block, height: u32) {
- let header = &block.header;
- let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+ fn filtered_block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
self.process_chain_data(header, Some(height), &txdata, |monitor, txdata| {
monitor.block_connected(
}
}
- fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
}
- pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+ let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
+ if monitor_events.len() > 0 {
+ let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+ pending_monitor_events.push((monitor_outpoint, monitor_events));
+ }
}
}
pending_monitor_events
mod tests {
use bitcoin::BlockHeader;
use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
- use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+ use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
// Route two payments to be claimed at the same time.
- let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
- let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+ let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+ let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
nodes[1].node.claim_funds(payment_preimage_1);
check_added_monitors!(nodes[1], 1);
+ expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
nodes[1].node.claim_funds(payment_preimage_2);
check_added_monitors!(nodes[1], 1);
+ expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
chanmon_cfgs[1].persister.set_update_ret(Ok(()));
let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
// First route a payment that we will claim on chain and give the recipient the preimage.
- let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+ let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
nodes[1].node.claim_funds(payment_preimage);
+ expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
nodes[1].node.get_and_clear_pending_msg_events();
check_added_monitors!(nodes[1], 1);
let remote_txn = get_local_commitment_txn!(nodes[1], channel.2);