Merge pull request #3125 from valentinewallace/2024-06-async-payments-prefactor
authorvalentinewallace <valentinewallace@users.noreply.github.com>
Mon, 24 Jun 2024 16:06:17 +0000 (12:06 -0400)
committerGitHub <noreply@github.com>
Mon, 24 Jun 2024 16:06:17 +0000 (12:06 -0400)
Async payments message encoding and prefactor

1  2 
lightning/src/ln/channelmanager.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/offers_tests.rs
lightning/src/onion_message/functional_tests.rs
lightning/src/onion_message/messenger.rs

index c2dcf31f2aea5fc5818888f2de0c98a772b3b133,fe3320a136029fa4a4504ccf906c1f8a2642150e..6ba7396ebfe04262904640c6b041d6969e95195f
@@@ -4031,8 -4031,8 +4031,8 @@@ wher
                self.pending_outbound_payments
                        .send_payment_for_bolt12_invoice(
                                invoice, payment_id, &self.router, self.list_usable_channels(),
 -                              || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer,
 -                              best_block_height, &self.logger, &self.pending_events,
 +                              || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, &self,
 +                              &self.secp_ctx, best_block_height, &self.logger, &self.pending_events,
                                |args| self.send_payment_along_path(args)
                        )
        }
                                if short_chan_id != 0 {
                                        let mut forwarding_counterparty = None;
                                        macro_rules! forwarding_channel_not_found {
 -                                              () => {
 -                                                      for forward_info in pending_forwards.drain(..) {
 +                                              ($forward_infos: expr) => {
 +                                                      for forward_info in $forward_infos {
                                                                match forward_info {
                                                                        HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
                                                                                prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
                                        let (counterparty_node_id, forward_chan_id) = match chan_info_opt {
                                                Some((cp_id, chan_id)) => (cp_id, chan_id),
                                                None => {
 -                                                      forwarding_channel_not_found!();
 +                                                      forwarding_channel_not_found!(pending_forwards.drain(..));
                                                        continue;
                                                }
                                        };
                                        let per_peer_state = self.per_peer_state.read().unwrap();
                                        let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
                                        if peer_state_mutex_opt.is_none() {
 -                                              forwarding_channel_not_found!();
 +                                              forwarding_channel_not_found!(pending_forwards.drain(..));
                                                continue;
                                        }
                                        let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
                                        let peer_state = &mut *peer_state_lock;
 -                                      if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
 -                                              let logger = WithChannelContext::from(&self.logger, &chan.context, None);
 -                                              for forward_info in pending_forwards.drain(..) {
 -                                                      let queue_fail_htlc_res = match forward_info {
 -                                                              HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
 -                                                                      prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
 -                                                                      prev_user_channel_id, forward_info: PendingHTLCInfo {
 -                                                                              incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
 -                                                                              routing: PendingHTLCRouting::Forward {
 -                                                                                      onion_packet, blinded, ..
 -                                                                              }, skimmed_fee_msat, ..
 +                                      let mut draining_pending_forwards = pending_forwards.drain(..);
 +                                      while let Some(forward_info) = draining_pending_forwards.next() {
 +                                              let queue_fail_htlc_res = match forward_info {
 +                                                      HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
 +                                                              prev_short_channel_id, prev_htlc_id, prev_channel_id, prev_funding_outpoint,
 +                                                              prev_user_channel_id, forward_info: PendingHTLCInfo {
 +                                                                      incoming_shared_secret, payment_hash, outgoing_amt_msat, outgoing_cltv_value,
 +                                                                      routing: PendingHTLCRouting::Forward {
 +                                                                              ref onion_packet, blinded, ..
 +                                                                      }, skimmed_fee_msat, ..
 +                                                              },
 +                                                      }) => {
 +                                                              let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
 +                                                                      short_channel_id: prev_short_channel_id,
 +                                                                      user_channel_id: Some(prev_user_channel_id),
 +                                                                      channel_id: prev_channel_id,
 +                                                                      outpoint: prev_funding_outpoint,
 +                                                                      htlc_id: prev_htlc_id,
 +                                                                      incoming_packet_shared_secret: incoming_shared_secret,
 +                                                                      // Phantom payments are only PendingHTLCRouting::Receive.
 +                                                                      phantom_shared_secret: None,
 +                                                                      blinded_failure: blinded.map(|b| b.failure),
 +                                                              });
 +                                                              let next_blinding_point = blinded.and_then(|b| {
 +                                                                      let encrypted_tlvs_ss = self.node_signer.ecdh(
 +                                                                              Recipient::Node, &b.inbound_blinding_point, None
 +                                                                      ).unwrap().secret_bytes();
 +                                                                      onion_utils::next_hop_pubkey(
 +                                                                              &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
 +                                                                      ).ok()
 +                                                              });
 +
 +                                                              // Forward the HTLC over the most appropriate channel with the corresponding peer,
 +                                                              // applying non-strict forwarding.
 +                                                              // The channel with the least amount of outbound liquidity will be used to maximize the
 +                                                              // probability of being able to successfully forward a subsequent HTLC.
 +                                                              let maybe_optimal_channel = peer_state.channel_by_id.values_mut().filter_map(|phase| match phase {
 +                                                                      ChannelPhase::Funded(chan) => {
 +                                                                              let balances = chan.context.get_available_balances(&self.fee_estimator);
 +                                                                              if outgoing_amt_msat <= balances.next_outbound_htlc_limit_msat &&
 +                                                                                      outgoing_amt_msat >= balances.next_outbound_htlc_minimum_msat &&
 +                                                                                      chan.context.is_usable() {
 +                                                                                      Some((chan, balances))
 +                                                                              } else {
 +                                                                                      None
 +                                                                              }
                                                                        },
 -                                                              }) => {
 -                                                                      let logger = WithChannelContext::from(&self.logger, &chan.context, Some(payment_hash));
 -                                                                      log_trace!(logger, "Adding HTLC from short id {} with payment_hash {} to channel with short id {} after delay", prev_short_channel_id, &payment_hash, short_chan_id);
 -                                                                      let htlc_source = HTLCSource::PreviousHopData(HTLCPreviousHopData {
 -                                                                              short_channel_id: prev_short_channel_id,
 -                                                                              user_channel_id: Some(prev_user_channel_id),
 -                                                                              channel_id: prev_channel_id,
 -                                                                              outpoint: prev_funding_outpoint,
 -                                                                              htlc_id: prev_htlc_id,
 -                                                                              incoming_packet_shared_secret: incoming_shared_secret,
 -                                                                              // Phantom payments are only PendingHTLCRouting::Receive.
 -                                                                              phantom_shared_secret: None,
 -                                                                              blinded_failure: blinded.map(|b| b.failure),
 -                                                                      });
 -                                                                      let next_blinding_point = blinded.and_then(|b| {
 -                                                                              let encrypted_tlvs_ss = self.node_signer.ecdh(
 -                                                                                      Recipient::Node, &b.inbound_blinding_point, None
 -                                                                              ).unwrap().secret_bytes();
 -                                                                              onion_utils::next_hop_pubkey(
 -                                                                                      &self.secp_ctx, b.inbound_blinding_point, &encrypted_tlvs_ss
 -                                                                              ).ok()
 -                                                                      });
 -                                                                      if let Err(e) = chan.queue_add_htlc(outgoing_amt_msat,
 -                                                                              payment_hash, outgoing_cltv_value, htlc_source.clone(),
 -                                                                              onion_packet, skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
 -                                                                              &&logger)
 -                                                                      {
 -                                                                              if let ChannelError::Ignore(msg) = e {
 -                                                                                      log_trace!(logger, "Failed to forward HTLC with payment_hash {}: {}", &payment_hash, msg);
 +                                                                      _ => None,
 +                                                              }).min_by_key(|(_, balances)| balances.next_outbound_htlc_limit_msat).map(|(c, _)| c);
 +                                                              let optimal_channel = match maybe_optimal_channel {
 +                                                                      Some(chan) => chan,
 +                                                                      None => {
 +                                                                              // Fall back to the specified channel to return an appropriate error.
 +                                                                              if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
 +                                                                                      chan
                                                                                } else {
 -                                                                                      panic!("Stated return value requirements in send_htlc() were not met");
 +                                                                                      forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
 +                                                                                      break;
                                                                                }
 +                                                                      }
 +                                                              };
 +
 +                                                              let logger = WithChannelContext::from(&self.logger, &optimal_channel.context, Some(payment_hash));
 +                                                              let channel_description = if optimal_channel.context.get_short_channel_id() == Some(short_chan_id) {
 +                                                                      "specified"
 +                                                              } else {
 +                                                                      "alternate"
 +                                                              };
 +                                                              log_trace!(logger, "Forwarding HTLC from SCID {} with payment_hash {} and next hop SCID {} over {} channel {} with corresponding peer {}",
 +                                                                      prev_short_channel_id, &payment_hash, short_chan_id, channel_description, optimal_channel.context.channel_id(), &counterparty_node_id);
 +                                                              if let Err(e) = optimal_channel.queue_add_htlc(outgoing_amt_msat,
 +                                                                              payment_hash, outgoing_cltv_value, htlc_source.clone(),
 +                                                                              onion_packet.clone(), skimmed_fee_msat, next_blinding_point, &self.fee_estimator,
 +                                                                              &&logger)
 +                                                              {
 +                                                                      if let ChannelError::Ignore(msg) = e {
 +                                                                              log_trace!(logger, "Failed to forward HTLC with payment_hash {} to peer {}: {}", &payment_hash, &counterparty_node_id, msg);
 +                                                                      } else {
 +                                                                              panic!("Stated return value requirements in send_htlc() were not met");
 +                                                                      }
 +
 +                                                                      if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
                                                                                let (failure_code, data) = self.get_htlc_temp_fail_err_and_data(0x1000|7, short_chan_id, chan);
                                                                                failed_forwards.push((htlc_source, payment_hash,
                                                                                        HTLCFailReason::reason(failure_code, data),
                                                                                        HTLCDestination::NextHopChannel { node_id: Some(chan.context.get_counterparty_node_id()), channel_id: forward_chan_id }
                                                                                ));
 -                                                                              continue;
 +                                                                      } else {
 +                                                                              forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
 +                                                                              break;
                                                                        }
 -                                                                      None
 -                                                              },
 -                                                              HTLCForwardInfo::AddHTLC { .. } => {
 -                                                                      panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
 -                                                              },
 -                                                              HTLCForwardInfo::FailHTLC { htlc_id, err_packet } => {
 +                                                              }
 +                                                              None
 +                                                      },
 +                                                      HTLCForwardInfo::AddHTLC { .. } => {
 +                                                              panic!("short_channel_id != 0 should imply any pending_forward entries are of type Forward");
 +                                                      },
 +                                                      HTLCForwardInfo::FailHTLC { htlc_id, ref err_packet } => {
 +                                                              if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
 +                                                                      let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
 -                                                                      Some((chan.queue_fail_htlc(htlc_id, err_packet, &&logger), htlc_id))
 -                                                              },
 -                                                              HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
 +                                                                      Some((chan.queue_fail_htlc(htlc_id, err_packet.clone(), &&logger), htlc_id))
 +                                                              } else {
 +                                                                      forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
 +                                                                      break;
 +                                                              }
 +                                                      },
 +                                                      HTLCForwardInfo::FailMalformedHTLC { htlc_id, failure_code, sha256_of_onion } => {
 +                                                              if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
 +                                                                      let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                        log_trace!(logger, "Failing malformed HTLC back to channel with short id {} (backward HTLC ID {}) after delay", short_chan_id, htlc_id);
                                                                        let res = chan.queue_fail_malformed_htlc(
                                                                                htlc_id, failure_code, sha256_of_onion, &&logger
                                                                        );
                                                                        Some((res, htlc_id))
 -                                                              },
 -                                                      };
 -                                                      if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
 -                                                              if let Err(e) = queue_fail_htlc_res {
 -                                                                      if let ChannelError::Ignore(msg) = e {
 +                                                              } else {
 +                                                                      forwarding_channel_not_found!(core::iter::once(forward_info).chain(draining_pending_forwards));
 +                                                                      break;
 +                                                              }
 +                                                      },
 +                                              };
 +                                              if let Some((queue_fail_htlc_res, htlc_id)) = queue_fail_htlc_res {
 +                                                      if let Err(e) = queue_fail_htlc_res {
 +                                                              if let ChannelError::Ignore(msg) = e {
 +                                                                      if let Some(ChannelPhase::Funded(ref mut chan)) = peer_state.channel_by_id.get_mut(&forward_chan_id) {
 +                                                                              let logger = WithChannelContext::from(&self.logger, &chan.context, None);
                                                                                log_trace!(logger, "Failed to fail HTLC with ID {} backwards to short_id {}: {}", htlc_id, short_chan_id, msg);
 -                                                                      } else {
 -                                                                              panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                        }
 -                                                                      // fail-backs are best-effort, we probably already have one
 -                                                                      // pending, and if not that's OK, if not, the channel is on
 -                                                                      // the chain and sending the HTLC-Timeout is their problem.
 -                                                                      continue;
 +                                                              } else {
 +                                                                      panic!("Stated return value requirements in queue_fail_{{malformed_}}htlc() were not met");
                                                                }
 +                                                              // fail-backs are best-effort, we probably already have one
 +                                                              // pending, and if not that's OK, if not, the channel is on
 +                                                              // the chain and sending the HTLC-Timeout is their problem.
 +                                                              continue;
                                                        }
                                                }
 -                                      } else {
 -                                              forwarding_channel_not_found!();
 -                                              continue;
                                        }
                                } else {
                                        'next_forwardable_htlc: for forward_info in pending_forwards.drain(..) {
                }
                if valid_mpp {
                        for htlc in sources.drain(..) {
 -                              let prev_hop_chan_id = htlc.prev_hop.channel_id;
 -                              if let Err((pk, err)) = self.claim_funds_from_hop(
 +                              self.claim_funds_from_hop(
                                        htlc.prev_hop, payment_preimage,
                                        |_, definitely_duplicate| {
                                                debug_assert!(!definitely_duplicate, "We shouldn't claim duplicatively from a payment");
                                                Some(MonitorUpdateCompletionAction::PaymentClaimed { payment_hash })
                                        }
 -                              ) {
 -                                      if let msgs::ErrorAction::IgnoreError = err.err.action {
 -                                              // We got a temporary failure updating monitor, but will claim the
 -                                              // HTLC when the monitor updating is restored (or on chain).
 -                                              let logger = WithContext::from(&self.logger, None, Some(prev_hop_chan_id), Some(payment_hash));
 -                                              log_error!(logger, "Temporary failure claiming HTLC, treating as success: {}", err.err.err);
 -                                      } else { errs.push((pk, err)); }
 -                              }
 +                              );
                        }
                }
                if !valid_mpp {
                }
        }
  
 -      fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(&self,
 -              prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage, completion_action: ComplFunc)
 -      -> Result<(), (PublicKey, MsgHandleErrInternal)> {
 +      fn claim_funds_from_hop<ComplFunc: FnOnce(Option<u64>, bool) -> Option<MonitorUpdateCompletionAction>>(
 +              &self, prev_hop: HTLCPreviousHopData, payment_preimage: PaymentPreimage,
 +              completion_action: ComplFunc,
 +      ) {
                //TODO: Delay the claimed_funds relaying just like we do outbound relay!
  
                // If we haven't yet run background events assume we're still deserializing and shouldn't
                                                                let action = if let Some(action) = completion_action(None, true) {
                                                                        action
                                                                } else {
 -                                                                      return Ok(());
 +                                                                      return;
                                                                };
                                                                mem::drop(peer_state_lock);
  
                                                                } else {
                                                                        debug_assert!(false,
                                                                                "Duplicate claims should always free another channel immediately");
 -                                                                      return Ok(());
 +                                                                      return;
                                                                };
                                                                if let Some(peer_state_mtx) = per_peer_state.get(&node_id) {
                                                                        let mut peer_state = peer_state_mtx.lock().unwrap();
                                                        }
                                                }
                                        }
 -                                      return Ok(());
 +                                      return;
                                }
                        }
                }
                // generally always allowed to be duplicative (and it's specifically noted in
                // `PaymentForwarded`).
                self.handle_monitor_update_completion_actions(completion_action(None, false));
 -              Ok(())
        }
  
        fn finalize_claims(&self, sources: Vec<HTLCSource>) {
                                let completed_blocker = RAAMonitorUpdateBlockingAction::from_prev_hop_data(&hop_data);
                                #[cfg(debug_assertions)]
                                let claiming_chan_funding_outpoint = hop_data.outpoint;
 -                              let res = self.claim_funds_from_hop(hop_data, payment_preimage,
 +                              self.claim_funds_from_hop(hop_data, payment_preimage,
                                        |htlc_claim_value_msat, definitely_duplicate| {
                                                let chan_to_release =
                                                        if let Some(node_id) = next_channel_counterparty_node_id {
                                                        })
                                                }
                                        });
 -                              if let Err((pk, err)) = res {
 -                                      let result: Result<(), _> = Err(err);
 -                                      let _ = handle_error!(self, result, pk);
 -                              }
                        },
                }
        }
                                        },
                                }
                        },
