/// could be in the middle of being processed without the direct mutex held.
///
/// See `ChannelManager` struct-level documentation for lock order requirements.
+ #[cfg(not(any(test, feature = "_test_utils")))]
pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
+ #[cfg(any(test, feature = "_test_utils"))]
+ pub(crate) pending_events: Mutex<VecDeque<(events::Event, Option<EventCompletionAction>)>>,
+
/// A simple atomic flag to ensure only one task at a time can be processing events asynchronously.
pending_events_processor: AtomicBool,
self.pending_outbound_payments.finalize_claims(sources, &self.pending_events);
}
- fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_id: [u8; 32]) {
+ fn claim_funds_internal(&self, source: HTLCSource, payment_preimage: PaymentPreimage, forwarded_htlc_value_msat: Option<u64>, from_onchain: bool, next_channel_outpoint: OutPoint) {
match source {
HTLCSource::OutboundRoute { session_priv, payment_id, path, .. } => {
debug_assert!(self.background_events_processed_since_startup.load(Ordering::Acquire),
"We don't support claim_htlc claims during startup - monitors may not be available yet");
- self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage, session_priv, path, from_onchain, &self.pending_events, &self.logger);
+ let ev_completion_action = EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+ channel_funding_outpoint: next_channel_outpoint,
+ counterparty_node_id: path.hops[0].pubkey,
+ };
+ self.pending_outbound_payments.claim_htlc(payment_id, payment_preimage,
+ session_priv, path, from_onchain, ev_completion_action, &self.pending_events,
+ &self.logger);
},
HTLCSource::PreviousHopData(hop_data) => {
let prev_outpoint = hop_data.outpoint;
fee_earned_msat,
claim_from_onchain_tx: from_onchain,
prev_channel_id: Some(prev_outpoint.to_channel_id()),
- next_channel_id: Some(next_channel_id),
+ next_channel_id: Some(next_channel_outpoint.to_channel_id()),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
downstream_counterparty_and_funding_outpoint: None,
}
fn internal_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) -> Result<(), MsgHandleErrInternal> {
+ let funding_txo;
let (htlc_source, forwarded_htlc_value) = {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
- try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan)
+ let res = try_chan_entry!(self, chan.get_mut().update_fulfill_htlc(&msg), chan);
+ funding_txo = chan.get().context.get_funding_txo().expect("We won't accept a fulfill until funded");
+ res
},
hash_map::Entry::Vacant(_) => return Err(MsgHandleErrInternal::send_err_msg_no_close(format!("Got a message for a channel from the wrong node! No such channel for the passed counterparty_node_id {}", counterparty_node_id), msg.channel_id))
}
};
- self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, msg.channel_id);
+ self.claim_funds_internal(htlc_source, msg.payment_preimage.clone(), Some(forwarded_htlc_value), false, funding_txo);
Ok(())
}
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id) {
hash_map::Entry::Occupied(mut chan) => {
- let funding_txo = chan.get().context.get_funding_txo();
- let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self, chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger), chan);
+ let funding_txo_opt = chan.get().context.get_funding_txo();
+ let mon_update_blocked = if let Some(funding_txo) = funding_txo_opt {
+ self.raa_monitor_updates_held(
+ &peer_state.actions_blocking_raa_monitor_updates, funding_txo,
+ *counterparty_node_id)
+ } else { false };
+ let (htlcs_to_fail, monitor_update_opt) = try_chan_entry!(self,
+ chan.get_mut().revoke_and_ack(&msg, &self.fee_estimator, &self.logger, mon_update_blocked), chan);
let res = if let Some(monitor_update) = monitor_update_opt {
- handle_new_monitor_update!(self, funding_txo.unwrap(), monitor_update,
+ let funding_txo = funding_txo_opt
+ .expect("Funding outpoint must have been set for RAA handling to succeed");
+ handle_new_monitor_update!(self, funding_txo, monitor_update,
peer_state_lock, peer_state, per_peer_state, chan).map(|_| ())
} else { Ok(()) };
(htlcs_to_fail, res)
MonitorEvent::HTLCEvent(htlc_update) => {
if let Some(preimage) = htlc_update.payment_preimage {
log_trace!(self.logger, "Claiming HTLC with preimage {} from our monitor", log_bytes!(preimage.0));
- self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint.to_channel_id());
+ self.claim_funds_internal(htlc_update.source, preimage, htlc_update.htlc_value_satoshis.map(|v| v * 1000), true, funding_outpoint);
} else {
log_trace!(self.logger, "Failing HTLC with hash {} from our monitor", log_bytes!(htlc_update.payment_hash.0));
let receiver = HTLCDestination::NextHopChannel { node_id: counterparty_node_id, channel_id: funding_outpoint.to_channel_id() };
// generating a `PaymentPathSuccessful` event but regenerating
// it and the `PaymentSent` on every restart until the
// `ChannelMonitor` is removed.
- pending_outbounds.claim_htlc(payment_id, preimage, session_priv, path, false, &pending_events, &args.logger);
+ let compl_action =
+ EventCompletionAction::ReleaseRAAChannelMonitorUpdate {
+ channel_funding_outpoint: monitor.get_funding_txo().0,
+ counterparty_node_id: path.hops[0].pubkey,
+ };
+ pending_outbounds.claim_htlc(payment_id, preimage, session_priv,
+ path, false, compl_action, &pending_events, &args.logger);
pending_events_read = pending_events.into_inner().unwrap();
}
},
// downstream chan is closed (because we don't have a
// channel_id -> peer map entry).
counterparty_opt.is_none(),
- monitor.get_funding_txo().0.to_channel_id()))
+ monitor.get_funding_txo().0))
} else { None }
} else {
// If it was an outbound payment, we've handled it above - if a preimage
channel_manager.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}
- for (source, preimage, downstream_value, downstream_closed, downstream_chan_id) in pending_claims_to_replay {
+ for (source, preimage, downstream_value, downstream_closed, downstream_funding) in pending_claims_to_replay {
// We use `downstream_closed` in place of `from_onchain` here just as a guess - we
// don't remember in the `ChannelMonitor` where we got a preimage from, but if the
// channel is closed we just assume that it probably came from an on-chain claim.
channel_manager.claim_funds_internal(source, preimage, Some(downstream_value),
- downstream_closed, downstream_chan_id);
+ downstream_closed, downstream_funding);
}
//TODO: Broadcast channel update for closed channels, but only after we've made a
let bs_first_updates = get_htlc_update_msgs!(nodes[1], nodes[0].node.get_our_node_id());
nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_first_updates.update_fulfill_htlcs[0]);
+ expect_payment_sent(&nodes[0], payment_preimage, None, false, false);
nodes[0].node.handle_commitment_signed(&nodes[1].node.get_our_node_id(), &bs_first_updates.commitment_signed);
check_added_monitors!(nodes[0], 1);
let (as_first_raa, as_first_cs) = get_revoke_commit_msgs!(nodes[0], nodes[1].node.get_our_node_id());
// Note that successful MPP payments will generate a single PaymentSent event upon the first
// path's success and a PaymentPathSuccessful event for each path's success.
let events = nodes[0].node.get_and_clear_pending_events();
- assert_eq!(events.len(), 3);
+ assert_eq!(events.len(), 2);
match events[0] {
- Event::PaymentSent { payment_id: ref id, payment_preimage: ref preimage, payment_hash: ref hash, .. } => {
- assert_eq!(Some(payment_id), *id);
- assert_eq!(payment_preimage, *preimage);
- assert_eq!(our_payment_hash, *hash);
- },
- _ => panic!("Unexpected event"),
- }
- match events[1] {
Event::PaymentPathSuccessful { payment_id: ref actual_payment_id, ref payment_hash, ref path } => {
assert_eq!(payment_id, *actual_payment_id);
assert_eq!(our_payment_hash, *payment_hash.as_ref().unwrap());
},
_ => panic!("Unexpected event"),
}
- match events[2] {
+ match events[1] {
Event::PaymentPathSuccessful { payment_id: ref actual_payment_id, ref payment_hash, ref path } => {
assert_eq!(payment_id, *actual_payment_id);
assert_eq!(our_payment_hash, *payment_hash.as_ref().unwrap());