self.chain_monitor.update_channel(funding_txo, update)
}
- fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
return self.chain_monitor.release_pending_monitor_events();
}
}
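// Illustrative sketch, not part of this patch: consuming the new grouped
// return type. Assumes `watcher` implements `lightning::chain::Watch` (with
// `Sign` from `lightning::chain::keysinterface` in scope) and that
// `handle_monitor_event` is a hypothetical helper.
fn drain_monitor_events<S: Sign, W: Watch<S>>(watcher: &W) {
    for (funding_outpoint, events) in watcher.release_pending_monitor_events() {
        // Each batch now carries the funding outpoint of the monitor that
        // produced it, so the originating channel is recoverable even for
        // HTLCEvents, which the old flat Vec<MonitorEvent> could not convey.
        let channel_id = funding_outpoint.to_channel_id();
        for event in events {
            handle_monitor_event(channel_id, event);
        }
    }
}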
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
- pending_monitor_events: Mutex<Vec<MonitorEvent>>,
+ pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateFailed(*funding_outpoint));
+ self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
// UpdateCompleted event.
return Ok(());
}
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
- });
+ }]));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
- self.pending_monitor_events.lock().unwrap().push(MonitorEvent::UpdateCompleted {
+ self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
- });
+ }]));
}
#[cfg(any(test, fuzzing, feature = "_test_utils"))]
}
}
- fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
log_error!(self.logger, " To avoid funds-loss, we are allowing monitor updates to be released.");
log_error!(self.logger, " This may cause duplicate payment events to be generated.");
}
- pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events());
+ let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
+ if !monitor_events.is_empty() {
+ let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
+ pending_monitor_events.push((monitor_outpoint, monitor_events));
+ }
}
}
pending_monitor_events
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
- fn release_pending_monitor_events(&self) -> Vec<MonitorEvent>;
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)>;
}
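// A hedged sketch of satisfying the updated trait method in a custom
// `chain::Watch` implementation that tracks a single monitor; the `monitor`
// and `funding_txo` fields are assumptions for illustration.
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
    let events = self.monitor.get_and_clear_pending_monitor_events();
    if events.is_empty() {
        Vec::new()
    } else {
        // Pair the batch with the monitor's funding outpoint, mirroring the
        // ChainMonitor implementation earlier in this patch.
        vec![(self.funding_txo, events)]
    }
}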
/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &cs_updates.update_fulfill_htlcs[0]);
let bs_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
check_added_monitors!(nodes[1], 1);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
mine_transaction(&nodes[1], &bs_txn[0]);
check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
}
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
let mut bs_updates = None;
}
}
- fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool) {
+ fn claim_funds_internal(&self, mut channel_state_lock: MutexGuard<ChannelHolder<Signer>>, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
mem::drop(channel_state_lock);
} else { None };
let mut pending_events = self.pending_events.lock().unwrap();
- let source_channel_id = Some(prev_outpoint.to_channel_id());
+ let prev_channel_id = Some(prev_outpoint.to_channel_id());
+ let next_channel_id = Some(next_channel_id);
pending_events.push(events::Event::PaymentForwarded {
- source_channel_id,
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
+ prev_channel_id,
+ next_channel_id,
});
}
}
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close("Failed to find corresponding channel".to_owned(), msg.channel_id))
}
};
- self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false);
+ self.claim_funds_internal(channel_lock, htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
Ok(())
}
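// Editorial note: `msg.channel_id` above is the channel on which the
// update_fulfill_htlc arrived, i.e. the outgoing (next-hop) channel from this
// node's perspective as the forwarder, hence it is passed as `next_channel_id`.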
let mut failed_channels = Vec::new();
let mut pending_monitor_events = self.chain_monitor.release_pending_monitor_events();
let has_pending_monitor_events = !pending_monitor_events.is_empty();
- for monitor_event in pending_monitor_events.drain(..) {
- match monitor_event {
- MonitorEvent::HTLCEvent(htlc_update) => {
- if let Some(preimage) = htlc_update.payment_preimage {
- log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true);
- } else {
- log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
- self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
- }
- },
- MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
- MonitorEvent::UpdateFailed(funding_outpoint) => {
- let mut channel_lock = self.channel_state.lock().unwrap();
- let channel_state = &mut *channel_lock;
- let by_id = &mut channel_state.by_id;
- let pending_msg_events = &mut channel_state.pending_msg_events;
- if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
- let mut chan = remove_channel!(self, channel_state, chan_entry);
- failed_channels.push(chan.force_shutdown(false));
- if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
- pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
- msg: update
+ for (funding_outpoint, mut monitor_events) in pending_monitor_events.drain(..) {
+ for monitor_event in monitor_events.drain(..) {
+ match monitor_event {
+ MonitorEvent::HTLCEvent(htlc_update) => {
+ if let Some(preimage) = htlc_update.payment_preimage {
+ log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
+ self.claim_funds_internal(self.channel_state.lock().unwrap(), htlc_update.source, preimage, htlc_update.onchain_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
+ } else {
+ log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
+ self.fail_htlc_backwards_internal(self.channel_state.lock().unwrap(), htlc_update.source, &htlc_update.payment_hash, HTLCFailReason::Reason { failure_code: 0x4000 | 8, data: Vec::new() });
+ }
+ },
+ MonitorEvent::CommitmentTxConfirmed(funding_outpoint) |
+ MonitorEvent::UpdateFailed(funding_outpoint) => {
+ let mut channel_lock = self.channel_state.lock().unwrap();
+ let channel_state = &mut *channel_lock;
+ let by_id = &mut channel_state.by_id;
+ let pending_msg_events = &mut channel_state.pending_msg_events;
+ if let hash_map::Entry::Occupied(chan_entry) = by_id.entry(funding_outpoint.to_channel_id()) {
+ let mut chan = remove_channel!(self, channel_state, chan_entry);
+ failed_channels.push(chan.force_shutdown(false));
+ if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
+ pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+ msg: update
+ });
+ }
+ let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
+ ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
+ } else {
+ ClosureReason::CommitmentTxConfirmed
+ };
+ self.issue_channel_close_events(&chan, reason);
+ pending_msg_events.push(events::MessageSendEvent::HandleError {
+ node_id: chan.get_counterparty_node_id(),
+ action: msgs::ErrorAction::SendErrorMessage {
+ msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
+ },
});
}
- let reason = if let MonitorEvent::UpdateFailed(_) = monitor_event {
- ClosureReason::ProcessingError { err: "Failed to persist ChannelMonitor update during chain sync".to_string() }
- } else {
- ClosureReason::CommitmentTxConfirmed
- };
- self.issue_channel_close_events(&chan, reason);
- pending_msg_events.push(events::MessageSendEvent::HandleError {
- node_id: chan.get_counterparty_node_id(),
- action: msgs::ErrorAction::SendErrorMessage {
- msg: msgs::ErrorMessage { channel_id: chan.channel_id(), data: "Channel force-closed".to_owned() }
- },
- });
- }
- },
- MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
- self.channel_monitor_updated(&funding_txo, monitor_update_id);
- },
+ },
+ MonitorEvent::UpdateCompleted { funding_txo, monitor_update_id } => {
+ self.channel_monitor_updated(&funding_txo, monitor_update_id);
+ },
+ }
}
}
}
macro_rules! expect_payment_forwarded {
- ($node: expr, $source_node: expr, $expected_fee: expr, $upstream_force_closed: expr) => {
+ ($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
let events = $node.node.get_and_clear_pending_events();
assert_eq!(events.len(), 1);
match events[0] {
- Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+ Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, $expected_fee);
if fee_earned_msat.is_some() {
- // Is the event channel_id in one of the channels between the two nodes?
- assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $source_node.node.get_our_node_id() && x.channel_id == source_channel_id.unwrap()));
+ // Is the event's prev_channel_id one of the channels between the two nodes?
+ assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $prev_node.node.get_our_node_id() && x.channel_id == prev_channel_id.unwrap()));
+ }
+ // We check for force closures since a force-closed channel is removed from
+ // the node's channel list
+ if !$downstream_force_closed {
+ assert!($node.node.list_channels().iter().any(|x| x.counterparty.node_id == $next_node.node.get_our_node_id() && x.channel_id == next_channel_id.unwrap()));
}
assert_eq!(claim_from_onchain_tx, $upstream_force_closed);
},
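// Example invocation under the new six-argument form (mirrors the updated
// call sites throughout this patch): nodes[1] forwarded between nodes[0]
// (previous hop) and nodes[2] (next hop), earned 1000 msat, and neither
// adjacent channel was force-closed.
expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);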
{
$node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
let fee = $node.node.channel_state.lock().unwrap().by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap().config.forwarding_fee_base_msat;
- expect_payment_forwarded!($node, $next_node, Some(fee as u64), false);
+ expect_payment_forwarded!($node, $next_node, $prev_node, Some(fee as u64), false, false);
expected_total_fee_msat += fee as u64;
check_added_monitors!($node, 1);
let new_next_msgs = if $new_msgs {
}
let chan_id = Some(chan_1.2);
match forwarded_events[1] {
- Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+ Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
- assert_eq!(source_channel_id, chan_id);
+ assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
+ assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
match forwarded_events[2] {
- Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+ Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
- assert_eq!(source_channel_id, chan_id);
+ assert_eq!(prev_channel_id, chan_id);
assert_eq!(claim_from_onchain_tx, true);
+ assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!()
}
_ => panic!("Unexpected event"),
}
match events[1] {
- Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+ Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
assert_eq!(fee_earned_msat, Some(1000));
- assert_eq!(source_channel_id, Some(chan_1.2));
+ assert_eq!(prev_channel_id, Some(chan_1.2));
assert_eq!(claim_from_onchain_tx, true);
+ assert_eq!(next_channel_id, Some(chan_2.2));
},
_ => panic!("Unexpected event"),
}
// Note that the fee paid is effectively double as the HTLC value (including the nodes[1] fee
// and nodes[2] fee) is rounded down and then claimed in full.
mine_transaction(&nodes[1], &htlc_success_txn[0]);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(196*2), true);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(196*2), true, true);
let updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
assert!(updates.update_add_htlcs.is_empty());
assert!(updates.update_fail_htlcs.is_empty());
assert_eq!(carol_updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &carol_updates.update_fulfill_htlcs[0]);
- expect_payment_forwarded!(nodes[1], nodes[0], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], if go_onchain_before_fulfill || force_closing_node == 1 { None } else { Some(1000) }, false, false);
// If Alice broadcasted but Bob doesn't know yet, here he prepares to tell her about the preimage.
if !go_onchain_before_fulfill && broadcast_alice {
let events = nodes[1].node.get_and_clear_pending_msg_events();
let bs_htlc_claim_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
assert_eq!(bs_htlc_claim_txn.len(), 1);
check_spends!(bs_htlc_claim_txn[0], as_commitment_tx);
- expect_payment_forwarded!(nodes[1], nodes[0], None, false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], None, false, false);
if !confirm_before_reload {
mine_transaction(&nodes[0], &as_commitment_tx);
// ChannelManager only polls chain::Watch::release_pending_monitor_events when we
// probe it for events, so we probe non-message events here (which should just be the
// PaymentForwarded event).
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), true);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), true, true);
} else {
// Confirm the timeout tx and check that we fail the HTLC backwards
let block = Block {
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
- expect_payment_forwarded!(nodes[1], nodes[0], Some(1000), false);
+ expect_payment_forwarded!(nodes[1], nodes[0], nodes[2], Some(1000), false, false);
check_added_monitors!(nodes[1], 1);
let updates_2 = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
/// This event is generated when a payment has been successfully forwarded through us and a
/// forwarding fee earned.
PaymentForwarded {
- /// The channel between the source node and us. Optional because versions prior to 0.0.107
- /// do not serialize this field.
- source_channel_id: Option<[u8; 32]>,
+ /// The incoming channel between the previous node and us. This is only `None` for events
+ /// generated or serialized by versions prior to 0.0.107.
+ prev_channel_id: Option<[u8; 32]>,
+ /// The outgoing channel between the next node and us. This is only `None` for events
+ /// generated or serialized by versions prior to 0.0.107.
+ next_channel_id: Option<[u8; 32]>,
/// The fee, in milli-satoshis, which was earned as a result of the payment.
///
/// Note that if we force-closed the channel over which we forwarded an HTLC while the HTLC
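// Illustrative only: matching on the updated event in an event handler. Both
// new fields are Options because events from pre-0.0.107 serializations lack
// them; `record_forward` is a hypothetical bookkeeping helper.
match event {
    Event::PaymentForwarded { fee_earned_msat, prev_channel_id, next_channel_id, claim_from_onchain_tx } => {
        record_forward(prev_channel_id, next_channel_id, fee_earned_msat, claim_from_onchain_tx);
    },
    _ => {},
}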
(0, VecWriteWrapper(outputs), required),
});
},
- &Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx } => {
+ &Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id } => {
7u8.write(writer)?;
write_tlv_fields!(writer, {
(0, fee_earned_msat, option),
- (1, source_channel_id, option),
+ (1, prev_channel_id, option),
(2, claim_from_onchain_tx, required),
+ (3, next_channel_id, option),
});
},
&Event::ChannelClosed { ref channel_id, ref user_channel_id, ref reason } => {
7u8 => {
let f = || {
let mut fee_earned_msat = None;
- let mut source_channel_id = None;
+ let mut prev_channel_id = None;
let mut claim_from_onchain_tx = false;
+ let mut next_channel_id = None;
read_tlv_fields!(reader, {
(0, fee_earned_msat, option),
- (1, source_channel_id, option),
+ (1, prev_channel_id, option),
(2, claim_from_onchain_tx, required),
+ (3, next_channel_id, option),
});
- Ok(Some(Event::PaymentForwarded { fee_earned_msat, source_channel_id, claim_from_onchain_tx }))
+ Ok(Some(Event::PaymentForwarded { fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id }))
};
f()
},
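// Compatibility note (editorial, not part of this patch): `prev_channel_id`
// and `next_channel_id` are written with odd TLV type numbers (1 and 3).
// Odd-typed fields may be skipped by readers that do not understand them, and
// `option` fields absent from the stream deserialize as `None`, so events
// written before 0.0.107 read back with both fields `None`, while older
// readers silently ignore the new fields.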
update_res
}
- fn release_pending_monitor_events(&self) -> Vec<MonitorEvent> {
+ fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
return self.chain_monitor.release_pending_monitor_events();
}
}