use bitcoin::secp256k1::Secp256k1;
use bitcoin::{secp256k1, Sequence};
+use crate::blinded_path::message::{MessageContext, OffersContext};
use crate::blinded_path::{BlindedPath, NodeIdLookUp};
use crate::blinded_path::message::ForwardNode;
use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs};
use crate::offers::offer::{Offer, OfferBuilder};
use crate::offers::parse::Bolt12SemanticError;
use crate::offers::refund::{Refund, RefundBuilder};
+use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler};
use crate::onion_message::messenger::{new_pending_onion_message, Destination, MessageRouter, PendingOnionMessage, Responder, ResponseInstruction};
use crate::onion_message::offers::{OffersMessage, OffersMessageHandler};
use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider};
// Note that this may be an outbound SCID alias for the associated channel.
prev_short_channel_id: u64,
prev_htlc_id: u64,
+ prev_counterparty_node_id: Option<PublicKey>,
prev_channel_id: ChannelId,
prev_funding_outpoint: OutPoint,
prev_user_channel_id: u128,
blinded_failure: Option<BlindedFailure>,
channel_id: ChannelId,
- // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards
+ // These fields are consumed by `claim_funds_from_hop()` when updating a force-closed backwards
// channel with a preimage provided by the forward channel.
outpoint: OutPoint,
+ counterparty_node_id: Option<PublicKey>,
}
enum OnionPayload {
},
(2, OutboundRoute) => {
(0, session_priv, required),
- };
+ },
);
/// be sent in the order they appear in the return value, however sometimes the order needs to be
/// variable at runtime (e.g., Channel::channel_reestablish needs to re-send messages in the order
/// they were originally sent). In those cases, this enum is also returned.
-#[derive(Clone, PartialEq)]
+#[derive(Clone, PartialEq, Debug)]
pub(super) enum RAACommitmentOrder {
/// Send the CommitmentUpdate messages first
CommitmentFirst,
},
}
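Where both a `commitment_update` and a `revoke_and_ack` are pending, as in the signer-unblocking logic further below, the enum above dictates emission order. A minimal standalone sketch of that rule, using string stand-ins rather than LDK's message types:

```rust
/// Simplified stand-in for RAACommitmentOrder; not LDK's API.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Order { CommitmentFirst, RevokeAndACKFirst }

/// Queue up to two pending messages in the order the enum dictates.
fn enqueue<'a>(cu: Option<&'a str>, raa: Option<&'a str>, order: Order) -> Vec<&'a str> {
    match (cu, raa) {
        (Some(cu), Some(raa)) if order == Order::CommitmentFirst => vec![cu, raa],
        (Some(cu), Some(raa)) => vec![raa, cu], // RevokeAndACKFirst
        (Some(cu), None) => vec![cu],
        (None, Some(raa)) => vec![raa],
        (None, None) => vec![],
    }
}

fn main() {
    assert_eq!(enqueue(Some("cu"), Some("raa"), Order::RevokeAndACKFirst), ["raa", "cu"]);
    // With only one message pending, the order is irrelevant.
    assert_eq!(enqueue(Some("cu"), None, Order::RevokeAndACKFirst), ["cu"]);
}
```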
+/// A pointer to a channel that is unblocked when an event is surfaced.
+#[derive(Debug)]
+pub(crate) struct EventUnblockedChannel {
+ counterparty_node_id: PublicKey,
+ funding_txo: OutPoint,
+ channel_id: ChannelId,
+ blocking_action: RAAMonitorUpdateBlockingAction,
+}
+
+impl Writeable for EventUnblockedChannel {
+ fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {
+ self.counterparty_node_id.write(writer)?;
+ self.funding_txo.write(writer)?;
+ self.channel_id.write(writer)?;
+ self.blocking_action.write(writer)
+ }
+}
+
+impl MaybeReadable for EventUnblockedChannel {
+ fn read<R: Read>(reader: &mut R) -> Result<Option<Self>, DecodeError> {
+ let counterparty_node_id = Readable::read(reader)?;
+ let funding_txo = Readable::read(reader)?;
+ let channel_id = Readable::read(reader)?;
+ let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? {
+ Some(blocking_action) => blocking_action,
+ None => return Ok(None),
+ };
+ Ok(Some(EventUnblockedChannel {
+ counterparty_node_id,
+ funding_txo,
+ channel_id,
+ blocking_action,
+ }))
+ }
+}
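The `MaybeReadable` implementation above is what lets an `EventUnblockedChannel` written by a newer LDK silently read as `None` when the inner `RAAMonitorUpdateBlockingAction` is unrecognized, rather than failing deserialization. A self-contained sketch of that propagation pattern, using plain `std::io` and toy single-byte encodings (the real traits and wire format are LDK's own):

```rust
use std::io::{self, Read};

// Toy inner reader: unknown odd-numbered variants are tolerated as Ok(None),
// mirroring how MaybeReadable treats data written by newer versions.
fn read_blocking_action(r: &mut impl Read) -> io::Result<Option<u8>> {
    let mut tag = [0u8; 1];
    r.read_exact(&mut tag)?;
    match tag[0] {
        0 => Ok(Some(0)),            // known variant
        t if t % 2 == 1 => Ok(None), // unknown odd variant: skip, don't fail
        _ => Err(io::Error::new(io::ErrorKind::InvalidData, "unknown even variant")),
    }
}

// Mirrors EventUnblockedChannel::read: an inner None makes the whole
// record read as None.
fn read_unblocked_channel(r: &mut impl Read) -> io::Result<Option<(u8, u8)>> {
    let mut node_id = [0u8; 1];
    r.read_exact(&mut node_id)?;
    let blocking_action = match read_blocking_action(r)? {
        Some(action) => action,
        None => return Ok(None),
    };
    Ok(Some((node_id[0], blocking_action)))
}

fn main() -> io::Result<()> {
    assert_eq!(read_unblocked_channel(&mut &[7u8, 0][..])?, Some((7, 0)));
    assert_eq!(read_unblocked_channel(&mut &[7u8, 3][..])?, None); // unknown inner variant
    Ok(())
}
```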
+
#[derive(Debug)]
pub(crate) enum MonitorUpdateCompletionAction {
/// Indicates that a payment ultimately destined for us was claimed and we should emit an
/// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for
/// this payment. Note that this is only best-effort. On restart it's possible such a duplicate
/// event can be generated.
- PaymentClaimed { payment_hash: PaymentHash },
+ PaymentClaimed {
+ payment_hash: PaymentHash,
+ /// A pending MPP claim which hasn't yet completed.
+ ///
+ /// Not written to disk.
+ pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>,
+ },
/// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the
/// operation of another channel.
///
/// outbound edge.
EmitEventAndFreeOtherChannel {
event: events::Event,
- downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>,
+ downstream_counterparty_and_funding_outpoint: Option<EventUnblockedChannel>,
},
/// Indicates we should immediately resume the operation of another channel, unless there is
/// some other reason why the channel is blocked. In practice this simply means immediately
}
impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
- (0, PaymentClaimed) => { (0, payment_hash, required) },
+ (0, PaymentClaimed) => {
+ (0, payment_hash, required),
+ (9999999999, pending_mpp_claim, (static_value, None)),
+ },
// Note that FreeOtherChannelImmediately should never be written - we were supposed to free
// *immediately*. However, for simplicity we implement read/write here.
(1, FreeOtherChannelImmediately) => {
(0, downstream_counterparty_node_id, required),
(2, downstream_funding_outpoint, required),
- (4, blocking_action, required),
+ (4, blocking_action, upgradable_required),
// Note that by the time we get past the required read above, downstream_funding_outpoint will be
// filled in, so we can safely unwrap it here.
(5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
// downgrades to prior versions.
- (1, downstream_counterparty_and_funding_outpoint, option),
+ (1, downstream_counterparty_and_funding_outpoint, upgradable_option),
},
);
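Two serialization details in the macro above deserve a note. The out-of-range TLV type `9999999999` with `(static_value, None)` marks a field that is never written and always deserializes to `None`, matching the "Not written to disk" doc on `pending_mpp_claim`; `upgradable_required`/`upgradable_option` mean an unreadable inner value turns the whole entry into `None` instead of a hard error. A sketch of the `static_value` half, under those assumptions:

```rust
// Hypothetical mirror of the PaymentClaimed TLV behavior: the static_value
// field is skipped on write and unconditionally defaulted on read.
struct PaymentClaimed {
    payment_hash: [u8; 32],
    pending_mpp_claim: Option<u32>, // (static_value, None): never serialized
}

fn write(p: &PaymentClaimed, out: &mut Vec<u8>) {
    out.extend_from_slice(&p.payment_hash); // pending_mpp_claim intentionally omitted
}

fn read(bytes: &[u8; 32]) -> PaymentClaimed {
    PaymentClaimed { payment_hash: *bytes, pending_mpp_claim: None } // the static value
}

fn main() {
    let mut buf = Vec::new();
    write(&read(&[0u8; 32]), &mut buf);
    assert_eq!(buf.len(), 32); // only the hash hits the wire
}
```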
// Note that by the time we get past the required read above, channel_funding_outpoint will be
// filled in, so we can safely unwrap it here.
(3, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.0.unwrap()))),
- };
+ }
);
+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+ channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
+ channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}
+
+#[derive(Clone)]
+pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
+
+impl PartialEq for PendingMPPClaimPointer {
+ fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
+}
+impl Eq for PendingMPPClaimPointer {}
+
+impl core::fmt::Debug for PendingMPPClaimPointer {
+ fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+ self.0.lock().unwrap().fmt(f)
+ }
+}
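`PendingMPPClaimPointer` intentionally compares by allocation identity rather than by contents: only clones of the same `Arc` are equal, so a blocker can be matched against exactly the claim it was created for. In isolation:

```rust
use std::sync::{Arc, Mutex};

// Analogous to PendingMPPClaimPointer: equality is Arc identity.
#[derive(Clone)]
struct ClaimPtr(Arc<Mutex<Vec<u64>>>);

impl PartialEq for ClaimPtr {
    fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
}

fn main() {
    let a = ClaimPtr(Arc::new(Mutex::new(vec![1])));
    let b = a.clone();                               // same allocation: equal
    let c = ClaimPtr(Arc::new(Mutex::new(vec![1]))); // equal contents, new allocation
    assert!(a == b);
    assert!(a != c);
}
```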
+
#[derive(Clone, PartialEq, Eq, Debug)]
/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
/// the blocked action here. See enum variants for more info.
/// The HTLC ID on the inbound edge.
htlc_id: u64,
},
+ /// We claimed an MPP payment across multiple channels. We have to block removing the payment
+ /// preimage from any monitor until the last monitor is updated to contain the payment
+ /// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
+ /// weren't updated on startup.
+ ///
+ /// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
+ /// state.
+ ClaimedMPPPayment {
+ pending_claim: PendingMPPClaimPointer,
+ }
}
impl RAAMonitorUpdateBlockingAction {
}
}
-impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
- (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
-;);
+impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
+ (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
+ unread_variants: ClaimedMPPPayment
+);
+impl Readable for Option<RAAMonitorUpdateBlockingAction> {
+ fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+ Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
+ }
+}
/// State we hold per-peer.
pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration };
match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key,
their_features, channel_value_satoshis, push_msat, user_channel_id, config,
- self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id)
+ self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger)
{
Ok(res) => res,
Err(e) => {
// peer has been disabled for some time), return `channel_disabled`,
// otherwise return `temporary_channel_failure`.
let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok();
- if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) {
+ if chan_update_opt.as_ref().map(|u| u.contents.channel_flags & 2 == 2).unwrap_or(false) {
return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt));
} else {
return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt));
chain_hash: self.chain_hash,
short_channel_id,
timestamp: chan.context.get_update_time_counter(),
- flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
+ message_flags: 1, // Only must_be_one
+ channel_flags: (!were_node_one) as u8 | ((!enabled as u8) << 1),
cltv_expiry_delta: chan.context.get_cltv_expiry_delta(),
htlc_minimum_msat: chan.context.get_counterparty_htlc_minimum_msat(),
htlc_maximum_msat: chan.context.get_announced_htlc_max_msat(),
self.pending_outbound_payments
.send_payment_for_bolt12_invoice(
invoice, payment_id, &self.router, self.list_usable_channels(),
- || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer,
- best_block_height, &self.logger, &self.pending_events,
+ || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, &self,
+ &self.secp_ctx, best_block_height, &self.logger, &self.pending_events,
|args| self.send_payment_along_path(args)
)
}
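For reference, the `message_flags`/`channel_flags` split in the `channel_update` construction above keeps the BOLT 7 bit meanings: `message_flags` bit 0 is `must_be_one`, while `channel_flags` carries the direction bit (bit 0) and the `disabled` bit (bit 1), the latter being what the forwarding check `channel_flags & 2 == 2` tests. A quick standalone check of the bit math:

```rust
// Reproduces the flag computation from the diff above, outside ChannelManager.
fn build_flags(were_node_one: bool, enabled: bool) -> (u8, u8) {
    let message_flags: u8 = 1; // only must_be_one
    let channel_flags: u8 = (!were_node_one) as u8 | ((!enabled as u8) << 1);
    (message_flags, channel_flags)
}

fn main() {
    assert_eq!(build_flags(true, true), (1, 0b00));   // node_one, enabled
    assert_eq!(build_flags(false, false), (1, 0b11)); // node_two, disabled
    // The "channel disabled" forwarding check: bit 1 of channel_flags.
    let (_, channel_flags) = build_flags(true, false);
    assert!(channel_flags & 2 == 2);
}
```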
let mut per_source_pending_forward = [(
payment.prev_short_channel_id,
+ payment.prev_counterparty_node_id,
payment.prev_funding_outpoint,
payment.prev_channel_id,
payment.prev_user_channel_id,
user_channel_id: Some(payment.prev_user_channel_id),
outpoint: payment.prev_funding_outpoint,
channel_id: payment.prev_channel_id,
+ counterparty_node_id: payment.prev_counterparty_node_id,
htlc_id: payment.prev_htlc_id,
incoming_packet_shared_secret: payment.forward_info.incoming_shared_secret,
phantom_shared_secret: None,
// Process all of the forwards and failures for the channel in which the HTLCs were
// proposed to as a batch.
- let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
- incoming_user_channel_id, htlc_forwards.drain(..).collect());
+ let pending_forwards = (
+ incoming_scid, Some(incoming_counterparty_node_id), incoming_funding_txo,
+ incoming_channel_id, incoming_user_channel_id, htlc_forwards.drain(..).collect()
+ );
self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
let failure = match htlc_fail {
let mut new_events = VecDeque::new();
let mut failed_forwards = Vec::new();
- let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
+ let mut phantom_receives: Vec<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
{
let mut forward_htlcs = new_hash_map();
mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap());
if short_chan_id != 0 {
let mut forwarding_counterparty = None;
macro_rules! forwarding_channel_not_found {
- () => {
- for forward_info in pending_forwards.drain(..) {
+ ($forward_infos: expr) => {
+ for forward_info in $forward_infos {
match forward_info {
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
- prev_user_channel_id, forward_info: PendingHTLCInfo {
+ prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
routing, incoming_shared_secret, payment_hash, outgoing_amt_msat,
outgoing_cltv_value, ..
}
user_channel_id: Some(prev_user_channel_id),
channel_id: prev_channel_id,
outpoint: prev_funding_outpoint,
+ counterparty_node_id: prev_counterparty_node_id,
htlc_id: prev_htlc_id,
incoming_packet_shared_secret: incoming_shared_secret,
phantom_shared_secret: $phantom_ss,
outgoing_cltv_value, Some(phantom_shared_secret), false, None,
current_height, self.default_configuration.accept_mpp_keysend)
{
- Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)])),
+ Ok(info) => phantom_receives.push((
+ prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+ prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)]
+ )),
Err(InboundHTLCErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret))
}
},
let (counterparty_node_id, forward_chan_id) = match chan_info_opt {
Some((cp_id, chan_id)) => (cp_id, chan_id),
None => {
- forwarding_channel_not_found!();
+ forwarding_channel_not_found!(pending_forwards.drain(..));
continue;
}
};
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
if peer_state_mutex_opt.is_none() {
- forwarding_channel_not_found!();
+ forwarding_channel_not_found!(pending_forwards.drain(..));
continue;
}
let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
let peer_state = &mut *peer_state_lock;
- if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
- let logger = WithChannelContext::from(&self.logger, &chan.context, None);
- for forward_info in pending_forwards.drain(..) {
- let queue_fail_htlc_res = match forward_info {
- HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
- prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
- prev_user_channel_id, forward_info: PendingHTLCInfo {
- incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
- routing: PendingHTLCRouting::Forward {
- onion_packet, blinded, ..
- }, skimmed_fee_msat, ..
+ let mut draining_pending_forwards = pending_forwards.drain(..);
+ while let Some(forward_info) = draining_pending_forwards.next() {
+ let queue_fail_htlc_res = match forward_info {
+ HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
+ prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
+ prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
+ incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
+ routing: PendingHTLCRouting::Forward {
+ ref onion_packet, blinded, ..
+ }, skimmed_fee_msat, ..
+ },
+ }) => {
+ let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
+ short_channel_id: prev_short_channel_id,
+ user_channel_id: Some(prev_user_channel_id),
+ counterparty_node_id: prev_counterparty_node_id,
+ channel_id: prev_channel_id,
+ outpoint: prev_funding_outpoint,
+ htlc_id: prev_htlc_id,
+ incoming_packet_shared_secret: incoming_shared_secret,
+ // Phantom payments are only PendingHTLCRouting::Receive.
+ phantom_shared_secret: None,
+ blinded_failure: blinded.map(|b| b.failure),
+ });
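+							// For forwards within a blinded path, derive the blinding point the next
+							// hop will use by advancing ours with the ECDH shared secret; HTLCs
+							// outside a blinded path leave this as None.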
+ let next_blinding_point = blinded.and_then(|b| {
+ let encrypted_tlvs_ss = self.node_signer.ecdh(
+ Recipient::Node, &b.inbound_blinding_point, None
+ ).unwrap().secret_bytes();
+ onion_utils::next_hop_pubkey(
+ &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
+ ).ok()
+ });
+
+ // Forward the HTLC over the most appropriate channel with the corresponding peer,
+ // applying non-strict forwarding.
+ // The channel with the least amount of outbound liquidity will be used to maximize the
+ // probability of being able to successfully forward a subsequent HTLC.
+ let maybe_optimal_channel = peer_state.channel_by_id.values_mut().filter_map(|phase| match phase {
+ ChannelPhase::Funded(chan) => {
+ let balances = chan.context.get_available_balances(&self.fee_estimator);
+ if outgoing_amt_msat <= balances.next_outbound_htlc_limit_msat &&
+ outgoing_amt_msat >= balances.next_outbound_htlc_minimum_msat &&
+ chan.context.is_usable() {
+ Some((chan, balances))
+ } else {
+ None
+ }
},
- }) => {
- let logger = WithChannelContext::from(&self.logger, &chan.context, Some(payment_hash));
- log_trace!(logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id);
- let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
- short_channel_id: prev_short_channel_id,
- user_channel_id: Some(prev_user_channel_id),
- channel_id: prev_channel_id,
- outpoint: prev_funding_outpoint,
- htlc_id: prev_htlc_id,
- incoming_packet_shared_secret: incoming_shared_secret,
- // Phantom payments are only PendingHTLCRouting::Receive.
- phantom_shared_secret: None,
- blinded_failure: blinded.map(|b| b.failure),
- });
- let next_blinding_point = blinded.and_then(|b| {
- let encrypted_tlvs_ss = self.node_signer.ecdh(
- Recipient::Node, &b.inbound_blinding_point, None
- ).unwrap().secret_bytes();
- onion_utils::next_hop_pubkey(
- &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
- ).ok()
- });
- if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat,
- payment_hash, outgoing_cltv_value, htlc_source.clone(),
- onion_packet, skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
- &&logger)
- {
- if let ChannelError::Ignore(msg) = e {
- log_trace!(logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg);
+ _ => None,
+ }).min_by_key(|(_, balances)| balances.next_outbound_htlc_limit_msat).map(|(c, _)| c);
+ let optimal_channel = match maybe_optimal_channel {
+ Some(chan) => chan,
+ None => {
+ // Fall back to the specified channel to return an appropriate error.
+ if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+ chan
} else {
- panic!("Stated return value requirements in send_htlc() were not met");
+ forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+ break;
}
+ }
+ };
+
+ let logger = WithChannelContext::from(&self.logger, &optimal_channel.context, Some(payment_hash));
+ let channel_description = if optimal_channel.context.get_short_channel_id() == Some(short_chan_id) {
+ "specified"
+ } else {
+ "alternate"
+ };
+ log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}",
+ prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id);
+ if let Err(e) = optimal_channel.queue_add_htlc(outgoing_amt_msat,
+ payment_hash, outgoing_cltv_value, htlc_source.clone(),
+ onion_packet.clone(), skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
+ &&logger)
+ {
+ if let ChannelError::Ignore(msg) = e {
+ log_trace!(logger, "Failed to forward HTLC with payment_hash {} to peer {}: {}", &payment_hash, &counterparty_node_id, msg);
+ } else {
+ panic!("Stated return value requirements in send_htlc() were not met");
+ }
+
+ if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan);
failed_forwards.push((htlc_source, payment_hash,
HTLCFailReason::reason(failure_code, data),
HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id }
));
- continue;
+ } else {
+ forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+ break;
}
- None
- },
- HTLCForwardInfo::AddHTLC { .. } => {
- panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
- },
- HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
+ }
+ None
+ },
+ HTLCForwardInfo::AddHTLC { .. } => {
+ panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
+ },
+ HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => {
+ if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+ let logger = WithChannelContext::from(&self.logger, &chan.context, None);
log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
- Some((chan.queue_fail_htlc(htlc_id, err_packet, &&logger), htlc_id))
- },
- HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+ Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id))
+ } else {
+ forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+ break;
+ }
+ },
+ HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
+ if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+ let logger = WithChannelContext::from(&self.logger, &chan.context, None);
log_trace!(logger, "Failing malformed HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
let res = chan.queue_fail_malformed_htlc(
htlc_id, failure_code, sha256_of_onion, &&logger
);
Some((res, htlc_id))
- },
- };
- if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
- if let Err(e) = queue_fail_htlc_res {
- if let ChannelError::Ignore(msg) = e {
+ } else {
+ forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
+ break;
+ }
+ },
+ };
+ if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
+ if let Err(e) = queue_fail_htlc_res {
+ if let ChannelError::Ignore(msg) = e {
+ if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
+ let logger = WithChannelContext::from(&self.logger, &chan.context, None);
log_trace!(logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
- } else {
- panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
}
- // fail-backs are best-effort, we probably already have one
- // pending, and if not that's OK, if not, the channel is on
- // the chain and sending the HTLC-Timeout is their problem.
- continue;
+ } else {
+ panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
}
+							// fail-backs are best-effort, we probably already have one
+							// pending, and if not that's OK; if the fail-back never goes
+							// through, the channel is on the chain and sending the
+							// HTLC-Timeout is their problem.
+ continue;
}
}
- } else {
- forwarding_channel_not_found!();
- continue;
}
} else {
'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
match forward_info {
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
- prev_user_channel_id, forward_info: PendingHTLCInfo {
+ prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo {
routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat,
skimmed_fee_msat, ..
}
prev_hop: HTLCPreviousHopData {
short_channel_id: prev_short_channel_id,
user_channel_id: Some(prev_user_channel_id),
+ counterparty_node_id: prev_counterparty_node_id,
channel_id: prev_channel_id,
outpoint: prev_funding_outpoint,
htlc_id: prev_htlc_id,
failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData {
short_channel_id: $htlc.prev_hop.short_channel_id,
user_channel_id: $htlc.prev_hop.user_channel_id,
+ counterparty_node_id: $htlc.prev_hop.counterparty_node_id,
channel_id: prev_channel_id,
outpoint: prev_funding_outpoint,
htlc_id: $htlc.prev_hop.htlc_id,
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
- let mut sources = {
+ let sources = {
let mut claimable_payments = self.claimable_payments.lock().unwrap();
if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) {
let mut receiver_node_id = self.our_network_pubkey;
return;
}
if valid_mpp {
- for htlc in sources.drain(..) {
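+			// When a payment was received over more than one channel, set up shared
+			// claim state so each channel's RAA monitor updates stay blocked until
+			// every part's monitor holds the preimage.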
+ let pending_mpp_claim_ptr_opt = if sources.len() > 1 {
+ let channels_without_preimage = sources.iter().filter_map(|htlc| {
+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+ let prev_hop = &htlc.prev_hop;
+ Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id))
+ } else {
+ None
+ }
+ }).collect();
+ Some(Arc::new(Mutex::new(PendingMPPClaim {
+ channels_without_preimage,
+ channels_with_preimage: Vec::new(),
+ })))
+ } else {
+ None
+ };
+ for htlc in sources {
+ let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim|
+ if let Some(cp_id) = htlc.prev_hop.counterparty_node_id {
+ let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim));
+ Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr))
+ } else {
+ None
+ }
+ );
+ let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| {
+ RAAMonitorUpdateBlockingAction::ClaimedMPPPayment {
+ pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)),
+ }
+ });
self.claim_funds_from_hop(
htlc.prev_hop, payment_preimage,
|_, definitely_duplicate| {
debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
- Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
+ (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker)
}
);
}
- }
- if !valid_mpp {
- for htlc in sources.drain(..) {
+ } else {
+ for htlc in sources {
let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
let source = HTLCSource::PreviousHopData(htlc.prev_hop);
}
}
- fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
+ fn claim_funds_from_hop<
+ ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+ >(
&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
completion_action: ComplFunc,
) {
match fulfill_res {
UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
- if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+ let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+ if let Some(action) = action_opt {
log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
chan_id, action);
peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
}
+ if let Some(raa_blocker) = raa_blocker_opt {
+ peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+ }
if !during_init {
handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
peer_state, per_peer_state, chan);
}
}
UpdateFulfillCommitFetch::DuplicateClaim {} => {
- let action = if let Some(action) = completion_action(None, true) {
+ let (action_opt, raa_blocker_opt) = completion_action(None, true);
+ if let Some(raa_blocker) = raa_blocker_opt {
+ debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+ }
+ let action = if let Some(action) = action_opt {
action
} else {
return;
};
+
mem::drop(peer_state_lock);
log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
// generally always allowed to be duplicative (and it's specifically noted in
// `PaymentForwarded`).
- self.handle_monitor_update_completion_actions(completion_action(None, false));
+ let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+ if let Some(raa_blocker) = raa_blocker_opt {
+ let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+ // prev_hop.counterparty_node_id is always available for payments received after
+ // LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+ // look up the counterparty in the `action_opt`, if possible.
+ action_opt.as_ref().and_then(|action|
+ if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+ pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+ } else { None }
+ )
+ );
+ if let Some(counterparty_node_id) = counterparty_node_id {
+ // TODO: Avoid always blocking the world for the write lock here.
+ let mut per_peer_state = self.per_peer_state.write().unwrap();
+ let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(||
+ Mutex::new(PeerState {
+ channel_by_id: new_hash_map(),
+ inbound_channel_request_by_id: new_hash_map(),
+ latest_features: InitFeatures::empty(),
+ pending_msg_events: Vec::new(),
+ in_flight_monitor_updates: BTreeMap::new(),
+ monitor_update_blocked_actions: BTreeMap::new(),
+ actions_blocking_raa_monitor_updates: BTreeMap::new(),
+ is_connected: false,
+ }));
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+
+ peer_state.actions_blocking_raa_monitor_updates
+ .entry(prev_hop.channel_id)
+ .or_insert_with(Vec::new)
+ .push(raa_blocker);
+ } else {
+ debug_assert!(false,
+ "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id");
+ }
+ }
+
+ self.handle_monitor_update_completion_actions(action_opt);
}
fn finalize_claims(&self, sources: Vec<HTLCSource>) {
|htlc_claim_value_msat, definitely_duplicate| {
let chan_to_release =
if let Some(node_id) = next_channel_counterparty_node_id {
- Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker))
+ Some(EventUnblockedChannel {
+ counterparty_node_id: node_id,
+ funding_txo: next_channel_outpoint,
+ channel_id: next_channel_id,
+ blocking_action: completed_blocker
+ })
} else {
// We can only get `None` here if we are processing a
// `ChannelMonitor`-originated event, in which case we
}
}), "{:?}", *background_events);
}
- None
+ (None, None)
} else if definitely_duplicate {
if let Some(other_chan) = chan_to_release {
- Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
- downstream_counterparty_node_id: other_chan.0,
- downstream_funding_outpoint: other_chan.1,
- downstream_channel_id: other_chan.2,
- blocking_action: other_chan.3,
- })
- } else { None }
+ (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
+ downstream_counterparty_node_id: other_chan.counterparty_node_id,
+ downstream_funding_outpoint: other_chan.funding_txo,
+ downstream_channel_id: other_chan.channel_id,
+ blocking_action: other_chan.blocking_action,
+ }), None)
+ } else { (None, None) }
} else {
let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat {
if let Some(claimed_htlc_value) = htlc_claim_value_msat {
} else { None };
debug_assert!(skimmed_fee_msat <= total_fee_earned_msat,
"skimmed_fee_msat must always be included in total_fee_earned_msat");
- Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
+ (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
event: events::Event::PaymentForwarded {
prev_channel_id: Some(prev_channel_id),
next_channel_id: Some(next_channel_id),
outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
},
downstream_counterparty_and_funding_outpoint: chan_to_release,
- })
+ }), None)
}
});
},
debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread);
debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
+ let mut freed_channels = Vec::new();
+
for action in actions.into_iter() {
match action {
- MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => {
+ MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => {
+ if let Some((counterparty_node_id, chan_id, htlc_id, claim_ptr)) = pending_mpp_claim {
+ let per_peer_state = self.per_peer_state.read().unwrap();
+ per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+ let mut peer_state = peer_state_mutex.lock().unwrap();
+ let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+ if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+ blockers.get_mut().retain(|blocker|
+ if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+ if *pending_claim == claim_ptr {
+ let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+ let pending_claim_state = &mut *pending_claim_state_lock;
+ pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+ if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+ pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+ false
+ } else { true }
+ });
+ if pending_claim_state.channels_without_preimage.is_empty() {
+ for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+ freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+ }
+ }
+ !pending_claim_state.channels_without_preimage.is_empty()
+ } else { true }
+ } else { true }
+ );
+ if blockers.get().is_empty() {
+ blockers.remove();
+ }
+ }
+ });
+ }
+
let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
if let Some(ClaimingPayment {
amount_msat,
event, downstream_counterparty_and_funding_outpoint
} => {
self.pending_events.lock().unwrap().push_back((event, None));
- if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
- self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+ if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
+ self.handle_monitor_update_release(
+ unblocked.counterparty_node_id, unblocked.funding_txo,
+ unblocked.channel_id, Some(unblocked.blocking_action),
+ );
}
},
MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
},
}
}
+
+ for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+ self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+ }
}
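The retain-based bookkeeping above moves a channel's entry from `channels_without_preimage` into `channels_with_preimage` as its monitor update completes, then frees every participating channel once the set drains. A simplified, self-contained model of that life cycle, with bare HTLC ids standing in for the `(node_id, outpoint, channel_id, htlc_id)` tuples:

```rust
use std::sync::{Arc, Mutex};

#[derive(Default)]
struct PendingClaim {
    without_preimage: Vec<u64>, // parts whose monitors still lack the preimage
    with_preimage: Vec<u64>,
}

/// Returns the parts to unblock; non-empty only when the final part lands.
fn on_monitor_updated(claim: &Arc<Mutex<PendingClaim>>, htlc_id: u64) -> Vec<u64> {
    let mut state = claim.lock().unwrap();
    if let Some(pos) = state.without_preimage.iter().position(|&id| id == htlc_id) {
        let id = state.without_preimage.remove(pos);
        state.with_preimage.push(id);
    }
    if state.without_preimage.is_empty() {
        state.with_preimage.drain(..).collect() // last part: free everyone
    } else {
        Vec::new()
    }
}

fn main() {
    let claim = Arc::new(Mutex::new(PendingClaim {
        without_preimage: vec![1, 2],
        with_preimage: Vec::new(),
    }));
    assert!(on_monitor_updated(&claim, 1).is_empty()); // still waiting on part 2
    assert_eq!(on_monitor_updated(&claim, 2), vec![1, 2]);
}
```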
/// Handles a channel reentering a functional state, either due to reconnect or a monitor
pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
funding_broadcastable: Option<Transaction>,
channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
- -> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
+ -> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
let logger = WithChannelContext::from(&self.logger, &channel.context, None);
log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
&channel.context.channel_id(),
let mut htlc_forwards = None;
if !pending_forwards.is_empty() {
- htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
- channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+ htlc_forwards = Some((
+ short_channel_id, Some(channel.context.get_counterparty_node_id()),
+ channel.context.get_funding_txo().unwrap(), channel.context.channel_id(),
+ channel.context.get_user_id(), pending_forwards
+ ));
}
let mut decode_update_add_htlcs = None;
if !pending_update_adds.is_empty() {
}
#[inline]
- fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+ fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
if push_forward_event { self.push_pending_forwards_ev() }
}
#[inline]
- fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+ fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
let mut push_forward_event = false;
- for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
+ for &mut (prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
let mut new_intercept_events = VecDeque::new();
let mut failed_intercept_forwards = Vec::new();
if !pending_forwards.is_empty() {
match forward_htlcs.entry(scid) {
hash_map::Entry::Occupied(mut entry) => {
entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
- prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }));
+ prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+ prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+ }));
},
hash_map::Entry::Vacant(entry) => {
if !is_our_scid && forward_info.incoming_amt_msat.is_some() &&
intercept_id
}, None));
entry.insert(PendingAddHTLCInfo {
- prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info });
+ prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+ prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+ });
},
hash_map::Entry::Occupied(_) => {
let logger = WithContext::from(&self.logger, None, Some(prev_channel_id), Some(forward_info.payment_hash));
let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
short_channel_id: prev_short_channel_id,
user_channel_id: Some(prev_user_channel_id),
+ counterparty_node_id: prev_counterparty_node_id,
outpoint: prev_funding_outpoint,
channel_id: prev_channel_id,
htlc_id: prev_htlc_id,
// payments are being processed.
push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
- prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
+ prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+ prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+ })));
}
}
}
return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
}
let were_node_one = self.get_our_node_id().serialize()[..] < chan.context.get_counterparty_node_id().serialize()[..];
- let msg_from_node_one = msg.contents.flags & 1 == 0;
+ let msg_from_node_one = msg.contents.channel_flags & 1 == 0;
if were_node_one == msg_from_node_one {
return Ok(NotifyOption::SkipPersistNoEvents);
} else {
match phase {
ChannelPhase::Funded(chan) => {
let msgs = chan.signer_maybe_unblocked(&self.logger);
- if let Some(updates) = msgs.commitment_update {
- pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
- node_id,
- updates,
- });
+ let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs {
+ node_id,
+ updates,
+ });
+ let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK {
+ node_id,
+ msg,
+ });
+ match (cu_msg, raa_msg) {
+ (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => {
+ pending_msg_events.push(cu);
+ pending_msg_events.push(raa);
+ },
+ (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => {
+ pending_msg_events.push(raa);
+ pending_msg_events.push(cu);
+ },
+ (Some(cu), _) => pending_msg_events.push(cu),
+ (_, Some(raa)) => pending_msg_events.push(raa),
+ (_, _) => {},
}
if let Some(msg) = msgs.funding_signed {
pending_msg_events.push(events::MessageSendEvent::SendFundingSigned {
let entropy = &*$self.entropy_source;
let secp_ctx = &$self.secp_ctx;
- let path = $self.create_blinded_path_using_absolute_expiry(absolute_expiry)
+ let path = $self.create_blinded_paths_using_absolute_expiry(OffersContext::Unknown {}, absolute_expiry)
+ .and_then(|paths| paths.into_iter().next().ok_or(()))
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
let builder = OfferBuilder::deriving_signing_pubkey(
node_id, expanded_key, entropy, secp_ctx
)
let entropy = &*$self.entropy_source;
let secp_ctx = &$self.secp_ctx;
- let path = $self.create_blinded_path_using_absolute_expiry(Some(absolute_expiry))
+ let context = OffersContext::OutboundPayment { payment_id };
+ let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry))
+ .and_then(|paths| paths.into_iter().next().ok_or(()))
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
let builder = RefundBuilder::deriving_payer_id(
node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id
)?
}
} }
+/// The maximum number of [`OffersMessage`]s to enqueue, pairing each destination path with a
+/// different reply path.
+///
+/// Sending multiple requests increases the chances of successful delivery in case some
+/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid,
+/// even if multiple invoices are received.
+const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;
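The limit caps the cross product of reply paths and destination paths built below when enqueuing invoice requests and refund invoices. A standalone sketch of the pairing, with strings standing in for blinded paths:

```rust
const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10;

fn main() {
    let offer_paths = ["dest_a", "dest_b", "dest_c"];
    let reply_paths = ["reply_1", "reply_2", "reply_3", "reply_4"];

    // Pair every destination path with every reply path, then cap the total.
    let messages: Vec<(&str, &str)> = reply_paths
        .iter()
        .flat_map(|reply| offer_paths.iter().map(move |dest| (*dest, *reply)))
        .take(OFFERS_MESSAGE_REQUEST_LIMIT)
        .collect();

    // 4 reply paths x 3 destinations = 12 candidate pairs, truncated to 10.
    assert_eq!(messages.len(), OFFERS_MESSAGE_REQUEST_LIMIT);
}
```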
+
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L>
where
M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
Some(payer_note) => builder.payer_note(payer_note),
};
let invoice_request = builder.build_and_sign()?;
- let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?;
+
+ let context = OffersContext::OutboundPayment { payment_id };
+ let reply_paths = self.create_blinded_paths(context).map_err(|_| Bolt12SemanticError::MissingPaths)?;
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if !offer.paths().is_empty() {
- // Send as many invoice requests as there are paths in the offer (with an upper bound).
- // Using only one path could result in a failure if the path no longer exists. But only
- // one invoice for a given payment id will be paid, even if more than one is received.
- const REQUEST_LIMIT: usize = 10;
- for path in offer.paths().into_iter().take(REQUEST_LIMIT) {
+ reply_paths
+ .iter()
+ .flat_map(|reply_path| offer.paths().iter().map(move |path| (path, reply_path)))
+ .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+ .for_each(|(path, reply_path)| {
+ let message = new_pending_onion_message(
+ OffersMessage::InvoiceRequest(invoice_request.clone()),
+ Destination::BlindedPath(path.clone()),
+ Some(reply_path.clone()),
+ );
+ pending_offers_messages.push(message);
+ });
+ } else if let Some(signing_pubkey) = offer.signing_pubkey() {
+ for reply_path in reply_paths {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request.clone()),
- Destination::BlindedPath(path.clone()),
- Some(reply_path.clone()),
+ Destination::Node(signing_pubkey),
+ Some(reply_path),
);
pending_offers_messages.push(message);
}
- } else if let Some(signing_pubkey) = offer.signing_pubkey() {
- let message = new_pending_onion_message(
- OffersMessage::InvoiceRequest(invoice_request),
- Destination::Node(signing_pubkey),
- Some(reply_path),
- );
- pending_offers_messages.push(message);
} else {
debug_assert!(false);
return Err(Bolt12SemanticError::MissingSigningPubkey);
)?;
let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into();
let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?;
- let reply_path = self.create_blinded_path()
+ let reply_paths = self.create_blinded_paths(OffersContext::Unknown {})
.map_err(|_| Bolt12SemanticError::MissingPaths)?;
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if refund.paths().is_empty() {
- let message = new_pending_onion_message(
- OffersMessage::Invoice(invoice.clone()),
- Destination::Node(refund.payer_id()),
- Some(reply_path),
- );
- pending_offers_messages.push(message);
- } else {
- for path in refund.paths() {
+ for reply_path in reply_paths {
let message = new_pending_onion_message(
OffersMessage::Invoice(invoice.clone()),
- Destination::BlindedPath(path.clone()),
- Some(reply_path.clone()),
+ Destination::Node(refund.payer_id()),
+ Some(reply_path),
);
pending_offers_messages.push(message);
}
+ } else {
+ reply_paths
+ .iter()
+ .flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path)))
+ .take(OFFERS_MESSAGE_REQUEST_LIMIT)
+ .for_each(|(path, reply_path)| {
+ let message = new_pending_onion_message(
+ OffersMessage::Invoice(invoice.clone()),
+ Destination::BlindedPath(path.clone()),
+ Some(reply_path.clone()),
+ );
+ pending_offers_messages.push(message);
+ });
}
Ok(invoice)
inbound_payment::get_payment_preimage(payment_hash, payment_secret, &self.inbound_payment_key)
}
- /// Creates a blinded path by delegating to [`MessageRouter`] based on the path's intended
- /// lifetime.
+	/// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on
+	/// the paths' intended lifetime.
	///
	/// Whether the paths are compact depends on whether they are short-lived or long-lived,
	/// based on the given `absolute_expiry` as seconds since the Unix epoch. See
	/// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`].
- fn create_blinded_path_using_absolute_expiry(
- &self, absolute_expiry: Option<Duration>
- ) -> Result<BlindedPath, ()> {
+ fn create_blinded_paths_using_absolute_expiry(
+ &self, context: OffersContext, absolute_expiry: Option<Duration>,
+ ) -> Result<Vec<BlindedPath>, ()> {
let now = self.duration_since_epoch();
let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
- self.create_compact_blinded_path()
+ self.create_compact_blinded_paths(context)
} else {
- self.create_blinded_path()
+ self.create_blinded_paths(context)
}
}
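A standalone sketch of the expiry cutoff above, with a hypothetical one-day `MAX_SHORT_LIVED_RELATIVE_EXPIRY` (the real constant is defined elsewhere in this file): contexts that expire soon can use compact, SCID-based paths, while long-lived or unbounded ones get full-pubkey paths that survive channel closure.

```rust
use std::time::Duration;

// Hypothetical threshold for illustration only.
const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);

fn should_use_compact_paths(now: Duration, absolute_expiry: Option<Duration>) -> bool {
    let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
    // No expiry is treated as infinitely long-lived.
    absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry
}

fn main() {
    let now = Duration::from_secs(1_700_000_000);
    assert!(should_use_compact_paths(now, Some(now + Duration::from_secs(3_600))));
    assert!(!should_use_compact_paths(now, Some(now + Duration::from_secs(7 * 24 * 3_600))));
    assert!(!should_use_compact_paths(now, None));
}
```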
now
}
- /// Creates a blinded path by delegating to [`MessageRouter::create_blinded_paths`].
+ /// Creates a collection of blinded paths by delegating to
+ /// [`MessageRouter::create_blinded_paths`].
///
- /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
- fn create_blinded_path(&self) -> Result<BlindedPath, ()> {
+ /// Errors if the `MessageRouter` errors.
+ fn create_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
let recipient = self.get_our_node_id();
let secp_ctx = &self.secp_ctx;
.collect::<Vec<_>>();
self.router
- .create_blinded_paths(recipient, peers, secp_ctx)
- .and_then(|paths| paths.into_iter().next().ok_or(()))
+ .create_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+ .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
}
- /// Creates a blinded path by delegating to [`MessageRouter::create_compact_blinded_paths`].
+ /// Creates a collection of blinded paths by delegating to
+ /// [`MessageRouter::create_compact_blinded_paths`].
///
- /// Errors if the `MessageRouter` errors or returns an empty `Vec`.
- fn create_compact_blinded_path(&self) -> Result<BlindedPath, ()> {
+ /// Errors if the `MessageRouter` errors.
+ fn create_compact_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> {
let recipient = self.get_our_node_id();
let secp_ctx = &self.secp_ctx;
.collect::<Vec<_>>();
self.router
- .create_compact_blinded_paths(recipient, peers, secp_ctx)
- .and_then(|paths| paths.into_iter().next().ok_or(()))
+ .create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx)
+ .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(()))
}
/// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to
htlc_id: htlc.prev_htlc_id,
incoming_packet_shared_secret: htlc.forward_info.incoming_shared_secret,
phantom_shared_secret: None,
+ counterparty_node_id: htlc.prev_counterparty_node_id,
outpoint: htlc.prev_funding_outpoint,
channel_id: htlc.prev_channel_id,
blinded_failure: htlc.forward_info.routing.blinded_failure(),
}
#[cfg(splicing)]
- fn handle_splice(&self, counterparty_node_id: &PublicKey, msg: &msgs::Splice) {
+ fn handle_splice_init(&self, counterparty_node_id: &PublicKey, msg: &msgs::SpliceInit) {
let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close(
"Splicing not supported".to_owned(),
msg.channel_id.clone())), *counterparty_node_id);
// Quiescence
&events::MessageSendEvent::SendStfu { .. } => false,
// Splicing
- &events::MessageSendEvent::SendSplice { .. } => false,
+ &events::MessageSendEvent::SendSpliceInit { .. } => false,
&events::MessageSendEvent::SendSpliceAck { .. } => false,
&events::MessageSendEvent::SendSpliceLocked { .. } => false,
// Interactive Transaction Construction
R::Target: Router,
L::Target: Logger,
{
- fn handle_message(&self, message: OffersMessage, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
+ fn handle_message(&self, message: OffersMessage, context: OffersContext, responder: Option<Responder>) -> ResponseInstruction<OffersMessage> {
let secp_ctx = &self.secp_ctx;
let expanded_key = &self.inbound_payment_key;
+ let abandon_if_payment = |context| {
+ match context {
+ OffersContext::OutboundPayment { payment_id } => self.abandon_payment(payment_id),
+ _ => {},
+ }
+ };
+
match message {
OffersMessage::InvoiceRequest(invoice_request) => {
let responder = match responder {
};
match result {
- Ok(()) => ResponseInstruction::NoResponse,
- Err(e) => match responder {
- Some(responder) => responder.respond(OffersMessage::InvoiceError(e)),
+ Ok(_) => ResponseInstruction::NoResponse,
+ Err(err) => match responder {
+ Some(responder) => {
+ abandon_if_payment(context);
+ responder.respond(OffersMessage::InvoiceError(err))
+ },
None => {
- log_trace!(self.logger, "No reply path for sending invoice error: {:?}", e);
- ResponseInstruction::NoResponse
+ abandon_if_payment(context);
+ log_trace!(
+ self.logger,
+ "An error response was generated, but there is no reply_path specified \
+ for sending the response. Error: {}",
+ err
+ );
+ return ResponseInstruction::NoResponse;
},
},
}
},
+ #[cfg(async_payments)]
+ OffersMessage::StaticInvoice(_invoice) => {
+ match responder {
+ Some(responder) => {
+ responder.respond(OffersMessage::InvoiceError(
+ InvoiceError::from_string("Static invoices not yet supported".to_string())
+ ))
+ },
+ None => return ResponseInstruction::NoResponse,
+ }
+ },
OffersMessage::InvoiceError(invoice_error) => {
+ abandon_if_payment(context);
log_trace!(self.logger, "Received invoice_error: {}", invoice_error);
ResponseInstruction::NoResponse
},
}
}
+impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
+AsyncPaymentsMessageHandler for ChannelManager<M, T, ES, NS, SP, F, R, L>
+where
+ M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
+ T::Target: BroadcasterInterface,
+ ES::Target: EntropySource,
+ NS::Target: NodeSigner,
+ SP::Target: SignerProvider,
+ F::Target: FeeEstimator,
+ R::Target: Router,
+ L::Target: Logger,
+{
+ fn held_htlc_available(
+ &self, _message: HeldHtlcAvailable, _responder: Option<Responder>
+ ) -> ResponseInstruction<ReleaseHeldHtlc> {
+ ResponseInstruction::NoResponse
+ }
+
+ fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
+
+ fn release_pending_messages(&self) -> Vec<PendingOnionMessage<AsyncPaymentsMessage>> {
+ Vec::new()
+ }
+}
+
impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref>
NodeIdLookUp for ChannelManager<M, T, ES, NS, SP, F, R, L>
where
(4, payment_data, option), // Added in 0.0.116
(5, custom_tlvs, optional_vec),
},
-;);
+);
impl_writeable_tlv_based!(PendingHTLCInfo, {
(0, routing, required),
}
}
-impl_writeable_tlv_based_enum!(PendingHTLCStatus, ;
+impl_writeable_tlv_based_enum_legacy!(PendingHTLCStatus, ;
(0, Forward),
(1, Fail),
);
impl_writeable_tlv_based_enum!(BlindedFailure,
(0, FromIntroductionNode) => {},
- (2, FromBlindedNode) => {}, ;
+ (2, FromBlindedNode) => {},
);
impl_writeable_tlv_based!(HTLCPreviousHopData, {
// Note that by the time we get past the required read for type 2 above, outpoint will be
// filled in, so we can safely unwrap it here.
(9, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(outpoint.0.unwrap()))),
+ (11, counterparty_node_id, option),
});
impl Writeable for ClaimableHTLC {
// Note that by the time we get past the required read for type 6 above, prev_funding_outpoint will be
// filled in, so we can safely unwrap it here.
(7, prev_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(prev_funding_outpoint.0.unwrap()))),
+ (9, prev_counterparty_node_id, option),
});
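Both new `counterparty_node_id` fields use odd TLV types (11 and 9 above), following the "it's okay to be odd" convention: optional records that an older reader may skip and whose absence a newer reader maps to `None`. A toy TLV reader illustrating the rule, with single-byte types and lengths rather than the actual BOLT encoding:

```rust
use std::io::{Cursor, Read};

// Hypothetical micro-TLV stream: (type, length, value) records where unknown
// odd types are skipped and unknown even types are a hard error.
fn read_tlvs(buf: &[u8]) -> Result<Vec<(u8, Vec<u8>)>, &'static str> {
    let mut cur = Cursor::new(buf);
    let mut out = Vec::new();
    let mut hdr = [0u8; 2];
    while cur.read_exact(&mut hdr).is_ok() {
        let (typ, len) = (hdr[0], hdr[1] as usize);
        let mut val = vec![0u8; len];
        cur.read_exact(&mut val).map_err(|_| "truncated value")?;
        if matches!(typ, 0 | 2 | 9 | 11) {
            out.push((typ, val)); // known field
        } else if typ % 2 == 0 {
            return Err("unknown even (required) TLV type");
        } // unknown odd type: silently skipped
    }
    Ok(out)
}

fn main() {
    // Type 9 is known; type 13 is unknown but odd, so it is skipped.
    assert_eq!(read_tlvs(&[9, 1, 42, 13, 1, 99]).unwrap(), vec![(9u8, vec![42u8])]);
    // An unknown even type must fail the read.
    assert!(read_tlvs(&[8, 0]).is_err());
}
```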
impl Writeable for HTLCForwardInfo {
for action in actions.iter() {
if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel {
downstream_counterparty_and_funding_outpoint:
- Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), ..
+ Some(EventUnblockedChannel {
+ counterparty_node_id: blocked_node_id,
+ funding_txo: _,
+ channel_id: blocked_channel_id,
+ blocking_action,
+ }), ..
} = action {
if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) {
log_trace!(logger,
// update message and would always update the local fee info, even if our peer was
// (spuriously) forwarding us our own channel_update.
let as_node_one = nodes[0].node.get_our_node_id().serialize()[..] < nodes[1].node.get_our_node_id().serialize()[..];
- let as_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
- let bs_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
+ let as_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 };
+ let bs_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 };
// First deliver each peers' own message, checking that the node doesn't need to be
// persisted and that its channel info remains the same.