X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning%2Fsrc%2Fln%2Fchannelmanager.rs;h=274edf6d7b4238495eb87390bb8453ad76ca3a8b;hb=refs%2Fheads%2Fupstream%2Fmain;hp=508d13a15822acffe1fc7a5d2decfd333632275f;hpb=249886497e83f192ad96785acb668fca92398101;p=rust-lightning diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 508d13a1..561053d8 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -31,6 +31,7 @@ use bitcoin::secp256k1::{SecretKey,PublicKey}; use bitcoin::secp256k1::Secp256k1; use bitcoin::{secp256k1, Sequence}; +use crate::blinded_path::message::{MessageContext, OffersContext}; use crate::blinded_path::{BlindedPath, NodeIdLookUp}; use crate::blinded_path::message::ForwardNode; use crate::blinded_path::payment::{Bolt12OfferContext, Bolt12RefundContext, PaymentConstraints, PaymentContext, ReceiveTlvs}; @@ -40,7 +41,7 @@ use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, Fee use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, WithChannelMonitor, ChannelMonitorUpdateStep, HTLC_FAIL_BACK_BUFFER, CLTV_CLAIM_BUFFER, LATENCY_GRACE_PERIOD_BLOCKS, ANTI_REORG_DELAY, MonitorEvent, CLOSED_CHANNEL_UPDATE_ID}; use crate::chain::transaction::{OutPoint, TransactionData}; use crate::events; -use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason}; +use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider, ClosureReason, HTLCDestination, PaymentFailureReason, ReplayEvent}; // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::inbound_payment; @@ -66,6 +67,7 @@ use crate::offers::invoice_request::{DerivedPayerId, InvoiceRequestBuilder}; use crate::offers::offer::{Offer, OfferBuilder}; use crate::offers::parse::Bolt12SemanticError; use crate::offers::refund::{Refund, RefundBuilder}; +use crate::onion_message::async_payments::{AsyncPaymentsMessage, HeldHtlcAvailable, ReleaseHeldHtlc, AsyncPaymentsMessageHandler}; use crate::onion_message::messenger::{new_pending_onion_message, Destination, MessageRouter, PendingOnionMessage, Responder, ResponseInstruction}; use crate::onion_message::offers::{OffersMessage, OffersMessageHandler}; use crate::sign::{EntropySource, NodeSigner, Recipient, SignerProvider}; @@ -306,6 +308,7 @@ pub(super) struct PendingAddHTLCInfo { // Note that this may be an outbound SCID alias for the associated channel. prev_short_channel_id: u64, prev_htlc_id: u64, + prev_counterparty_node_id: Option, prev_channel_id: ChannelId, prev_funding_outpoint: OutPoint, prev_user_channel_id: u128, @@ -349,9 +352,10 @@ pub(crate) struct HTLCPreviousHopData { blinded_failure: Option, channel_id: ChannelId, - // This field is consumed by `claim_funds_from_hop()` when updating a force-closed backwards + // These fields are consumed by `claim_funds_from_hop()` when updating a force-closed backwards // channel with a preimage provided by the forward channel. 
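+	// (`counterparty_node_id`, added below, lets that replay be tied back to the correct
+	// peer's channel state; see its use in `claim_funds_from_hop`.)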
outpoint: OutPoint, + counterparty_node_id: Option, } enum OnionPayload { @@ -472,7 +476,7 @@ impl_writeable_tlv_based_enum!(SentHTLCId, }, (2, OutboundRoute) => { (0, session_priv, required), - }; + }, ); @@ -666,7 +670,7 @@ pub(super) const MIN_HTLC_RELAY_HOLDING_CELL_MILLIS: u64 = 100; /// be sent in the order they appear in the return value, however sometimes the order needs to be /// variable at runtime (eg Channel::channel_reestablish needs to re-send messages in the order /// they were originally sent). In those cases, this enum is also returned. -#[derive(Clone, PartialEq)] +#[derive(Clone, PartialEq, Debug)] pub(super) enum RAACommitmentOrder { /// Send the CommitmentUpdate messages first CommitmentFirst, @@ -755,13 +759,55 @@ enum BackgroundEvent { }, } +/// A pointer to a channel that is unblocked when an event is surfaced +#[derive(Debug)] +pub(crate) struct EventUnblockedChannel { + counterparty_node_id: PublicKey, + funding_txo: OutPoint, + channel_id: ChannelId, + blocking_action: RAAMonitorUpdateBlockingAction, +} + +impl Writeable for EventUnblockedChannel { + fn write(&self, writer: &mut W) -> Result<(), io::Error> { + self.counterparty_node_id.write(writer)?; + self.funding_txo.write(writer)?; + self.channel_id.write(writer)?; + self.blocking_action.write(writer) + } +} + +impl MaybeReadable for EventUnblockedChannel { + fn read(reader: &mut R) -> Result, DecodeError> { + let counterparty_node_id = Readable::read(reader)?; + let funding_txo = Readable::read(reader)?; + let channel_id = Readable::read(reader)?; + let blocking_action = match RAAMonitorUpdateBlockingAction::read(reader)? { + Some(blocking_action) => blocking_action, + None => return Ok(None), + }; + Ok(Some(EventUnblockedChannel { + counterparty_node_id, + funding_txo, + channel_id, + blocking_action, + })) + } +} + #[derive(Debug)] pub(crate) enum MonitorUpdateCompletionAction { /// Indicates that a payment ultimately destined for us was claimed and we should emit an /// [`events::Event::PaymentClaimed`] to the user if we haven't yet generated such an event for /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate /// event can be generated. - PaymentClaimed { payment_hash: PaymentHash }, + PaymentClaimed { + payment_hash: PaymentHash, + /// A pending MPP claim which hasn't yet completed. + /// + /// Not written to disk. + pending_mpp_claim: Option<(PublicKey, ChannelId, u64, PendingMPPClaimPointer)>, + }, /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the /// operation of another channel. /// @@ -772,7 +818,7 @@ pub(crate) enum MonitorUpdateCompletionAction { /// outbound edge. EmitEventAndFreeOtherChannel { event: events::Event, - downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, ChannelId, RAAMonitorUpdateBlockingAction)>, + downstream_counterparty_and_funding_outpoint: Option, }, /// Indicates we should immediately resume the operation of another channel, unless there is /// some other reason why the channel is blocked. In practice this simply means immediately @@ -795,13 +841,16 @@ pub(crate) enum MonitorUpdateCompletionAction { } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, - (0, PaymentClaimed) => { (0, payment_hash, required) }, + (0, PaymentClaimed) => { + (0, payment_hash, required), + (9999999999, pending_mpp_claim, (static_value, None)), + }, // Note that FreeOtherChannelImmediately should never be written - we were supposed to free // *immediately*. 
 // However, for simplicity we implement read/write here.
 	(1, FreeOtherChannelImmediately) => {
 		(0, downstream_counterparty_node_id, required),
 		(2, downstream_funding_outpoint, required),
-		(4, blocking_action, required),
+		(4, blocking_action, upgradable_required),
 		// Note that by the time we get past the required read above, downstream_funding_outpoint will be
 		// filled in, so we can safely unwrap it here.
 		(5, downstream_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(downstream_funding_outpoint.0.unwrap()))),
@@ -813,7 +862,7 @@ impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction,
 	// monitor updates which aren't properly blocked or resumed, however that's fine - we don't
 	// support async monitor updates even in LDK 0.0.116 and once we do we'll require no
 	// downgrades to prior versions.
-		(1, downstream_counterparty_and_funding_outpoint, option),
+		(1, downstream_counterparty_and_funding_outpoint, upgradable_option),
 	},
 );
@@ -832,9 +881,29 @@ impl_writeable_tlv_based_enum!(EventCompletionAction,
 	// Note that by the time we get past the required read above, channel_funding_outpoint will be
 	// filled in, so we can safely unwrap it here.
 		(3, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(channel_funding_outpoint.0.unwrap()))),
-	};
+	}
 );
 
+#[derive(Debug)]
+pub(crate) struct PendingMPPClaim {
+	channels_without_preimage: Vec<(PublicKey, OutPoint, ChannelId, u64)>,
+	channels_with_preimage: Vec<(PublicKey, OutPoint, ChannelId)>,
+}
+
+#[derive(Clone)]
+pub(crate) struct PendingMPPClaimPointer(Arc<Mutex<PendingMPPClaim>>);
+
+impl PartialEq for PendingMPPClaimPointer {
+	fn eq(&self, o: &Self) -> bool { Arc::ptr_eq(&self.0, &o.0) }
+}
+impl Eq for PendingMPPClaimPointer {}
+
+impl core::fmt::Debug for PendingMPPClaimPointer {
+	fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> {
+		self.0.lock().unwrap().fmt(f)
+	}
+}
+
 #[derive(Clone, PartialEq, Eq, Debug)]
 /// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track
 /// the blocked action here. See enum variants for more info.
@@ -848,6 +917,16 @@ pub(crate) enum RAAMonitorUpdateBlockingAction {
 		/// The HTLC ID on the inbound edge.
 		htlc_id: u64,
 	},
+	/// We claimed an MPP payment across multiple channels. We have to block removing the payment
+	/// preimage from any monitor until the last monitor is updated to contain the payment
+	/// preimage. Otherwise we may not be able to replay the preimage on the monitor(s) that
+	/// weren't updated on startup.
+	///
+	/// This variant is *not* written to disk, instead being inferred from [`ChannelMonitor`]
+	/// state.
+	ClaimedMPPPayment {
+		pending_claim: PendingMPPClaimPointer,
+	}
 }
 
 impl RAAMonitorUpdateBlockingAction {
@@ -859,10 +938,16 @@ impl RAAMonitorUpdateBlockingAction {
 	}
 }
 
-impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction,
-	(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) }
-;);
+impl_writeable_tlv_based_enum_upgradable!(RAAMonitorUpdateBlockingAction,
+	(0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) },
+	unread_variants: ClaimedMPPPayment
+);
+
+impl Readable for Option<RAAMonitorUpdateBlockingAction> {
+	fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
+		Ok(RAAMonitorUpdateBlockingAction::read(reader)?)
+	}
+}
 
 /// State we hold per-peer.
pub(super) struct PeerState where SP::Target: SignerProvider { @@ -1310,35 +1395,38 @@ where /// } /// /// // On the event processing thread once the peer has responded -/// channel_manager.process_pending_events(&|event| match event { -/// Event::FundingGenerationReady { -/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script, -/// user_channel_id, .. -/// } => { -/// assert_eq!(user_channel_id, 42); -/// let funding_transaction = wallet.create_funding_transaction( -/// channel_value_satoshis, output_script -/// ); -/// match channel_manager.funding_transaction_generated( -/// &temporary_channel_id, &counterparty_node_id, funding_transaction -/// ) { -/// Ok(()) => println!("Funding channel {}", temporary_channel_id), -/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e), -/// } -/// }, -/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!( -/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id, -/// former_temporary_channel_id.unwrap() -/// ); -/// }, -/// Event::ChannelReady { channel_id, user_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!("Channel {} ready", channel_id); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::FundingGenerationReady { +/// temporary_channel_id, counterparty_node_id, channel_value_satoshis, output_script, +/// user_channel_id, .. +/// } => { +/// assert_eq!(user_channel_id, 42); +/// let funding_transaction = wallet.create_funding_transaction( +/// channel_value_satoshis, output_script +/// ); +/// match channel_manager.funding_transaction_generated( +/// &temporary_channel_id, &counterparty_node_id, funding_transaction +/// ) { +/// Ok(()) => println!("Funding channel {}", temporary_channel_id), +/// Err(e) => println!("Error funding channel {}: {:?}", temporary_channel_id, e), +/// } +/// }, +/// Event::ChannelPending { channel_id, user_channel_id, former_temporary_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!( +/// "Channel {} now {} pending (funding transaction has been broadcasted)", channel_id, +/// former_temporary_channel_id.unwrap() +/// ); +/// }, +/// Event::ChannelReady { channel_id, user_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!("Channel {} ready", channel_id); +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1362,28 +1450,31 @@ where /// # fn example(channel_manager: T) { /// # let channel_manager = channel_manager.get_cm(); /// # let error_message = "Channel force-closed"; -/// channel_manager.process_pending_events(&|event| match event { -/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. } => { -/// if !is_trusted(counterparty_node_id) { -/// match channel_manager.force_close_without_broadcasting_txn( -/// &temporary_channel_id, &counterparty_node_id, error_message.to_string() -/// ) { -/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id), -/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e), +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::OpenChannelRequest { temporary_channel_id, counterparty_node_id, .. 
} => { +/// if !is_trusted(counterparty_node_id) { +/// match channel_manager.force_close_without_broadcasting_txn( +/// &temporary_channel_id, &counterparty_node_id, error_message.to_string() +/// ) { +/// Ok(()) => println!("Rejecting channel {}", temporary_channel_id), +/// Err(e) => println!("Error rejecting channel {}: {:?}", temporary_channel_id, e), +/// } +/// return Ok(()); /// } -/// return; -/// } /// -/// let user_channel_id = 43; -/// match channel_manager.accept_inbound_channel( -/// &temporary_channel_id, &counterparty_node_id, user_channel_id -/// ) { -/// Ok(()) => println!("Accepting channel {}", temporary_channel_id), -/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e), -/// } -/// }, -/// // ... -/// # _ => {}, +/// let user_channel_id = 43; +/// match channel_manager.accept_inbound_channel( +/// &temporary_channel_id, &counterparty_node_id, user_channel_id +/// ) { +/// Ok(()) => println!("Accepting channel {}", temporary_channel_id), +/// Err(e) => println!("Error accepting channel {}: {:?}", temporary_channel_id, e), +/// } +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1412,13 +1503,16 @@ where /// } /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::ChannelClosed { channel_id, user_channel_id, .. } => { -/// assert_eq!(user_channel_id, 42); -/// println!("Channel {} closed", channel_id); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::ChannelClosed { channel_id, user_channel_id, .. } => { +/// assert_eq!(user_channel_id, 42); +/// println!("Channel {} closed", channel_id); +/// }, +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1468,30 +1562,33 @@ where /// }; /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(payment_preimage), .. } => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// }, +/// PaymentPurpose::SpontaneousPayment(payment_preimage) => { +/// assert_ne!(payment_hash, known_payment_hash); +/// println!("Claiming spontaneous payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// // ... +/// # _ => {}, /// }, -/// PaymentPurpose::Bolt11InvoicePayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); -/// }, -/// PaymentPurpose::SpontaneousPayment(payment_preimage) => { -/// assert_ne!(payment_hash, known_payment_hash); -/// println!("Claiming spontaneous payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); +/// Event::PaymentClaimed { payment_hash, amount_msat, .. 
} => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claimed {} msats", amount_msat); /// }, /// // ... -/// # _ => {}, -/// }, -/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claimed {} msats", amount_msat); -/// }, -/// // ... -/// # _ => {}, +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1534,11 +1631,14 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash), -/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_hash, .. } => println!("Paid {}", payment_hash), +/// Event::PaymentFailed { payment_hash, .. } => println!("Failed paying {}", payment_hash), +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1572,23 +1672,25 @@ where /// let bech32_offer = offer.to_string(); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => { -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(payment_preimage), .. } => { +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// } +/// # _ => {}, /// }, -/// PaymentPurpose::Bolt12OfferPayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); +/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { +/// println!("Claimed {} msats", amount_msat); /// }, /// // ... -/// # _ => {}, -/// }, -/// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { -/// println!("Claimed {} msats", amount_msat); -/// }, -/// // ... -/// # _ => {}, +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # Ok(()) /// # } @@ -1634,12 +1736,15 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), -/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), +/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// Event::InvoiceRequestFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// // ... 
+/// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -1694,11 +1799,14 @@ where /// ); /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), -/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentSent { payment_id: Some(payment_id), .. } => println!("Paid {}", payment_id), +/// Event::PaymentFailed { payment_id, .. } => println!("Failed paying {}", payment_id), +/// // ... +/// # _ => {}, +/// } +/// Ok(()) /// }); /// # Ok(()) /// # } @@ -1724,18 +1832,19 @@ where /// }; /// /// // On the event processing thread -/// channel_manager.process_pending_events(&|event| match event { -/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { -/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => { -/// assert_eq!(payment_hash, known_payment_hash); -/// println!("Claiming payment {}", payment_hash); -/// channel_manager.claim_funds(payment_preimage); -/// }, -/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => { -/// println!("Unknown payment hash: {}", payment_hash); -/// }, -/// // ... -/// # _ => {}, +/// channel_manager.process_pending_events(&|event| { +/// match event { +/// Event::PaymentClaimable { payment_hash, purpose, .. } => match purpose { +/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(payment_preimage), .. } => { +/// assert_eq!(payment_hash, known_payment_hash); +/// println!("Claiming payment {}", payment_hash); +/// channel_manager.claim_funds(payment_preimage); +/// }, +/// PaymentPurpose::Bolt12RefundPayment { payment_preimage: None, .. } => { +/// println!("Unknown payment hash: {}", payment_hash); +/// }, +/// // ... +/// # _ => {}, /// }, /// Event::PaymentClaimed { payment_hash, amount_msat, .. } => { /// assert_eq!(payment_hash, known_payment_hash); @@ -1743,6 +1852,8 @@ where /// }, /// // ... /// # _ => {}, +/// } +/// Ok(()) /// }); /// # } /// ``` @@ -2746,8 +2857,9 @@ macro_rules! handle_new_monitor_update { macro_rules! process_events_body { ($self: expr, $event_to_handle: expr, $handle_event: expr) => { + let mut handling_failed = false; let mut processed_all_events = false; - while !processed_all_events { + while !handling_failed && !processed_all_events { if $self.pending_events_processor.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed).is_err() { return; } @@ -2771,24 +2883,34 @@ macro_rules! process_events_body { } let pending_events = $self.pending_events.lock().unwrap().clone(); - let num_events = pending_events.len(); if !pending_events.is_empty() { result = NotifyOption::DoPersist; } let mut post_event_actions = Vec::new(); + let mut num_handled_events = 0; for (event, action_opt) in pending_events { $event_to_handle = event; - $handle_event; - if let Some(action) = action_opt { - post_event_actions.push(action); + match $handle_event { + Ok(()) => { + if let Some(action) = action_opt { + post_event_actions.push(action); + } + num_handled_events += 1; + } + Err(_e) => { + // If we encounter an error we stop handling events and make sure to replay + // any unhandled events on the next invocation. 
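+							// Only the first `num_handled_events` entries are drained from
+							// `pending_events` below, so this event and all later ones are
+							// redelivered on the next call.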
+ handling_failed = true; + break; + } } } { let mut pending_events = $self.pending_events.lock().unwrap(); - pending_events.drain(..num_events); + pending_events.drain(..num_handled_events); processed_all_events = pending_events.is_empty(); // Note that `push_pending_forwards_ev` relies on `pending_events_processor` being // updated here with the `pending_events` lock acquired. @@ -2989,7 +3111,7 @@ where let config = if override_config.is_some() { override_config.as_ref().unwrap() } else { &self.default_configuration }; match OutboundV1Channel::new(&self.fee_estimator, &self.entropy_source, &self.signer_provider, their_network_key, their_features, channel_value_satoshis, push_msat, user_channel_id, config, - self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id) + self.best_block.read().unwrap().height, outbound_scid_alias, temporary_channel_id, &*self.logger) { Ok(res) => res, Err(e) => { @@ -3505,7 +3627,7 @@ where // peer has been disabled for some time), return `channel_disabled`, // otherwise return `temporary_channel_failure`. let chan_update_opt = self.get_channel_update_for_onion(next_packet.outgoing_scid, chan).ok(); - if chan_update_opt.as_ref().map(|u| u.contents.flags & 2 == 2).unwrap_or(false) { + if chan_update_opt.as_ref().map(|u| u.contents.channel_flags & 2 == 2).unwrap_or(false) { return Err(("Forwarding channel has been disconnected for some time.", 0x1000 | 20, chan_update_opt)); } else { return Err(("Forwarding channel is not in a ready state.", 0x1000 | 7, chan_update_opt)); @@ -3784,7 +3906,8 @@ where chain_hash: self.chain_hash, short_channel_id, timestamp: chan.context.get_update_time_counter(), - flags: (!were_node_one) as u8 | ((!enabled as u8) << 1), + message_flags: 1, // Only must_be_one + channel_flags: (!were_node_one) as u8 | ((!enabled as u8) << 1), cltv_expiry_delta: chan.context.get_cltv_expiry_delta(), htlc_minimum_msat: chan.context.get_counterparty_htlc_minimum_msat(), htlc_maximum_msat: chan.context.get_announced_htlc_max_msat(), @@ -4031,8 +4154,8 @@ where self.pending_outbound_payments .send_payment_for_bolt12_invoice( invoice, payment_id, &self.router, self.list_usable_channels(), - || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, - best_block_height, &self.logger, &self.pending_events, + || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, &self, + &self.secp_ctx, best_block_height, &self.logger, &self.pending_events, |args| self.send_payment_along_path(args) ) } @@ -4692,6 +4815,7 @@ where let mut per_source_pending_forward = [( payment.prev_short_channel_id, + payment.prev_counterparty_node_id, payment.prev_funding_outpoint, payment.prev_channel_id, payment.prev_user_channel_id, @@ -4722,6 +4846,7 @@ where user_channel_id: Some(payment.prev_user_channel_id), outpoint: payment.prev_funding_outpoint, channel_id: payment.prev_channel_id, + counterparty_node_id: payment.prev_counterparty_node_id, htlc_id: payment.prev_htlc_id, incoming_packet_shared_secret: payment.forward_info.incoming_shared_secret, phantom_shared_secret: None, @@ -4851,8 +4976,10 @@ where // Process all of the forwards and failures for the channel in which the HTLCs were // proposed to as a batch. 
- let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id, - incoming_user_channel_id, htlc_forwards.drain(..).collect()); + let pending_forwards = ( + incoming_scid, Some(incoming_counterparty_node_id), incoming_funding_txo, + incoming_channel_id, incoming_user_channel_id, htlc_forwards.drain(..).collect() + ); self.forward_htlcs_without_forward_event(&mut [pending_forwards]); for (htlc_fail, htlc_destination) in htlc_fails.drain(..) { let failure = match htlc_fail { @@ -4886,7 +5013,7 @@ where let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); - let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); + let mut phantom_receives: Vec<(u64, Option, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new(); { let mut forward_htlcs = new_hash_map(); mem::swap(&mut forward_htlcs, &mut self.forward_htlcs.lock().unwrap()); @@ -4895,12 +5022,12 @@ where if short_chan_id != 0 { let mut forwarding_counterparty = None; macro_rules! forwarding_channel_not_found { - () => { - for forward_info in pending_forwards.drain(..) { + ($forward_infos: expr) => { + for forward_info in $forward_infos { match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, - prev_user_channel_id, forward_info: PendingHTLCInfo { + prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo { routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, .. } @@ -4915,6 +5042,7 @@ where user_channel_id: Some(prev_user_channel_id), channel_id: prev_channel_id, outpoint: prev_funding_outpoint, + counterparty_node_id: prev_counterparty_node_id, htlc_id: prev_htlc_id, incoming_packet_shared_secret: incoming_shared_secret, phantom_shared_secret: $phantom_ss, @@ -4977,7 +5105,10 @@ where outgoing_cltv_value, Some(phantom_shared_secret), false, None, current_height, self.default_configuration.accept_mpp_keysend) { - Ok(info) => phantom_receives.push((prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)])), + Ok(info) => phantom_receives.push(( + prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint, + prev_channel_id, prev_user_channel_id, vec![(info, prev_htlc_id)] + )), Err(InboundHTLCErr { err_code, err_data, msg }) => failed_payment!(msg, err_code, err_data, Some(phantom_shared_secret)) } }, @@ -5004,7 +5135,7 @@ where let (counterparty_node_id, forward_chan_id) = match chan_info_opt { Some((cp_id, chan_id)) => (cp_id, chan_id), None => { - forwarding_channel_not_found!(); + forwarding_channel_not_found!(pending_forwards.drain(..)); continue; } }; @@ -5012,103 +5143,156 @@ where let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id); if peer_state_mutex_opt.is_none() { - forwarding_channel_not_found!(); + forwarding_channel_not_found!(pending_forwards.drain(..)); continue; } let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap(); let peer_state = &mut *peer_state_lock; - if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { - let logger = WithChannelContext::from(&self.logger, &chan.context, None); - for forward_info in pending_forwards.drain(..) 
{ - let queue_fail_htlc_res = match forward_info { - HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { - prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, - prev_user_channel_id, forward_info: PendingHTLCInfo { - incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, - routing: PendingHTLCRouting::Forward { - onion_packet, blinded, .. - }, skimmed_fee_msat, .. + let mut draining_pending_forwards = pending_forwards.drain(..); + while let Some(forward_info) = draining_pending_forwards.next() { + let queue_fail_htlc_res = match forward_info { + HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { + prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, + prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo { + incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value, + routing: PendingHTLCRouting::Forward { + ref onion_packet, blinded, .. + }, skimmed_fee_msat, .. + }, + }) => { + let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { + short_channel_id: prev_short_channel_id, + user_channel_id: Some(prev_user_channel_id), + counterparty_node_id: prev_counterparty_node_id, + channel_id: prev_channel_id, + outpoint: prev_funding_outpoint, + htlc_id: prev_htlc_id, + incoming_packet_shared_secret: incoming_shared_secret, + // Phantom payments are only PendingHTLCRouting::Receive. + phantom_shared_secret: None, + blinded_failure: blinded.map(|b| b.failure), + }); + let next_blinding_point = blinded.and_then(|b| { + let encrypted_tlvs_ss = self.node_signer.ecdh( + Recipient::Node, &b.inbound_blinding_point, None + ).unwrap().secret_bytes(); + onion_utils::next_hop_pubkey( + &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss + ).ok() + }); + + // Forward the HTLC over the most appropriate channel with the corresponding peer, + // applying non-strict forwarding. + // The channel with the least amount of outbound liquidity will be used to maximize the + // probability of being able to successfully forward a subsequent HTLC. + let maybe_optimal_channel = peer_state.channel_by_id.values_mut().filter_map(|phase| match phase { + ChannelPhase::Funded(chan) => { + let balances = chan.context.get_available_balances(&self.fee_estimator); + if outgoing_amt_msat <= balances.next_outbound_htlc_limit_msat && + outgoing_amt_msat >= balances.next_outbound_htlc_minimum_msat && + chan.context.is_usable() { + Some((chan, balances)) + } else { + None + } }, - }) => { - let logger = WithChannelContext::from(&self.logger, &chan.context, Some(payment_hash)); - log_trace!(logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id); - let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData { - short_channel_id: prev_short_channel_id, - user_channel_id: Some(prev_user_channel_id), - channel_id: prev_channel_id, - outpoint: prev_funding_outpoint, - htlc_id: prev_htlc_id, - incoming_packet_shared_secret: incoming_shared_secret, - // Phantom payments are only PendingHTLCRouting::Receive. 
- phantom_shared_secret: None, - blinded_failure: blinded.map(|b| b.failure), - }); - let next_blinding_point = blinded.and_then(|b| { - let encrypted_tlvs_ss = self.node_signer.ecdh( - Recipient::Node, &b.inbound_blinding_point, None - ).unwrap().secret_bytes(); - onion_utils::next_hop_pubkey( - &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss - ).ok() - }); - if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat, - payment_hash, outgoing_cltv_value, htlc_source.clone(), - onion_packet, skimmed_fee_msat, next_blinding_point, &self.fee_estimator, - &&logger) - { - if let ChannelError::Ignore(msg) = e { - log_trace!(logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg); + _ => None, + }).min_by_key(|(_, balances)| balances.next_outbound_htlc_limit_msat).map(|(c, _)| c); + let optimal_channel = match maybe_optimal_channel { + Some(chan) => chan, + None => { + // Fall back to the specified channel to return an appropriate error. + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { + chan } else { - panic!("Stated return value requirements in send_htlc() were not met"); + forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + break; } + } + }; + + let logger = WithChannelContext::from(&self.logger, &optimal_channel.context, Some(payment_hash)); + let channel_description = if optimal_channel.context.get_short_channel_id() == Some(short_chan_id) { + "specified" + } else { + "alternate" + }; + log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}", + prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id); + if let Err(e) = optimal_channel.queue_add_htlc(outgoing_amt_msat, + payment_hash, outgoing_cltv_value, htlc_source.clone(), + onion_packet.clone(), skimmed_fee_msat, next_blinding_point, &self.fee_estimator, + &&logger) + { + if let ChannelError::Ignore(msg) = e { + log_trace!(logger, "Failed to forward HTLC with payment_hash {} to peer {}: {}", &payment_hash, &counterparty_node_id, msg); + } else { + panic!("Stated return value requirements in send_htlc() were not met"); + } + + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan); failed_forwards.push((htlc_source, payment_hash, HTLCFailReason::reason(failure_code, data), HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id } )); - continue; + } else { + forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + break; } - None - }, - HTLCForwardInfo::AddHTLC { .. } => { - panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); - }, - HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => { + } + None + }, + HTLCForwardInfo::AddHTLC { .. 
} => { + panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward"); + }, + HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => { + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { + let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); - Some((chan.queue_fail_htlc(htlc_id, err_packet, &&logger), htlc_id)) - }, - HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id)) + } else { + forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + break; + } + }, + HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => { + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { + let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failing malformed HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id); let res = chan.queue_fail_malformed_htlc( htlc_id, failure_code, sha256_of_onion, &&logger ); Some((res, htlc_id)) - }, - }; - if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res { - if let Err(e) = queue_fail_htlc_res { - if let ChannelError::Ignore(msg) = e { + } else { + forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards)); + break; + } + }, + }; + if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res { + if let Err(e) = queue_fail_htlc_res { + if let ChannelError::Ignore(msg) = e { + if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) { + let logger = WithChannelContext::from(&self.logger, &chan.context, None); log_trace!(logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg); - } else { - panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met"); } - // fail-backs are best-effort, we probably already have one - // pending, and if not that's OK, if not, the channel is on - // the chain and sending the HTLC-Timeout is their problem. - continue; + } else { + panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met"); } + // fail-backs are best-effort, we probably already have one + // pending, and if not that's OK, if not, the channel is on + // the chain and sending the HTLC-Timeout is their problem. + continue; } } - } else { - forwarding_channel_not_found!(); - continue; } } else { 'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) { match forward_info { HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo { prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint, - prev_user_channel_id, forward_info: PendingHTLCInfo { + prev_user_channel_id, prev_counterparty_node_id, forward_info: PendingHTLCInfo { routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, skimmed_fee_msat, .. 
} @@ -5146,6 +5330,7 @@ where prev_hop: HTLCPreviousHopData { short_channel_id: prev_short_channel_id, user_channel_id: Some(prev_user_channel_id), + counterparty_node_id: prev_counterparty_node_id, channel_id: prev_channel_id, outpoint: prev_funding_outpoint, htlc_id: prev_htlc_id, @@ -5178,6 +5363,7 @@ where failed_forwards.push((HTLCSource::PreviousHopData(HTLCPreviousHopData { short_channel_id: $htlc.prev_hop.short_channel_id, user_channel_id: $htlc.prev_hop.user_channel_id, + counterparty_node_id: $htlc.prev_hop.counterparty_node_id, channel_id: prev_channel_id, outpoint: prev_funding_outpoint, htlc_id: $htlc.prev_hop.htlc_id, @@ -6063,7 +6249,7 @@ where let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); - let mut sources = { + let sources = { let mut claimable_payments = self.claimable_payments.lock().unwrap(); if let Some(payment) = claimable_payments.claimable_payments.remove(&payment_hash) { let mut receiver_node_id = self.our_network_pubkey; @@ -6158,18 +6344,46 @@ where return; } if valid_mpp { - for htlc in sources.drain(..) { + let pending_mpp_claim_ptr_opt = if sources.len() > 1 { + let channels_without_preimage = sources.iter().filter_map(|htlc| { + if let Some(cp_id) = htlc.prev_hop.counterparty_node_id { + let prev_hop = &htlc.prev_hop; + Some((cp_id, prev_hop.outpoint, prev_hop.channel_id, prev_hop.htlc_id)) + } else { + None + } + }).collect(); + Some(Arc::new(Mutex::new(PendingMPPClaim { + channels_without_preimage, + channels_with_preimage: Vec::new(), + }))) + } else { + None + }; + for htlc in sources { + let this_mpp_claim = pending_mpp_claim_ptr_opt.as_ref().and_then(|pending_mpp_claim| + if let Some(cp_id) = htlc.prev_hop.counterparty_node_id { + let claim_ptr = PendingMPPClaimPointer(Arc::clone(pending_mpp_claim)); + Some((cp_id, htlc.prev_hop.channel_id, htlc.prev_hop.htlc_id, claim_ptr)) + } else { + None + } + ); + let raa_blocker = pending_mpp_claim_ptr_opt.as_ref().map(|pending_claim| { + RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { + pending_claim: PendingMPPClaimPointer(Arc::clone(pending_claim)), + } + }); self.claim_funds_from_hop( htlc.prev_hop, payment_preimage, |_, definitely_duplicate| { debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment"); - Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash }) + (Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim: this_mpp_claim }), raa_blocker) } ); } - } - if !valid_mpp { - for htlc in sources.drain(..) 
{
+		} else {
+			for htlc in sources {
 				let mut htlc_msat_height_data = htlc.value.to_be_bytes().to_vec();
 				htlc_msat_height_data.extend_from_slice(&self.best_block.read().unwrap().height.to_be_bytes());
 				let source = HTLCSource::PreviousHopData(htlc.prev_hop);
@@ -6187,7 +6401,9 @@ where
 		}
 	}
 
-	fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
+	fn claim_funds_from_hop<
+		ComplFunc: FnOnce(Option<u64>, bool) -> (Option<MonitorUpdateCompletionAction>, Option<RAAMonitorUpdateBlockingAction>)
+	>(
 		&self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 		completion_action: ComplFunc,
 	) {
@@ -6227,11 +6443,15 @@ where
 			match fulfill_res {
 				UpdateFulfillCommitFetch::NewClaim { htlc_value_msat, monitor_update } => {
-					if let Some(action) = completion_action(Some(htlc_value_msat), false) {
+					let (action_opt, raa_blocker_opt) = completion_action(Some(htlc_value_msat), false);
+					if let Some(action) = action_opt {
 						log_trace!(logger, "Tracking monitor update completion action for channel {}: {:?}",
 							chan_id, action);
 						peer_state.monitor_update_blocked_actions.entry(chan_id).or_insert(Vec::new()).push(action);
 					}
+					if let Some(raa_blocker) = raa_blocker_opt {
+						peer_state.actions_blocking_raa_monitor_updates.entry(chan_id).or_insert_with(Vec::new).push(raa_blocker);
+					}
 					if !during_init {
 						handle_new_monitor_update!(self, prev_hop.outpoint, monitor_update, peer_state_lock,
 							peer_state, per_peer_state, chan);
@@ -6249,11 +6469,16 @@ where
 					}
 				}
 				UpdateFulfillCommitFetch::DuplicateClaim {} => {
-					let action = if let Some(action) = completion_action(None, true) {
+					let (action_opt, raa_blocker_opt) = completion_action(None, true);
+					if let Some(raa_blocker) = raa_blocker_opt {
+						debug_assert!(peer_state.actions_blocking_raa_monitor_updates.get(&chan_id).unwrap().contains(&raa_blocker));
+					}
+					let action = if let Some(action) = action_opt {
 						action
 					} else {
 						return;
 					};
+
 					mem::drop(peer_state_lock);
 					log_trace!(logger, "Completing monitor update completion action for channel {} as claim was redundant: {:?}",
@@ -6340,7 +6565,46 @@ where
 		// `ChannelMonitor` we've provided the above update to. Instead, note that `Event`s are
 		// generally always allowed to be duplicative (and it's specifically noted in
 		// `PaymentForwarded`).
-		self.handle_monitor_update_completion_actions(completion_action(None, false));
+		let (action_opt, raa_blocker_opt) = completion_action(None, false);
+
+		if let Some(raa_blocker) = raa_blocker_opt {
+			let counterparty_node_id = prev_hop.counterparty_node_id.or_else(||
+				// prev_hop.counterparty_node_id is always available for payments received after
+				// LDK 0.0.123, but for those received on 0.0.123 and claimed later, we need to
+				// look up the counterparty in the `action_opt`, if possible.
+				action_opt.as_ref().and_then(|action|
+					if let MonitorUpdateCompletionAction::PaymentClaimed { pending_mpp_claim, .. } = action {
+						pending_mpp_claim.as_ref().map(|(node_id, _, _, _)| *node_id)
+					} else { None }
+				)
+			);
+			if let Some(counterparty_node_id) = counterparty_node_id {
+				// TODO: Avoid always blocking the world for the write lock here.
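+				// (A write lock, rather than a read lock, because we may need to insert an
+				// empty `PeerState` for this counterparty below if we have none yet.)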
+ let mut per_peer_state = self.per_peer_state.write().unwrap(); + let peer_state_mutex = per_peer_state.entry(counterparty_node_id).or_insert_with(|| + Mutex::new(PeerState { + channel_by_id: new_hash_map(), + inbound_channel_request_by_id: new_hash_map(), + latest_features: InitFeatures::empty(), + pending_msg_events: Vec::new(), + in_flight_monitor_updates: BTreeMap::new(), + monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), + is_connected: false, + })); + let mut peer_state = peer_state_mutex.lock().unwrap(); + + peer_state.actions_blocking_raa_monitor_updates + .entry(prev_hop.channel_id) + .or_insert_with(Vec::new) + .push(raa_blocker); + } else { + debug_assert!(false, + "RAA ChannelMonitorUpdate blockers are only set with PaymentClaimed completion actions, so we should always have a counterparty node id"); + } + } + + self.handle_monitor_update_completion_actions(action_opt); } fn finalize_claims(&self, sources: Vec) { @@ -6377,7 +6641,12 @@ where |htlc_claim_value_msat, definitely_duplicate| { let chan_to_release = if let Some(node_id) = next_channel_counterparty_node_id { - Some((node_id, next_channel_outpoint, next_channel_id, completed_blocker)) + Some(EventUnblockedChannel { + counterparty_node_id: node_id, + funding_txo: next_channel_outpoint, + channel_id: next_channel_id, + blocking_action: completed_blocker + }) } else { // We can only get `None` here if we are processing a // `ChannelMonitor`-originated event, in which case we @@ -6434,16 +6703,16 @@ where } }), "{:?}", *background_events); } - None + (None, None) } else if definitely_duplicate { if let Some(other_chan) = chan_to_release { - Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately { - downstream_counterparty_node_id: other_chan.0, - downstream_funding_outpoint: other_chan.1, - downstream_channel_id: other_chan.2, - blocking_action: other_chan.3, - }) - } else { None } + (Some(MonitorUpdateCompletionAction::FreeOtherChannelImmediately { + downstream_counterparty_node_id: other_chan.counterparty_node_id, + downstream_funding_outpoint: other_chan.funding_txo, + downstream_channel_id: other_chan.channel_id, + blocking_action: other_chan.blocking_action, + }), None) + } else { (None, None) } } else { let total_fee_earned_msat = if let Some(forwarded_htlc_value) = forwarded_htlc_value_msat { if let Some(claimed_htlc_value) = htlc_claim_value_msat { @@ -6452,7 +6721,7 @@ where } else { None }; debug_assert!(skimmed_fee_msat <= total_fee_earned_msat, "skimmed_fee_msat must always be included in total_fee_earned_msat"); - Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + (Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { event: events::Event::PaymentForwarded { prev_channel_id: Some(prev_channel_id), next_channel_id: Some(next_channel_id), @@ -6464,7 +6733,7 @@ where outbound_amount_forwarded_msat: forwarded_htlc_value_msat, }, downstream_counterparty_and_funding_outpoint: chan_to_release, - }) + }), None) } }); }, @@ -6481,9 +6750,44 @@ where debug_assert_ne!(self.claimable_payments.held_by_thread(), LockHeldState::HeldByThread); debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread); + let mut freed_channels = Vec::new(); + for action in actions.into_iter() { match action { - MonitorUpdateCompletionAction::PaymentClaimed { payment_hash } => { + MonitorUpdateCompletionAction::PaymentClaimed { payment_hash, pending_mpp_claim } => { + if let Some((counterparty_node_id, chan_id, htlc_id, 
claim_ptr)) = pending_mpp_claim {
+						let per_peer_state = self.per_peer_state.read().unwrap();
+						per_peer_state.get(&counterparty_node_id).map(|peer_state_mutex| {
+							let mut peer_state = peer_state_mutex.lock().unwrap();
+							let blockers_entry = peer_state.actions_blocking_raa_monitor_updates.entry(chan_id);
+							if let btree_map::Entry::Occupied(mut blockers) = blockers_entry {
+								blockers.get_mut().retain(|blocker|
+									if let &RAAMonitorUpdateBlockingAction::ClaimedMPPPayment { pending_claim } = &blocker {
+										if *pending_claim == claim_ptr {
+											let mut pending_claim_state_lock = pending_claim.0.lock().unwrap();
+											let pending_claim_state = &mut *pending_claim_state_lock;
+											pending_claim_state.channels_without_preimage.retain(|(cp, outp, cid, hid)| {
+												if *cp == counterparty_node_id && *cid == chan_id && *hid == htlc_id {
+													pending_claim_state.channels_with_preimage.push((*cp, *outp, *cid));
+													false
+												} else { true }
+											});
+											if pending_claim_state.channels_without_preimage.is_empty() {
+												for (cp, outp, cid) in pending_claim_state.channels_with_preimage.iter() {
+													freed_channels.push((*cp, *outp, *cid, blocker.clone()));
+												}
+											}
+											!pending_claim_state.channels_without_preimage.is_empty()
+										} else { true }
+									} else { true }
+								);
+								if blockers.get().is_empty() {
+									blockers.remove();
+								}
+							}
+						});
+					}
+
 					let payment = self.claimable_payments.lock().unwrap().pending_claiming_payments.remove(&payment_hash);
 					if let Some(ClaimingPayment {
 						amount_msat,
@@ -6508,8 +6812,11 @@ where
 						event, downstream_counterparty_and_funding_outpoint
 					} => {
 						self.pending_events.lock().unwrap().push_back((event, None));
-						if let Some((node_id, funding_outpoint, channel_id, blocker)) = downstream_counterparty_and_funding_outpoint {
-							self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+						if let Some(unblocked) = downstream_counterparty_and_funding_outpoint {
+							self.handle_monitor_update_release(
+								unblocked.counterparty_node_id, unblocked.funding_txo,
+								unblocked.channel_id, Some(unblocked.blocking_action),
+							);
 						}
 					},
 					MonitorUpdateCompletionAction::FreeOtherChannelImmediately {
@@ -6524,6 +6831,10 @@ where
 				},
 			}
 		}
+
+		for (node_id, funding_outpoint, channel_id, blocker) in freed_channels {
+			self.handle_monitor_update_release(node_id, funding_outpoint, channel_id, Some(blocker));
+		}
 	}
 
 	/// Handles a channel reentering a functional state, either due to reconnect or a monitor
@@ -6534,7 +6845,7 @@ where
 		pending_forwards: Vec<(PendingHTLCInfo, u64)>, pending_update_adds: Vec<msgs::UpdateAddHTLC>,
 		funding_broadcastable: Option<Transaction>,
 		channel_ready: Option<msgs::ChannelReady>, announcement_sigs: Option<msgs::AnnouncementSignatures>)
-	-> (Option<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
+	-> (Option<(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)>, Option<(u64, Vec<msgs::UpdateAddHTLC>)>) {
 		let logger = WithChannelContext::from(&self.logger, &channel.context, None);
 		log_trace!(logger, "Handling channel resumption for channel {} with {} RAA, {} commitment update, {} pending forwards, {} pending update_add_htlcs, {}broadcasting funding, {} channel ready, {} announcement",
 			&channel.context.channel_id(),
@@ -6550,8 +6861,11 @@ where
 		let mut htlc_forwards = None;
 		if !pending_forwards.is_empty() {
-			htlc_forwards = Some((short_channel_id, channel.context.get_funding_txo().unwrap(),
-				channel.context.channel_id(), channel.context.get_user_id(), pending_forwards));
+			htlc_forwards = Some((
+				short_channel_id, Some(channel.context.get_counterparty_node_id()),
+				channel.context.get_funding_txo().unwrap(), channel.context.channel_id(),
+				channel.context.get_user_id(), pending_forwards
+			));
 		}
 		let mut decode_update_add_htlcs = None;
 		if !pending_update_adds.is_empty() {
@@ -7594,15 +7908,15 @@ where
 	}
 
 	#[inline]
-	fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
+	fn forward_htlcs(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) {
 		let push_forward_event = self.forward_htlcs_without_forward_event(per_source_pending_forwards);
 		if push_forward_event { self.push_pending_forwards_ev() }
 	}
 
 	#[inline]
-	fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
+	fn forward_htlcs_without_forward_event(&self, per_source_pending_forwards: &mut [(u64, Option<PublicKey>, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)]) -> bool {
 		let mut push_forward_event = false;
-		for &mut (prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
+		for &mut (prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint, prev_channel_id, prev_user_channel_id, ref mut pending_forwards) in per_source_pending_forwards {
 			let mut new_intercept_events = VecDeque::new();
 			let mut failed_intercept_forwards = Vec::new();
 			if !pending_forwards.is_empty() {
@@ -7621,7 +7935,9 @@ where
 				match forward_htlcs.entry(scid) {
 					hash_map::Entry::Occupied(mut entry) => {
 						entry.get_mut().push(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-							prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info }));
+							prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+							prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+						}));
 					},
 					hash_map::Entry::Vacant(entry) => {
 						if !is_our_scid && forward_info.incoming_amt_msat.is_some() &&
@@ -7639,7 +7955,9 @@ where
 									intercept_id
 								}, None));
 								entry.insert(PendingAddHTLCInfo {
-									prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info });
+									prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+									prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+								});
 							},
 							hash_map::Entry::Occupied(_) => {
 								let logger = WithContext::from(&self.logger, None, Some(prev_channel_id), Some(forward_info.payment_hash));
 								let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
 									short_channel_id: prev_short_channel_id,
 									user_channel_id: Some(prev_user_channel_id),
+									counterparty_node_id: prev_counterparty_node_id,
 									outpoint: prev_funding_outpoint,
 									channel_id: prev_channel_id,
 									htlc_id: prev_htlc_id,
@@ -7666,7 +7985,9 @@ where
 							// payments are being processed.
 							push_forward_event |= forward_htlcs_empty && decode_update_add_htlcs_empty;
 							entry.insert(vec!(HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
-								prev_short_channel_id, prev_funding_outpoint, prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info })));
+								prev_short_channel_id, prev_counterparty_node_id, prev_funding_outpoint,
+								prev_channel_id, prev_htlc_id, prev_user_channel_id, forward_info
+							})));
 						}
 					}
 				}
@@ -7868,7 +8189,7 @@ where
 			return Err(MsgHandleErrInternal::send_err_msg_no_close("Got a channel_update for a channel from the wrong node - it shouldn't know about our private channels!".to_owned(), chan_id));
 		}
 		let were_node_one = self.get_our_node_id().serialize()[..]
< chan.context.get_counterparty_node_id().serialize()[..]; - let msg_from_node_one = msg.contents.flags & 1 == 0; + let msg_from_node_one = msg.contents.channel_flags & 1 == 0; if were_node_one == msg_from_node_one { return Ok(NotifyOption::SkipPersistNoEvents); } else { @@ -8143,11 +8464,26 @@ where match phase { ChannelPhase::Funded(chan) => { let msgs = chan.signer_maybe_unblocked(&self.logger); - if let Some(updates) = msgs.commitment_update { - pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs { - node_id, - updates, - }); + let cu_msg = msgs.commitment_update.map(|updates| events::MessageSendEvent::UpdateHTLCs { + node_id, + updates, + }); + let raa_msg = msgs.revoke_and_ack.map(|msg| events::MessageSendEvent::SendRevokeAndACK { + node_id, + msg, + }); + match (cu_msg, raa_msg) { + (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::CommitmentFirst => { + pending_msg_events.push(cu); + pending_msg_events.push(raa); + }, + (Some(cu), Some(raa)) if msgs.order == RAACommitmentOrder::RevokeAndACKFirst => { + pending_msg_events.push(raa); + pending_msg_events.push(cu); + }, + (Some(cu), _) => pending_msg_events.push(cu), + (_, Some(raa)) => pending_msg_events.push(raa), + (_, _) => {}, } if let Some(msg) = msgs.funding_signed { pending_msg_events.push(events::MessageSendEvent::SendFundingSigned { @@ -8322,8 +8658,10 @@ macro_rules! create_offer_builder { ($self: ident, $builder: ty) => { let entropy = &*$self.entropy_source; let secp_ctx = &$self.secp_ctx; - let path = $self.create_blinded_path_using_absolute_expiry(absolute_expiry) + let path = $self.create_blinded_paths_using_absolute_expiry(OffersContext::Unknown {}, absolute_expiry) + .and_then(|paths| paths.into_iter().next().ok_or(())) .map_err(|_| Bolt12SemanticError::MissingPaths)?; + let builder = OfferBuilder::deriving_signing_pubkey( node_id, expanded_key, entropy, secp_ctx ) @@ -8394,8 +8732,11 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => { let entropy = &*$self.entropy_source; let secp_ctx = &$self.secp_ctx; - let path = $self.create_blinded_path_using_absolute_expiry(Some(absolute_expiry)) + let context = OffersContext::OutboundPayment { payment_id }; + let path = $self.create_blinded_paths_using_absolute_expiry(context, Some(absolute_expiry)) + .and_then(|paths| paths.into_iter().next().ok_or(())) .map_err(|_| Bolt12SemanticError::MissingPaths)?; + let builder = RefundBuilder::deriving_payer_id( node_id, expanded_key, entropy, secp_ctx, amount_msats, payment_id )? @@ -8416,6 +8757,13 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => { } } } +/// Defines the maximum number of [`OffersMessage`] including different reply paths to be sent +/// along different paths. +/// Sending multiple requests increases the chances of successful delivery in case some +/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid, +/// even if multiple invoices are received. 
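+///
+/// Beyond this limit, remaining (message path, reply path) combinations are dropped, as seen
+/// in the `.take(OFFERS_MESSAGE_REQUEST_LIMIT)` calls below.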
+/// Defines the maximum number of [`OffersMessage`] to send, each paired with a distinct reply +/// path and sent along a different path. +/// Sending multiple requests increases the chances of successful delivery in case some +/// paths are unavailable. However, only one invoice for a given [`PaymentId`] will be paid, +/// even if multiple invoices are received. +const OFFERS_MESSAGE_REQUEST_LIMIT: usize = 10; + impl<M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref> ChannelManager<M, T, ES, NS, SP, F, R, L> where M::Target: chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>, @@ -8517,7 +8865,9 @@ where Some(payer_note) => builder.payer_note(payer_note), }; let invoice_request = builder.build_and_sign()?; - let reply_path = self.create_blinded_path().map_err(|_| Bolt12SemanticError::MissingPaths)?; + + let context = OffersContext::OutboundPayment { payment_id }; + let reply_paths = self.create_blinded_paths(context).map_err(|_| Bolt12SemanticError::MissingPaths)?; let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); @@ -8530,25 +8880,27 @@ let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap(); if !offer.paths().is_empty() { - // Send as many invoice requests as there are paths in the offer (with an upper bound). - // Using only one path could result in a failure if the path no longer exists. But only - // one invoice for a given payment id will be paid, even if more than one is received. - const REQUEST_LIMIT: usize = 10; - for path in offer.paths().into_iter().take(REQUEST_LIMIT) { + reply_paths + .iter() + .flat_map(|reply_path| offer.paths().iter().map(move |path| (path, reply_path))) + .take(OFFERS_MESSAGE_REQUEST_LIMIT) + .for_each(|(path, reply_path)| { + let message = new_pending_onion_message( + OffersMessage::InvoiceRequest(invoice_request.clone()), + Destination::BlindedPath(path.clone()), + Some(reply_path.clone()), + ); + pending_offers_messages.push(message); + }); + } else if let Some(signing_pubkey) = offer.signing_pubkey() { + for reply_path in reply_paths { let message = new_pending_onion_message( OffersMessage::InvoiceRequest(invoice_request.clone()), - Destination::BlindedPath(path.clone()), - Some(reply_path.clone()), + Destination::Node(signing_pubkey), + Some(reply_path), ); pending_offers_messages.push(message); } - } else if let Some(signing_pubkey) = offer.signing_pubkey() { - let message = new_pending_onion_message( - OffersMessage::InvoiceRequest(invoice_request), - Destination::Node(signing_pubkey), - Some(reply_path), - ); - pending_offers_messages.push(message); } else { debug_assert!(false); return Err(Bolt12SemanticError::MissingSigningPubkey); @@ -8617,26 +8969,32 @@ where )?; let builder: InvoiceBuilder<DerivedSigningPubkey> = builder.into(); let invoice = builder.allow_mpp().build_and_sign(secp_ctx)?; - let reply_path = self.create_blinded_path() + let reply_paths = self.create_blinded_paths(OffersContext::Unknown {}) .map_err(|_| Bolt12SemanticError::MissingPaths)?; let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap(); if refund.paths().is_empty() { - let message = new_pending_onion_message( - OffersMessage::Invoice(invoice.clone()), - Destination::Node(refund.payer_id()), - Some(reply_path), - ); - pending_offers_messages.push(message); - } else { - for path in refund.paths() { + for reply_path in reply_paths { let message = new_pending_onion_message( OffersMessage::Invoice(invoice.clone()), - Destination::BlindedPath(path.clone()), - Some(reply_path.clone()), + Destination::Node(refund.payer_id()), + Some(reply_path), ); pending_offers_messages.push(message); } + } else { + reply_paths + .iter() + .flat_map(|reply_path| refund.paths().iter().map(move |path| (path, reply_path))) + .take(OFFERS_MESSAGE_REQUEST_LIMIT) + .for_each(|(path, reply_path)| { + let message = new_pending_onion_message( + OffersMessage::Invoice(invoice.clone()), + Destination::BlindedPath(path.clone()), + Some(reply_path.clone()), + ); + pending_offers_messages.push(message); + }); } Ok(invoice)
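The fan-out above pairs every destination path with every reply path and truncates at `OFFERS_MESSAGE_REQUEST_LIMIT`, so delivery no longer hinges on a single path surviving. The iterator shape, reduced to plain strings (illustrative only, not the LDK API):

const REQUEST_LIMIT: usize = 10;

fn main() {
	let offer_paths = ["path_a", "path_b", "path_c"];
	let reply_paths = ["reply_1", "reply_2"];

	// Every (destination, reply) combination, capped at the limit. Note the
	// order: all destination paths are tried with reply_1 before reply_2.
	let pairs: Vec<(&str, &str)> = reply_paths
		.iter()
		.flat_map(|reply| offer_paths.iter().map(move |path| (*path, *reply)))
		.take(REQUEST_LIMIT)
		.collect();

	assert_eq!(pairs.len(), 6); // 3 offer paths x 2 reply paths, under the cap
	assert_eq!(pairs[0], ("path_a", "reply_1"));
}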
@@ -8743,22 +9101,22 @@ where inbound_payment::get_payment_preimage(payment_hash, payment_secret, &self.inbound_payment_key) } - /// Creates a blinded path by delegating to [`MessageRouter`] based on the path's intended - /// lifetime. + /// Creates a collection of blinded paths by delegating to [`MessageRouter`] based on + /// the path's intended lifetime. /// /// Whether or not the path is compact depends on whether the path is short-lived or long-lived, /// respectively, based on the given `absolute_expiry` as seconds since the Unix epoch. See /// [`MAX_SHORT_LIVED_RELATIVE_EXPIRY`]. - fn create_blinded_path_using_absolute_expiry( - &self, absolute_expiry: Option<Duration> - ) -> Result<BlindedPath, ()> { + fn create_blinded_paths_using_absolute_expiry( - &self, context: OffersContext, absolute_expiry: Option<Duration>, + ) -> Result<Vec<BlindedPath>, ()> { let now = self.duration_since_epoch(); let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY); if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry { - self.create_compact_blinded_path() + self.create_compact_blinded_paths(context) } else { - self.create_blinded_path() + self.create_blinded_paths(context) } } @@ -8775,10 +9133,11 @@ where now } - /// Creates a blinded path by delegating to [`MessageRouter::create_blinded_paths`]. + /// Creates a collection of blinded paths by delegating to + /// [`MessageRouter::create_blinded_paths`]. /// - /// Errors if the `MessageRouter` errors or returns an empty `Vec`. - fn create_blinded_path(&self) -> Result<BlindedPath, ()> { + /// Errors if the `MessageRouter` errors. + fn create_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> { let recipient = self.get_our_node_id(); let secp_ctx = &self.secp_ctx; @@ -8791,14 +9150,15 @@ where .collect::<Vec<_>>(); self.router - .create_blinded_paths(recipient, peers, secp_ctx) - .and_then(|paths| paths.into_iter().next().ok_or(())) + .create_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx) + .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(())) } - /// Creates a blinded path by delegating to [`MessageRouter::create_compact_blinded_paths`]. + /// Creates a collection of blinded paths by delegating to + /// [`MessageRouter::create_compact_blinded_paths`]. /// - /// Errors if the `MessageRouter` errors or returns an empty `Vec`. - fn create_compact_blinded_path(&self) -> Result<BlindedPath, ()> { + /// Errors if the `MessageRouter` errors. + fn create_compact_blinded_paths(&self, context: OffersContext) -> Result<Vec<BlindedPath>, ()> { let recipient = self.get_our_node_id(); let secp_ctx = &self.secp_ctx; @@ -8818,8 +9178,8 @@ where .collect::<Vec<_>>(); self.router - .create_compact_blinded_paths(recipient, peers, secp_ctx) - .and_then(|paths| paths.into_iter().next().ok_or(())) + .create_compact_blinded_paths(recipient, MessageContext::Offers(context), peers, secp_ctx) + .and_then(|paths| (!paths.is_empty()).then(|| paths).ok_or(())) } /// Creates multi-hop blinded payment paths for the given `amount_msats` by delegating to @@ -8917,7 +9277,7 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec<Event> { let events = core::cell::RefCell::new(Vec::new()); - let event_handler = |event: events::Event| events.borrow_mut().push(event); + let event_handler = |event: events::Event| Ok(events.borrow_mut().push(event)); self.process_pending_events(&event_handler); events.into_inner() }
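The dispatch above hands short-lived contexts to the compact-path constructor, since compact blinded paths embed short channel ids that go stale if a channel closes, while long-lived (or unbounded) contexts get full pubkey-based paths. A sketch of just that decision; the constant's value and the return strings are illustrative placeholders, not the LDK definitions:

use std::time::Duration;

// Illustrative placeholder; the real MAX_SHORT_LIVED_RELATIVE_EXPIRY is
// defined in channelmanager.rs and may differ.
const MAX_SHORT_LIVED_RELATIVE_EXPIRY: Duration = Duration::from_secs(60 * 60 * 24);

fn choose_path_kind(now: Duration, absolute_expiry: Option<Duration>) -> &'static str {
	let max_short_lived_absolute_expiry = now.saturating_add(MAX_SHORT_LIVED_RELATIVE_EXPIRY);
	// No expiry means the path may be handed out indefinitely: treat it as long-lived.
	if absolute_expiry.unwrap_or(Duration::MAX) <= max_short_lived_absolute_expiry {
		"compact (scid-based, smaller, but breaks if the channel closes)"
	} else {
		"full (pubkey-based, larger, robust to channel closure)"
	}
}

fn main() {
	let now = Duration::from_secs(1_700_000_000);
	assert!(choose_path_kind(now, Some(now + Duration::from_secs(600))).starts_with("compact"));
	assert!(choose_path_kind(now, None).starts_with("full"));
}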
@@ -9024,7 +9384,7 @@ where /// using the given event handler. /// /// See the trait-level documentation of [`EventsProvider`] for requirements. - pub async fn process_pending_events_async<Future: core::future::Future, H: Fn(Event) -> Future>( + pub async fn process_pending_events_async<Future: core::future::Future<Output = Result<(), ReplayEvent>>, H: Fn(Event) -> Future>( &self, handler: H ) { let mut ev; @@ -9455,6 +9815,7 @@ where htlc_id: htlc.prev_htlc_id, incoming_packet_shared_secret: htlc.forward_info.incoming_shared_secret, phantom_shared_secret: None, + counterparty_node_id: htlc.prev_counterparty_node_id, outpoint: htlc.prev_funding_outpoint, channel_id: htlc.prev_channel_id, blinded_failure: htlc.forward_info.routing.blinded_failure(), @@ -9640,7 +10001,7 @@ where } #[cfg(splicing)] - fn handle_splice(&self, counterparty_node_id: &PublicKey, msg: &msgs::Splice) { + fn handle_splice_init(&self, counterparty_node_id: &PublicKey, msg: &msgs::SpliceInit) { let _: Result<(), _> = handle_error!(self, Err(MsgHandleErrInternal::send_err_msg_no_close( "Splicing not supported".to_owned(), msg.channel_id.clone())), *counterparty_node_id); @@ -9847,7 +10208,7 @@ where // Quiescence &events::MessageSendEvent::SendStfu { .. } => false, // Splicing - &events::MessageSendEvent::SendSplice { .. } => false, + &events::MessageSendEvent::SendSpliceInit { .. } => false, &events::MessageSendEvent::SendSpliceAck { .. } => false, &events::MessageSendEvent::SendSpliceLocked { .. } => false, // Interactive Transaction Construction
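With `process_pending_events_async` now requiring `Future<Output = Result<(), ReplayEvent>>` (and the test helper above wrapping its push in `Ok(...)`), a handler that fails can ask for the event to be replayed instead of silently losing it. A sketch of a conforming synchronous handler, with stand-in types for `Event` and `ReplayEvent`:

use std::cell::RefCell;

// Stand-ins for lightning::events::Event and ReplayEvent.
#[derive(Debug, PartialEq)]
struct Event(u32);
struct ReplayEvent();

fn main() {
	let events = RefCell::new(Vec::new());
	// Ok(()) consumes the event; Err(ReplayEvent()) would request redelivery.
	// Vec::push returns (), so wrapping the push in Ok(...) typechecks, which
	// is exactly the shape used by get_and_clear_pending_events above.
	let handler = |event: Event| -> Result<(), ReplayEvent> {
		Ok(events.borrow_mut().push(event))
	};
	assert!(handler(Event(7)).is_ok());
	assert_eq!(*events.borrow(), [Event(7)]);
}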
Error: {}", + err + ); + return ResponseInstruction::NoResponse; }, }, } }, + #[cfg(async_payments)] + OffersMessage::StaticInvoice(_invoice) => { + match responder { + Some(responder) => { + responder.respond(OffersMessage::InvoiceError( + InvoiceError::from_string("Static invoices not yet supported".to_string()) + )) + }, + None => return ResponseInstruction::NoResponse, + } + }, OffersMessage::InvoiceError(invoice_error) => { + abandon_if_payment(context); log_trace!(self.logger, "Received invoice_error: {}", invoice_error); ResponseInstruction::NoResponse }, @@ -10337,6 +10726,31 @@ where } } +impl +AsyncPaymentsMessageHandler for ChannelManager +where + M::Target: chain::Watch<::EcdsaSigner>, + T::Target: BroadcasterInterface, + ES::Target: EntropySource, + NS::Target: NodeSigner, + SP::Target: SignerProvider, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, +{ + fn held_htlc_available( + &self, _message: HeldHtlcAvailable, _responder: Option + ) -> ResponseInstruction { + ResponseInstruction::NoResponse + } + + fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {} + + fn release_pending_messages(&self) -> Vec> { + Vec::new() + } +} + impl NodeIdLookUp for ChannelManager where @@ -10452,7 +10866,7 @@ impl_writeable_tlv_based_enum!(PendingHTLCRouting, (4, payment_data, option), // Added in 0.0.116 (5, custom_tlvs, optional_vec), }, -;); +); impl_writeable_tlv_based!(PendingHTLCInfo, { (0, routing, required), @@ -10532,14 +10946,14 @@ impl Readable for HTLCFailureMsg { } } -impl_writeable_tlv_based_enum!(PendingHTLCStatus, ; +impl_writeable_tlv_based_enum_legacy!(PendingHTLCStatus, ; (0, Forward), (1, Fail), ); impl_writeable_tlv_based_enum!(BlindedFailure, (0, FromIntroductionNode) => {}, - (2, FromBlindedNode) => {}, ; + (2, FromBlindedNode) => {}, ); impl_writeable_tlv_based!(HTLCPreviousHopData, { @@ -10553,6 +10967,7 @@ impl_writeable_tlv_based!(HTLCPreviousHopData, { // Note that by the time we get past the required read for type 2 above, outpoint will be // filled in, so we can safely unwrap it here. (9, channel_id, (default_value, ChannelId::v1_from_funding_outpoint(outpoint.0.unwrap()))), + (11, counterparty_node_id, option), }); impl Writeable for ClaimableHTLC { @@ -10709,6 +11124,7 @@ impl_writeable_tlv_based!(PendingAddHTLCInfo, { // Note that by the time we get past the required read for type 6 above, prev_funding_outpoint will be // filled in, so we can safely unwrap it here. (7, prev_channel_id, (default_value, ChannelId::v1_from_funding_outpoint(prev_funding_outpoint.0.unwrap()))), + (9, prev_counterparty_node_id, option), }); impl Writeable for HTLCForwardInfo { @@ -11986,7 +12402,12 @@ where for action in actions.iter() { if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { downstream_counterparty_and_funding_outpoint: - Some((blocked_node_id, _blocked_channel_outpoint, blocked_channel_id, blocking_action)), .. + Some(EventUnblockedChannel { + counterparty_node_id: blocked_node_id, + funding_txo: _, + channel_id: blocked_channel_id, + blocking_action, + }), .. } = action { if let Some(blocked_peer_state) = per_peer_state.get(blocked_node_id) { log_trace!(logger, @@ -12173,8 +12594,8 @@ mod tests { // update message and would always update the local fee info, even if our peer was // (spuriously) forwarding us our own channel_update. let as_node_one = nodes[0].node.get_our_node_id().serialize()[..] 
@@ -12173,8 +12594,8 @@ mod tests { // update message and would always update the local fee info, even if our peer was // (spuriously) forwarding us our own channel_update. let as_node_one = nodes[0].node.get_our_node_id().serialize()[..] < nodes[1].node.get_our_node_id().serialize()[..]; - let as_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 }; - let bs_update = if as_node_one == (chan.0.contents.flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 }; + let as_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.0 } else { &chan.1 }; + let bs_update = if as_node_one == (chan.0.contents.channel_flags & 1 == 0 /* chan.0 is from node one */) { &chan.1 } else { &chan.0 }; // First deliver each peer's own message, checking that the node doesn't need to be // persisted and that its channel info remains the same.
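The test above keys off the low bit of `channel_flags`, which encodes the sender's direction: 0 means the update came from `node_one` (the lexicographically smaller serialized pubkey), 1 from `node_two`. A self-contained sketch of the same comparison, with raw 33-byte arrays in place of real `PublicKey`s:

fn is_from_node_one(channel_flags: u8) -> bool {
	channel_flags & 1 == 0
}

fn main() {
	let our_key = [2u8; 33]; // stand-in serialized pubkeys
	let their_key = [3u8; 33];
	let were_node_one = our_key[..] < their_key[..];

	// A (spurious) reflection of our own update carries a direction bit that
	// matches our own role, which is how the handler detects and skips it.
	let channel_flags: u8 = 0; // update claims to be from node_one
	assert_eq!(were_node_one, is_from_node_one(channel_flags));
}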