+                       #[cfg(async_payments)]
+                       OffersMessage::StaticInvoice(_invoice) => {
+                               match responder {
+                                       Some(responder) => {
+                                               responder.respond(OffersMessage::InvoiceError(
+                                                               InvoiceError::from_string("Static invoices not yet supported".to_string())
+                                               ))
+                                       },
+                                       None => return ResponseInstruction::NoResponse,
+                               }
+                       },
                        OffersMessage::InvoiceError(invoice_error) => {
                                log_trace!(self.logger, "Received invoice_error: {}", invoice_error);
                                ResponseInstruction::NoResponse
index 8c08d37f5f6193e2d1fc932a90e547f9e5593d90,079f6345fc99bbc8a77a53f1a411db89f43e5982..00168fdfb0ca0252c3c54117ad6a05533cbdd794
@@@ -31,8 -31,6 +31,8 @@@ use crate::util::errors::APIError
  use crate::util::logger::Logger;
  use crate::util::scid_utils;
  use crate::util::test_channel_signer::TestChannelSigner;
 +#[cfg(test)]
 +use crate::util::test_channel_signer::SignerOp;
  use crate::util::test_utils;
  use crate::util::test_utils::{panicking, TestChainMonitor, TestScorer, TestKeysInterface};
  use crate::util::ser::{ReadableArgs, Writeable};
@@@ -423,6 -421,7 +423,7 @@@ type TestOnionMessenger<'chan_man, 'nod
        &'node_cfg test_utils::TestMessageRouter<'chan_mon_cfg>,
        &'chan_man TestChannelManager<'node_cfg, 'chan_mon_cfg>,
        IgnoringMessageHandler,
+       IgnoringMessageHandler,
  >;
  
  /// For use with [`OnionMessenger`] otherwise `test_restored_packages_retry` will fail. This is
@@@ -484,74 -483,46 +485,74 @@@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> 
        pub fn get_block_header(&self, height: u32) -> Header {
                self.blocks.lock().unwrap()[height as usize].0.header
        }
 -      /// Changes the channel signer's availability for the specified peer and channel.
 +
 +      /// Toggles this node's signer to be available for the given signer operation.
 +      /// This is useful for testing behavior for restoring an async signer that previously
 +      /// could not return a signature immediately.
 +      #[cfg(test)]
 +      pub fn enable_channel_signer_op(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp) {
 +              self.set_channel_signer_ops(peer_id, chan_id, signer_op, true);
 +      }
 +
 +      /// Toggles this node's signer to be unavailable, returning `Err` for the given signer operation.
 +      /// This is useful for testing behavior for an async signer that cannot return a signature
 +      /// immediately.
 +      #[cfg(test)]
 +      pub fn disable_channel_signer_op(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp) {
 +              self.set_channel_signer_ops(peer_id, chan_id, signer_op, false);
 +      }
 +
 +      /// Changes the channel signer's availability for the specified peer, channel, and signer
 +      /// operation.
        ///
 -      /// When `available` is set to `true`, the channel signer will behave normally. When set to
 -      /// `false`, the channel signer will act like an off-line remote signer and will return `Err` for
 -      /// several of the signing methods. Currently, only `get_per_commitment_point` and
 -      /// `release_commitment_secret` are affected by this setting.
 +      /// For the specified signer operation, when `available` is set to `true`, the channel signer
 +      /// will behave normally, returning `Ok`. When set to `false`, and the channel signer will
 +      /// act like an off-line remote signer, returning `Err`. This applies to the signer in all
 +      /// relevant places, i.e. the channel manager, chain monitor, and the keys manager.
        #[cfg(test)]
 -      pub fn set_channel_signer_available(&self, peer_id: &PublicKey, chan_id: &ChannelId, available: bool) {
 +      fn set_channel_signer_ops(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp, available: bool) {
                use crate::sign::ChannelSigner;
                log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);
  
                let per_peer_state = self.node.per_peer_state.read().unwrap();
 -              let chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
 +              let mut chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
  
                let mut channel_keys_id = None;
 -              if let Some(chan) = chan_lock.channel_by_id.get(chan_id).map(|phase| phase.context()) {
 -                      chan.get_signer().as_ecdsa().unwrap().set_available(available);
 +              if let Some(chan) = chan_lock.channel_by_id.get_mut(chan_id).map(|phase| phase.context_mut()) {
 +                      let signer = chan.get_mut_signer().as_mut_ecdsa().unwrap();
 +                      if available {
 +                              signer.enable_op(signer_op);
 +                      } else {
 +                              signer.disable_op(signer_op);
 +                      }
                        channel_keys_id = Some(chan.channel_keys_id);
                }
  
 -              let mut monitor = None;
 -              for (funding_txo, channel_id) in self.chain_monitor.chain_monitor.list_monitors() {
 -                      if *chan_id == channel_id {
 -                              monitor = self.chain_monitor.chain_monitor.get_monitor(funding_txo).ok();
 -                      }
 -              }
 +              let monitor = self.chain_monitor.chain_monitor.list_monitors().into_iter()
 +                      .find(|(_, channel_id)| *channel_id == *chan_id)
 +                      .and_then(|(funding_txo, _)| self.chain_monitor.chain_monitor.get_monitor(funding_txo).ok());
                if let Some(monitor) = monitor {
 -                      monitor.do_signer_call(|signer| {
 +                      monitor.do_mut_signer_call(|signer| {
                                channel_keys_id = channel_keys_id.or(Some(signer.inner.channel_keys_id()));
 -                              signer.set_available(available)
 +                              if available {
 +                                      signer.enable_op(signer_op);
 +                              } else {
 +                                      signer.disable_op(signer_op);
 +                              }
                        });
                }
  
 +              let channel_keys_id = channel_keys_id.unwrap();
 +              let mut unavailable_signers_ops = self.keys_manager.unavailable_signers_ops.lock().unwrap();
 +              let entry = unavailable_signers_ops.entry(channel_keys_id).or_insert(new_hash_set());
                if available {
 -                      self.keys_manager.unavailable_signers.lock().unwrap()
 -                              .remove(channel_keys_id.as_ref().unwrap());
 +                      entry.remove(&signer_op);
 +                      if entry.is_empty() {
 +                              unavailable_signers_ops.remove(&channel_keys_id);
 +                      }
                } else {
 -                      self.keys_manager.unavailable_signers.lock().unwrap()
 -                              .insert(channel_keys_id.unwrap());
 -              }
 +                      entry.insert(signer_op);
 +              };
        }
  }
  
@@@ -3258,7 -3229,7 +3259,7 @@@ pub fn create_network<'a, 'b: 'a, 'c: '
                let dedicated_entropy = DedicatedEntropy(RandomBytes::new([i as u8; 32]));
                let onion_messenger = OnionMessenger::new(
                        dedicated_entropy, cfgs[i].keys_manager, cfgs[i].logger, &chan_mgrs[i],
-                       &cfgs[i].message_router, &chan_mgrs[i], IgnoringMessageHandler {},
+                       &cfgs[i].message_router, &chan_mgrs[i], IgnoringMessageHandler {}, IgnoringMessageHandler {},
                );
                let gossip_sync = P2PGossipSync::new(cfgs[i].network_graph.as_ref(), None, cfgs[i].logger);
                let wallet_source = Arc::new(test_utils::TestWalletSource::new(SecretKey::from_slice(&[i as u8 + 1; 32]).unwrap()));
index 52375f723c75ec216241468456a43f5f63a4c69f,e174dfc1cb4d3ef57ea0888cc6b7c4d97d86c773..405ab87be3f11fb25c99a13960d854e6253f2cbe
@@@ -192,8 -192,12 +192,12 @@@ fn extract_invoice_request<'a, 'b, 'c>
                        ParsedOnionMessageContents::Offers(offers_message) => match offers_message {
                                OffersMessage::InvoiceRequest(invoice_request) => (invoice_request, reply_path.unwrap()),
                                OffersMessage::Invoice(invoice) => panic!("Unexpected invoice: {:?}", invoice),
+                               #[cfg(async_payments)]
+                               OffersMessage::StaticInvoice(invoice) => panic!("Unexpected static invoice: {:?}", invoice),
                                OffersMessage::InvoiceError(error) => panic!("Unexpected invoice_error: {:?}", error),
                        },
+                       #[cfg(async_payments)]
+                       ParsedOnionMessageContents::AsyncPayments(message) => panic!("Unexpected async payments message: {:?}", message),
                        ParsedOnionMessageContents::Custom(message) => panic!("Unexpected custom message: {:?}", message),
                },
                Ok(PeeledOnion::Forward(_, _)) => panic!("Unexpected onion message forward"),
@@@ -207,8 -211,12 +211,12 @@@ fn extract_invoice<'a, 'b, 'c>(node: &N
                        ParsedOnionMessageContents::Offers(offers_message) => match offers_message {
                                OffersMessage::InvoiceRequest(invoice_request) => panic!("Unexpected invoice_request: {:?}", invoice_request),
                                OffersMessage::Invoice(invoice) => invoice,
+                               #[cfg(async_payments)]
+                               OffersMessage::StaticInvoice(invoice) => panic!("Unexpected static invoice: {:?}", invoice),
                                OffersMessage::InvoiceError(error) => panic!("Unexpected invoice_error: {:?}", error),
                        },
+                       #[cfg(async_payments)]
+                       ParsedOnionMessageContents::AsyncPayments(message) => panic!("Unexpected async payments message: {:?}", message),
                        ParsedOnionMessageContents::Custom(message) => panic!("Unexpected custom message: {:?}", message),
                },
                Ok(PeeledOnion::Forward(_, _)) => panic!("Unexpected onion message forward"),
