use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
-use crate::blinded_path::{BlindedPath, IntroductionNode, NodeIdLookUp};
-use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, NextHop, ReceiveTlvs};
+use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
+use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs};
use crate::blinded_path::utils;
use crate::events::{Event, EventHandler, EventsProvider};
use crate::sign::{EntropySource, NodeSigner, Recipient};
/// # let your_custom_message_type = 42;
/// your_custom_message_type
/// }
+/// fn msg_type(&self) -> &'static str { "YourCustomMessageType" }
/// }
/// // Send a custom onion message to a node id.
/// let destination = Destination::Node(destination_node_id);
message_router: MR,
offers_handler: OMH,
custom_handler: CMH,
+ intercept_messages_for_offline_peers: bool,
+ pending_events: Mutex<Vec<Event>>,
}
/// [`OnionMessage`]s buffered to be sent.
}
}
+
+/// The `Responder` struct creates an appropriate [`ResponseInstruction`]
+/// for responding to a message.
+pub struct Responder {
+ /// The path along which a response can be sent.
+ reply_path: BlindedPath,
+ path_id: Option<[u8; 32]>
+}
+
+impl Responder {
+ /// Creates a new [`Responder`] instance with the provided reply path.
+ fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
+ Responder {
+ reply_path,
+ path_id,
+ }
+ }
+
+ /// Creates the appropriate [`ResponseInstruction`] for a given response.
+ pub fn respond<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
+ ResponseInstruction::WithoutReplyPath(OnionMessageResponse {
+ message: response,
+ reply_path: self.reply_path,
+ path_id: self.path_id,
+ })
+ }
+}
+
+/// This struct contains the information needed to reply to a received message.
+pub struct OnionMessageResponse<T: OnionMessageContents> {
+ message: T,
+ reply_path: BlindedPath,
+ path_id: Option<[u8; 32]>,
+}
+
+/// `ResponseInstruction` represents instructions for responding to received messages.
+pub enum ResponseInstruction<T: OnionMessageContents> {
+ /// Indicates that a response should be sent without including a reply path
+ /// for the recipient to respond back.
+ WithoutReplyPath(OnionMessageResponse<T>),
+ /// Indicates that there's no response to send back.
+ NoResponse,
+}
+
/// An [`OnionMessage`] for [`OnionMessenger`] to send.
///
/// These are obtained when released from [`OnionMessenger`]'s handlers after which they are
/// Called with the custom message that was received, returning a response to send, if any.
///
/// The returned [`Self::CustomMessage`], if any, is enqueued to be sent by [`OnionMessenger`].
- fn handle_custom_message(&self, msg: Self::CustomMessage) -> Option<Self::CustomMessage>;
+ fn handle_custom_message(&self, message: Self::CustomMessage, responder: Option<Responder>) -> ResponseInstruction<Self::CustomMessage>;
/// Read a custom message of type `message_type` from `buffer`, returning `Ok(None)` if the
/// message type is unknown.
/// A processed incoming onion message, containing either a Forward (another onion message)
/// or a Receive payload with decrypted contents.
-#[derive(Debug)]
+#[derive(Clone, Debug)]
pub enum PeeledOnion<T: OnionMessageContents> {
/// Forwarded onion, with the next node id and a new onion
- Forward(NextHop, OnionMessage),
+ Forward(NextMessageHop, OnionMessage),
/// Received onion message, with decrypted contents, path_id, and reply path
Receive(ParsedOnionMessageContents<T>, Option<[u8; 32]>, Option<BlindedPath>)
}
+
+/// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of
+/// `path`, first calling [`Destination::resolve`] on `path.destination` with the given
+/// [`ReadOnlyNetworkGraph`].
+///
+/// Returns the node id of the peer to send the message to, the message itself, and any addresses
+/// needed to connect to the first node.
+pub fn create_onion_message_resolving_destination<
+ ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents
+>(
+ entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
+ network_graph: &ReadOnlyNetworkGraph, secp_ctx: &Secp256k1<secp256k1::All>,
+ mut path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
+) -> Result<(PublicKey, OnionMessage, Option<Vec<SocketAddress>>), SendError>
+where
+ ES::Target: EntropySource,
+ NS::Target: NodeSigner,
+ NL::Target: NodeIdLookUp,
+{
+ path.destination.resolve(network_graph);
+ create_onion_message(
+ entropy_source, node_signer, node_id_lookup, secp_ctx, path, contents, reply_path,
+ )
+}
+
/// Creates an [`OnionMessage`] with the given `contents` for sending to the destination of
/// `path`.
///
/// Returns the node id of the peer to send the message to, the message itself, and any addresses
-/// need to connect to the first node.
+/// needed to connect to the first node.
+///
+/// Returns [`SendError::UnresolvedIntroductionNode`] if:
+/// - `destination` contains a blinded path with an [`IntroductionNode::DirectedShortChannelId`],
+/// - unless it can be resolved by [`NodeIdLookUp::next_node_id`].
+/// Use [`create_onion_message_resolving_destination`] instead to resolve the introduction node
+/// first with a [`ReadOnlyNetworkGraph`].
pub fn create_onion_message<ES: Deref, NS: Deref, NL: Deref, T: OnionMessageContents>(
entropy_source: &ES, node_signer: &NS, node_id_lookup: &NL,
secp_ctx: &Secp256k1<secp256k1::All>, path: OnionMessagePath, contents: T,
pub fn new(
entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL, message_router: MR,
offers_handler: OMH, custom_handler: CMH
+ ) -> Self {
+ Self::new_inner(
+ entropy_source, node_signer, logger, node_id_lookup, message_router,
+ offers_handler, custom_handler, false
+ )
+ }
+
+ /// Similar to [`Self::new`], but rather than dropping onion messages that are
+ /// intended to be forwarded to offline peers, we will intercept them for
+ /// later forwarding.
+ ///
+ /// Interception flow:
+ /// 1. If an onion message for an offline peer is received, `OnionMessenger` will
+ /// generate an [`Event::OnionMessageIntercepted`]. Event handlers can
+ /// then choose to persist this onion message for later forwarding, or drop
+ /// it.
+ /// 2. When the offline peer later comes back online, `OnionMessenger` will
+ /// generate an [`Event::OnionMessagePeerConnected`]. Event handlers will
+ /// then fetch all previously intercepted onion messages for this peer.
+ /// 3. Once the stored onion messages are fetched, they can finally be
+ /// forwarded to the now-online peer via [`Self::forward_onion_message`].
+ ///
+ /// # Note
+ ///
+ /// LDK will not rate limit how many [`Event::OnionMessageIntercepted`]s
+ /// are generated, so it is the caller's responsibility to limit how many
+ /// onion messages are persisted and only persist onion messages for relevant
+ /// peers.
+ pub fn new_with_offline_peer_interception(
+ entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+ message_router: MR, offers_handler: OMH, custom_handler: CMH
+ ) -> Self {
+ Self::new_inner(
+ entropy_source, node_signer, logger, node_id_lookup, message_router,
+ offers_handler, custom_handler, true
+ )
+ }
+
+ fn new_inner(
+ entropy_source: ES, node_signer: NS, logger: L, node_id_lookup: NL,
+ message_router: MR, offers_handler: OMH, custom_handler: CMH,
+ intercept_messages_for_offline_peers: bool
) -> Self {
let mut secp_ctx = Secp256k1::new();
secp_ctx.seeded_randomize(&entropy_source.get_secure_random_bytes());
message_router,
offers_handler,
custom_handler,
+ intercept_messages_for_offline_peers,
+ pending_events: Mutex::new(Vec::new()),
}
}
&self, contents: T, destination: Destination, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
) -> Result<SendSuccess, SendError> {
- let mut logger = WithContext::from(&self.logger, None, None);
- let result = self.find_path(destination)
- .and_then(|path| {
- let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
- logger = WithContext::from(&self.logger, first_hop, None);
- self.enqueue_onion_message(path, contents, reply_path, log_suffix)
- });
+ let mut logger = WithContext::from(&self.logger, None, None, None);
+ let result = self.find_path(destination).and_then(|path| {
+ let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
+ logger = WithContext::from(&self.logger, first_hop, None, None);
+ self.enqueue_onion_message(path, contents, reply_path, log_suffix)
+ });
match result.as_ref() {
Err(SendError::GetNodeIdFailed) => {
}
}
+ /// Forwards an [`OnionMessage`] to `peer_node_id`. Useful if we initialized
+ /// the [`OnionMessenger`] with [`Self::new_with_offline_peer_interception`]
+ /// and want to forward a previously intercepted onion message to a peer that
+ /// has just come online.
+ pub fn forward_onion_message(
+ &self, message: OnionMessage, peer_node_id: &PublicKey
+ ) -> Result<(), SendError> {
+ let mut message_recipients = self.message_recipients.lock().unwrap();
+ if outbound_buffer_full(&peer_node_id, &message_recipients) {
+ return Err(SendError::BufferFull);
+ }
+
+ match message_recipients.entry(*peer_node_id) {
+ hash_map::Entry::Occupied(mut e) if e.get().is_connected() => {
+ e.get_mut().enqueue_message(message);
+ Ok(())
+ },
+ _ => Err(SendError::InvalidFirstHop(*peer_node_id))
+ }
+ }
+
#[cfg(any(test, feature = "_test_utils"))]
pub fn send_onion_message_using_path<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>
}
fn handle_onion_message_response<T: OnionMessageContents>(
- &self, response: Option<T>, reply_path: Option<BlindedPath>, log_suffix: fmt::Arguments
+ &self, response: ResponseInstruction<T>
) {
- if let Some(response) = response {
- match reply_path {
- Some(reply_path) => {
- let _ = self.find_path_and_enqueue_onion_message(
- response, Destination::BlindedPath(reply_path), None, log_suffix
- );
- },
- None => {
- log_trace!(self.logger, "Missing reply path {}", log_suffix);
- },
- }
+ if let ResponseInstruction::WithoutReplyPath(response) = response {
+ let message_type = response.message.msg_type();
+ let _ = self.find_path_and_enqueue_onion_message(
+ response.message, Destination::BlindedPath(response.reply_path), None,
+ format_args!(
+ "when responding with {} to an onion message with path_id {:02x?}",
+ message_type,
+ response.path_id
+ )
+ );
}
}
}
msgs
}
+
+ fn enqueue_event(&self, event: Event) {
+ const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
+ let mut pending_events = self.pending_events.lock().unwrap();
+ let total_buffered_bytes: usize = pending_events
+ .iter()
+ .map(|ev| ev.serialized_length())
+ .sum();
+ if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
+ log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
+ return
+ }
+ pending_events.push(event);
+ }
}
fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, OnionMessageRecipient>) -> bool {
}
}
}
+ let mut events = Vec::new();
+ core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+ for ev in events {
+ handler.handle_event(ev);
+ }
}
}
CMH::Target: CustomOnionMessageHandler,
{
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
- let logger = WithContext::from(&self.logger, Some(*peer_node_id), None);
+ let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None);
match self.peel_onion_message(msg) {
Ok(PeeledOnion::Receive(message, path_id, reply_path)) => {
log_trace!(
match message {
ParsedOnionMessageContents::Offers(msg) => {
- let response = self.offers_handler.handle_message(msg);
- self.handle_onion_message_response(
- response, reply_path, format_args!(
- "when responding to Offers onion message with path_id {:02x?}",
- path_id
- )
+ let responder = reply_path.map(
+ |reply_path| Responder::new(reply_path, path_id)
);
+ let response_instructions = self.offers_handler.handle_message(msg, responder);
+ self.handle_onion_message_response(response_instructions);
},
ParsedOnionMessageContents::Custom(msg) => {
- let response = self.custom_handler.handle_custom_message(msg);
- self.handle_onion_message_response(
- response, reply_path, format_args!(
- "when responding to Custom onion message with path_id {:02x?}",
- path_id
- )
+ let responder = reply_path.map(
+ |reply_path| Responder::new(reply_path, path_id)
);
+ let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
+ self.handle_onion_message_response(response_instructions);
},
}
},
Ok(PeeledOnion::Forward(next_hop, onion_message)) => {
let next_node_id = match next_hop {
- NextHop::NodeId(pubkey) => pubkey,
- NextHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) {
+ NextMessageHop::NodeId(pubkey) => pubkey,
+ NextMessageHop::ShortChannelId(scid) => match self.node_id_lookup.next_node_id(scid) {
Some(pubkey) => pubkey,
None => {
log_trace!(self.logger, "Dropping forwarded onion messager: unable to resolve next hop using SCID {}", scid);
e.get_mut().enqueue_message(onion_message);
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
},
+ _ if self.intercept_messages_for_offline_peers => {
+ self.enqueue_event(
+ Event::OnionMessageIntercepted {
+ peer_node_id: next_node_id, message: onion_message
+ }
+ );
+ },
_ => {
log_trace!(
logger,
.entry(*their_node_id)
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
+ if self.intercept_messages_for_offline_peers {
+ self.enqueue_event(
+ Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
+ );
+ }
} else {
self.message_recipients.lock().unwrap().remove(their_node_id);
}
if let Some(ss) = prev_control_tlvs_ss.take() {
payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(
ForwardTlvs {
- next_hop: NextHop::NodeId(unblinded_pk_opt.unwrap()),
+ next_hop: NextMessageHop::NodeId(unblinded_pk_opt.unwrap()),
next_blinding_override: None,
}
)), ss));
} else if let Some((intro_node_id, blinding_pt)) = intro_node_id_blinding_pt.take() {
if let Some(control_tlvs_ss) = prev_control_tlvs_ss.take() {
payloads.push((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
- next_hop: NextHop::NodeId(intro_node_id),
+ next_hop: NextMessageHop::NodeId(intro_node_id),
next_blinding_override: Some(blinding_pt),
})), control_tlvs_ss));
}