@@@ -224,8 -232,12 +232,12 @@@ fn extract_invoice_error<'a, 'b, 'c>
                        ParsedOnionMessageContents::Offers(offers_message) => match offers_message {
                                OffersMessage::InvoiceRequest(invoice_request) => panic!("Unexpected invoice_request: {:?}", invoice_request),
                                OffersMessage::Invoice(invoice) => panic!("Unexpected invoice: {:?}", invoice),
+                               #[cfg(async_payments)]
+                               OffersMessage::StaticInvoice(invoice) => panic!("Unexpected invoice: {:?}", invoice),
                                OffersMessage::InvoiceError(error) => error,
                        },
+                       #[cfg(async_payments)]
+                       ParsedOnionMessageContents::AsyncPayments(message) => panic!("Unexpected async payments message: {:?}", message),
                        ParsedOnionMessageContents::Custom(message) => panic!("Unexpected custom message: {:?}", message),
                },
                Ok(PeeledOnion::Forward(_, _)) => panic!("Unexpected onion message forward"),
@@@ -950,12 -962,9 +962,12 @@@ fn pays_bolt12_invoice_asynchronously(
        );
  }
  
 -/// Fails creating an offer when a blinded path cannot be created without exposing the node's id.
 +/// Checks that an offer can be created using an unannounced node as a blinded path's introduction
 +/// node. This is only preferred if there are no other options which may indicated either the offer
 +/// is intended for the unannounced node or that the node is actually announced (e.g., an LSP) but
 +/// the recipient doesn't have a network graph.
  #[test]
 -fn fails_creating_offer_without_blinded_paths() {
 +fn creates_offer_with_blinded_path_using_unannounced_introduction_node() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
  
        create_unannounced_chan_between_nodes_with_value(&nodes, 0, 1, 10_000_000, 1_000_000_000);
  
 -      match nodes[0].node.create_offer_builder(None) {
 -              Ok(_) => panic!("Expected error"),
 -              Err(e) => assert_eq!(e, Bolt12SemanticError::MissingPaths),
 +      let alice = &nodes[0];
 +      let alice_id = alice.node.get_our_node_id();
 +      let bob = &nodes[1];
 +      let bob_id = bob.node.get_our_node_id();
 +
 +      let offer = alice.node
 +              .create_offer_builder(None).unwrap()
 +              .amount_msats(10_000_000)
 +              .build().unwrap();
 +      assert_ne!(offer.signing_pubkey(), Some(alice_id));
 +      assert!(!offer.paths().is_empty());
 +      for path in offer.paths() {
 +              assert_eq!(path.introduction_node, IntroductionNode::NodeId(bob_id));
        }
 +
 +      let payment_id = PaymentId([1; 32]);
 +      bob.node.pay_for_offer(&offer, None, None, None, payment_id, Retry::Attempts(0), None).unwrap();
 +      expect_recent_payment!(bob, RecentPaymentDetails::AwaitingInvoice, payment_id);
 +
 +      let onion_message = bob.onion_messenger.next_onion_message_for_peer(alice_id).unwrap();
 +      alice.onion_messenger.handle_onion_message(&bob_id, &onion_message);
 +
 +      let (invoice_request, reply_path) = extract_invoice_request(alice, &onion_message);
 +      let payment_context = PaymentContext::Bolt12Offer(Bolt12OfferContext {
 +              offer_id: offer.id(),
 +              invoice_request: InvoiceRequestFields {
 +                      payer_id: invoice_request.payer_id(),
 +                      quantity: None,
 +                      payer_note_truncated: None,
 +              },
 +      });
 +      assert_ne!(invoice_request.payer_id(), bob_id);
 +      assert_eq!(reply_path.introduction_node, IntroductionNode::NodeId(alice_id));
 +
 +      let onion_message = alice.onion_messenger.next_onion_message_for_peer(bob_id).unwrap();
 +      bob.onion_messenger.handle_onion_message(&alice_id, &onion_message);
 +
 +      let invoice = extract_invoice(bob, &onion_message);
 +      assert_ne!(invoice.signing_pubkey(), alice_id);
 +      assert!(!invoice.payment_paths().is_empty());
 +      for (_, path) in invoice.payment_paths() {
 +              assert_eq!(path.introduction_node, IntroductionNode::NodeId(bob_id));
 +      }
 +
 +      route_bolt12_payment(bob, &[alice], &invoice);
 +      expect_recent_payment!(bob, RecentPaymentDetails::Pending, payment_id);
 +
 +      claim_bolt12_payment(bob, &[alice], payment_context);
 +      expect_recent_payment!(bob, RecentPaymentDetails::Fulfilled, payment_id);
  }
  
 -/// Fails creating a refund when a blinded path cannot be created without exposing the node's id.
 +/// Checks that a refund can be created using an unannounced node as a blinded path's introduction
 +/// node. This is only preferred if there are no other options which may indicated either the refund
 +/// is intended for the unannounced node or that the node is actually announced (e.g., an LSP) but
 +/// the sender doesn't have a network graph.
  #[test]
 -fn fails_creating_refund_without_blinded_paths() {
 +fn creates_refund_with_blinded_path_using_unannounced_introduction_node() {
        let chanmon_cfgs = create_chanmon_cfgs(2);
        let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
        let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
  
        create_unannounced_chan_between_nodes_with_value(&nodes, 0, 1, 10_000_000, 1_000_000_000);
  
 +      let alice = &nodes[0];
 +      let alice_id = alice.node.get_our_node_id();
 +      let bob = &nodes[1];
 +      let bob_id = bob.node.get_our_node_id();
 +
        let absolute_expiry = Duration::from_secs(u64::MAX);
        let payment_id = PaymentId([1; 32]);
 -
 -      match nodes[0].node.create_refund_builder(
 -              10_000, absolute_expiry, payment_id, Retry::Attempts(0), None
 -      ) {
 -              Ok(_) => panic!("Expected error"),
 -              Err(e) => assert_eq!(e, Bolt12SemanticError::MissingPaths),
 +      let refund = bob.node
 +              .create_refund_builder(10_000_000, absolute_expiry, payment_id, Retry::Attempts(0), None)
 +              .unwrap()
 +              .build().unwrap();
 +      assert_ne!(refund.payer_id(), bob_id);
 +      assert!(!refund.paths().is_empty());
 +      for path in refund.paths() {
 +              assert_eq!(path.introduction_node, IntroductionNode::NodeId(alice_id));
        }
 +      expect_recent_payment!(bob, RecentPaymentDetails::AwaitingInvoice, payment_id);
  
 -      assert!(nodes[0].node.list_recent_payments().is_empty());
 +      let expected_invoice = alice.node.request_refund_payment(&refund).unwrap();
 +
 +      let onion_message = alice.onion_messenger.next_onion_message_for_peer(bob_id).unwrap();
 +
 +      let invoice = extract_invoice(bob, &onion_message);
 +      assert_eq!(invoice, expected_invoice);
 +      assert_ne!(invoice.signing_pubkey(), alice_id);
 +      assert!(!invoice.payment_paths().is_empty());
 +      for (_, path) in invoice.payment_paths() {
 +              assert_eq!(path.introduction_node, IntroductionNode::NodeId(bob_id));
 +      }
  }
  
  /// Fails creating or paying an offer when a blinded path cannot be created because no peers are
@@@ -1234,7 -1177,8 +1246,7 @@@ fn fails_sending_invoice_with_unsupport
        }
  }
  
 -/// Fails creating an invoice request when a blinded reply path cannot be created without exposing
 -/// the node's id.
 +/// Fails creating an invoice request when a blinded reply path cannot be created.
  #[test]
  fn fails_creating_invoice_request_without_blinded_reply_path() {
        let chanmon_cfgs = create_chanmon_cfgs(6);
        let (alice, bob, charlie, david) = (&nodes[0], &nodes[1], &nodes[2], &nodes[3]);
  
        disconnect_peers(alice, &[charlie, david, &nodes[4], &nodes[5]]);
 -      disconnect_peers(david, &[bob, &nodes[4], &nodes[5]]);
 +      disconnect_peers(david, &[bob, charlie, &nodes[4], &nodes[5]]);
  
        let offer = alice.node
                .create_offer_builder(None).unwrap()
index ed601c047c28a1edfbd1bb930a74a3c8ccf33441,9a2dc36f87324ea4447bcf221b2a4d410dd3a444..40b6177921fb144a66924707fab1101a2a0dced0
@@@ -19,6 -19,7 +19,7 @@@ use crate::routing::test_utils::{add_ch
  use crate::sign::{NodeSigner, Recipient};
  use crate::util::ser::{FixedLengthReader, LengthReadable, Writeable, Writer};
  use crate::util::test_utils;
+ use super::async_payments::{AsyncPaymentsMessageHandler, HeldHtlcAvailable, ReleaseHeldHtlc};
  use super::messenger::{CustomOnionMessageHandler, DefaultMessageRouter, Destination, OnionMessagePath, OnionMessenger, PendingOnionMessage, Responder, ResponseInstruction, SendError, SendSuccess};
  use super::offers::{OffersMessage, OffersMessageHandler};
  use super::packet::{OnionMessageContents, Packet};
@@@ -50,6 -51,7 +51,7 @@@ struct MessengerNode 
                        Arc<test_utils::TestKeysInterface>
                >>,
                Arc<TestOffersMessageHandler>,
+               Arc<TestAsyncPaymentsMessageHandler>,
                Arc<TestCustomMessageHandler>
        >,
        custom_message_handler: Arc<TestCustomMessageHandler>,
@@@ -79,6 -81,17 +81,17 @@@ impl OffersMessageHandler for TestOffer
        }
  }
  
+ struct TestAsyncPaymentsMessageHandler {}
+ impl AsyncPaymentsMessageHandler for TestAsyncPaymentsMessageHandler {
+       fn held_htlc_available(
+               &self, _message: HeldHtlcAvailable, _responder: Option<Responder>,
+       ) -> ResponseInstruction<ReleaseHeldHtlc> {
+               ResponseInstruction::NoResponse
+       }
+       fn release_held_htlc(&self, _message: ReleaseHeldHtlc) {}
+ }
  #[derive(Clone, Debug, PartialEq)]
  enum TestCustomMessage {
        Ping,
@@@ -249,18 -262,19 +262,19 @@@ fn create_nodes_using_cfgs(cfgs: Vec<Me
                        DefaultMessageRouter::new(network_graph.clone(), entropy_source.clone())
                );
                let offers_message_handler = Arc::new(TestOffersMessageHandler {});
+               let async_payments_message_handler = Arc::new(TestAsyncPaymentsMessageHandler {});
                let custom_message_handler = Arc::new(TestCustomMessageHandler::new());
                let messenger = if cfg.intercept_offline_peer_oms {
                        OnionMessenger::new_with_offline_peer_interception(
                                entropy_source.clone(), node_signer.clone(), logger.clone(),
                                node_id_lookup, message_router, offers_message_handler,
-                               custom_message_handler.clone()
+                               async_payments_message_handler, custom_message_handler.clone()
                        )
                } else {
                        OnionMessenger::new(
                                entropy_source.clone(), node_signer.clone(), logger.clone(),
                                node_id_lookup, message_router, offers_message_handler,
-                               custom_message_handler.clone()
+                               async_payments_message_handler, custom_message_handler.clone()
                        )
                };
                nodes.push(MessengerNode {
@@@ -492,9 -506,8 +506,9 @@@ fn async_response_with_reply_path_fails
        let path_id = Some([2; 32]);
        let reply_path = BlindedPath::new_for_message(&[], bob.node_id, &*bob.entropy_source, &secp_ctx).unwrap();
  
 -      // Alice tries to asynchronously respond to Bob, but fails because the nodes are unannounced.
 -      // Therefore, the reply_path cannot be used for the response.
 +      // Alice tries to asynchronously respond to Bob, but fails because the nodes are unannounced and
 +      // disconnected. Thus, a reply path could no be created for the response.
 +      disconnect_peers(alice, bob);
        let responder = Responder::new(reply_path, path_id);
        alice.custom_message_handler.expect_message_and_response(message.clone());
        let response_instruction = alice.custom_message_handler.handle_custom_message(message, Some(responder));
index 859c3f2b5b9c88fd6e7c42c23698d1402dcbee61,c7da29307bc5d44c08036bd4b8deb5eef6e6f70f..85600190db3df5cd0015ea4b1c809104f36a2292
@@@ -24,6 -24,9 +24,9 @@@ use crate::ln::features::{InitFeatures
  use crate::ln::msgs::{self, OnionMessage, OnionMessageHandler, SocketAddress};
  use crate::ln::onion_utils;
  use crate::routing::gossip::{NetworkGraph, NodeId, ReadOnlyNetworkGraph};
+ use super::async_payments::AsyncPaymentsMessageHandler;
+ #[cfg(async_payments)]
+ use super::async_payments::AsyncPaymentsMessage;
  use super::packet::OnionMessageContents;
  use super::packet::ParsedOnionMessageContents;
  use super::offers::OffersMessageHandler;
@@@ -76,22 -79,27 +79,27 @@@ pub trait AOnionMessenger 
        type OffersMessageHandler: OffersMessageHandler + ?Sized;
        /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
        type OMH: Deref<Target = Self::OffersMessageHandler>;
+       /// A type implementing [`AsyncPaymentsMessageHandler`]
+       type AsyncPaymentsMessageHandler: AsyncPaymentsMessageHandler + ?Sized;
+       /// A type that may be dereferenced to [`Self::AsyncPaymentsMessageHandler`]
+       type APH: Deref<Target = Self::AsyncPaymentsMessageHandler>;
        /// A type implementing [`CustomOnionMessageHandler`]
        type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
        /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
        type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
        /// Returns a reference to the actual [`OnionMessenger`] object.
-       fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
+       fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::APH, Self::CMH>;
  }
  
- impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
- for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
+ impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> AOnionMessenger
+ for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH> where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH:: Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
  {
        type EntropySource = ES::Target;
        type MR = MR;
        type OffersMessageHandler = OMH::Target;
        type OMH = OMH;
+       type AsyncPaymentsMessageHandler = APH::Target;
+       type APH = APH;
        type CustomOnionMessageHandler = CMH::Target;
        type CMH = CMH;
-       fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
+       fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH> { self }
  }
  
  /// A sender, receiver and forwarder of [`OnionMessage`]s.
  /// # let message_router = Arc::new(FakeMessageRouter {});
  /// # let custom_message_handler = IgnoringMessageHandler {};
  /// # let offers_message_handler = IgnoringMessageHandler {};
+ /// # let async_payments_message_handler = IgnoringMessageHandler {};
  /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your
  /// // ChannelManager.
  /// let onion_messenger = OnionMessenger::new(
  ///     &keys_manager, &keys_manager, logger, &node_id_lookup, message_router,
- ///     &offers_message_handler, &custom_message_handler
+ ///     &offers_message_handler, &async_payments_message_handler, &custom_message_handler
  /// );
  
  /// # #[derive(Debug)]
  ///
  /// [`InvoiceRequest`]: crate::offers::invoice_request::InvoiceRequest
  /// [`Bolt12Invoice`]: crate::offers::invoice::Bolt12Invoice
- pub struct OnionMessenger<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
- where
+ pub struct OnionMessenger<
+       ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref
+ > where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        L::Target: Logger,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
  {
        entropy_source: ES,
        node_id_lookup: NL,
        message_router: MR,
        offers_handler: OMH,
+       #[allow(unused)]
+       async_payments_handler: APH,
        custom_handler: CMH,
        intercept_messages_for_offline_peers: bool,
        pending_events: Mutex<PendingEvents>,
@@@ -489,7 -504,7 +504,7 @@@ wher
        }
  
        fn create_blinded_paths_from_iter<
 -              I: Iterator<Item = ForwardNode>,
 +              I: ExactSizeIterator<Item = ForwardNode>,
                T: secp256k1::Signing + secp256k1::Verification
        >(
                &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1<T>, compact_paths: bool
                let is_recipient_announced =
                        network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
  
 +              let has_one_peer = peers.len() == 1;
                let mut peer_info = peers
 -                      // Limit to peers with announced channels
 +                      // Limit to peers with announced channels unless the recipient is unannounced.
                        .filter_map(|peer|
                                network_graph
                                        .node(&NodeId::from_pubkey(&peer.node_id))
 -                                      .filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
 +                                      .filter(|info|
 +                                              !is_recipient_announced || info.channels.len() >= MIN_PEER_CHANNELS
 +                                      )
                                        .map(|info| (peer, info.is_tor_only(), info.channels.len()))
 +                                      // Allow messages directly with the only peer when unannounced.
 +                                      .or_else(|| (!is_recipient_announced && has_one_peer)
 +                                              .then(|| (peer, false, 0))
 +                                      )
                        )
                        // Exclude Tor-only nodes when the recipient is announced.
                        .filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
@@@ -993,8 -1001,8 +1008,8 @@@ wher
        }
  }
  
- impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref>
- OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+ impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref>
+ OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
  where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
  {
        /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to
        /// their respective handlers.
        pub fn new(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
-               offers_handler: OMH, custom_handler: CMH
+               offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH
        ) -> Self {
                Self::new_inner(
                        entropy_source, node_signer, logger, node_id_lookup, message_router,
-                       offers_handler, custom_handler, false
+                       offers_handler, async_payments_handler, custom_handler, false
                )
        }
  
        /// peers.
        pub fn new_with_offline_peer_interception(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
-               message_router: MR, offers_handler: OMH, custom_handler: CMH
+               message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH
        ) -> Self {
                Self::new_inner(
                        entropy_source, node_signer, logger, node_id_lookup, message_router,
-                       offers_handler, custom_handler, true
+                       offers_handler, async_payments_handler, custom_handler, true
                )
        }
  
        fn new_inner(
                entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
-               message_router: MR, offers_handler: OMH, custom_handler: CMH,
+               message_router: MR, offers_handler: OMH, async_payments_handler: APH, custom_handler: CMH,
                intercept_messages_for_offline_peers: bool
        ) -> Self {
                let mut secp_ctx = Secp256k1::new();
                        node_id_lookup,
                        message_router,
                        offers_handler,
+                       async_payments_handler,
                        custom_handler,
                        intercept_messages_for_offline_peers,
                        pending_events: Mutex::new(PendingEvents {
@@@ -1367,8 -1377,8 +1384,8 @@@ fn outbound_buffer_full(peer_node_id: &
        false
  }
  
- impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
- for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+ impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> EventsProvider
+ for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
  where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
  {
        fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
        }
  }
  
- impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
- for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
+ impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, APH: Deref, CMH: Deref> OnionMessageHandler
+ for OnionMessenger<ES, NS, L, NL, MR, OMH, APH, CMH>
  where
        ES::Target: EntropySource,
        NS::Target: NodeSigner,
        NL::Target: NodeIdLookUp,
        MR::Target: MessageRouter,
        OMH::Target: OffersMessageHandler,
+       APH::Target: AsyncPaymentsMessageHandler,
        CMH::Target: CustomOnionMessageHandler,
  {
        fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
                                        "Received an onion message with path_id {:02x?} and {} reply_path: {:?}",
                                        path_id, if reply_path.is_some() { "a" } else { "no" }, message);
  
+                               let responder = reply_path.map(
+                                       |reply_path| Responder::new(reply_path, path_id)
+                               );
                                match message {
                                        ParsedOnionMessageContents::Offers(msg) => {
-                                               let responder = reply_path.map(
-                                                       |reply_path| Responder::new(reply_path, path_id)
-                                               );
                                                let response_instructions = self.offers_handler.handle_message(msg, responder);
                                                let _ = self.handle_onion_message_response(response_instructions);
                                        },
-                                       ParsedOnionMessageContents::Custom(msg) => {
-                                               let responder = reply_path.map(
-                                                       |reply_path| Responder::new(reply_path, path_id)
+                                       #[cfg(async_payments)]
+                                       ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::HeldHtlcAvailable(msg)) => {
+                                               let response_instructions = self.async_payments_handler.held_htlc_available(
+                                                       msg, responder
                                                );
+                                               let _ = self.handle_onion_message_response(response_instructions);
+                                       },
+                                       #[cfg(async_payments)]
+                                       ParsedOnionMessageContents::AsyncPayments(AsyncPaymentsMessage::ReleaseHeldHtlc(msg)) => {
+                                               self.async_payments_handler.release_held_htlc(msg);
+                                       },
+                                       ParsedOnionMessageContents::Custom(msg) => {
                                                let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
                                                let _ = self.handle_onion_message_response(response_instructions);
                                        },
@@@ -1606,6 -1626,7 +1633,7 @@@ pub type SimpleArcOnionMessenger<M, T, 
        Arc<SimpleArcChannelManager<M, T, F, L>>,
        Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<L>>>, Arc<L>, Arc<KeysManager>>>,
        Arc<SimpleArcChannelManager<M, T, F, L>>,
+       IgnoringMessageHandler,
        IgnoringMessageHandler
  >;
  
@@@ -1626,6 -1647,7 +1654,7 @@@ pub type SimpleRefOnionMessenger
        &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
        &'j DefaultMessageRouter<&'g NetworkGraph<&'b L>, &'b L, &'a KeysManager>,
        &'i SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L>,
+       IgnoringMessageHandler,
        IgnoringMessageHandler
  >;