use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
use crate::blinded_path::{BlindedPath, IntroductionNode, NextMessageHop, NodeIdLookUp};
-use crate::blinded_path::message::{advance_path_by_one, ForwardTlvs, ReceiveTlvs};
+use crate::blinded_path::message::{advance_path_by_one, ForwardNode, ForwardTlvs, ReceiveTlvs};
use crate::blinded_path::utils;
use crate::events::{Event, EventHandler, EventsProvider};
use crate::sign::{EntropySource, NodeSigner, Recipient};
pub(super) const MAX_TIMER_TICKS: usize = 2;
+/// A trivial trait which describes any [`OnionMessenger`].
+///
+/// This is not exported to bindings users as general cover traits aren't useful in other
+/// languages.
+pub trait AOnionMessenger {
+ /// A type implementing [`EntropySource`]
+ type EntropySource: EntropySource + ?Sized;
+ /// A type that may be dereferenced to [`Self::EntropySource`]
+ type ES: Deref<Target = Self::EntropySource>;
+ /// A type implementing [`NodeSigner`]
+ type NodeSigner: NodeSigner + ?Sized;
+ /// A type that may be dereferenced to [`Self::NodeSigner`]
+ type NS: Deref<Target = Self::NodeSigner>;
+ /// A type implementing [`Logger`]
+ type Logger: Logger + ?Sized;
+ /// A type that may be dereferenced to [`Self::Logger`]
+ type L: Deref<Target = Self::Logger>;
+ /// A type implementing [`NodeIdLookUp`]
+ type NodeIdLookUp: NodeIdLookUp + ?Sized;
+ /// A type that may be dereferenced to [`Self::NodeIdLookUp`]
+ type NL: Deref<Target = Self::NodeIdLookUp>;
+ /// A type implementing [`MessageRouter`]
+ type MessageRouter: MessageRouter + ?Sized;
+ /// A type that may be dereferenced to [`Self::MessageRouter`]
+ type MR: Deref<Target = Self::MessageRouter>;
+ /// A type implementing [`OffersMessageHandler`]
+ type OffersMessageHandler: OffersMessageHandler + ?Sized;
+ /// A type that may be dereferenced to [`Self::OffersMessageHandler`]
+ type OMH: Deref<Target = Self::OffersMessageHandler>;
+ /// A type implementing [`CustomOnionMessageHandler`]
+ type CustomOnionMessageHandler: CustomOnionMessageHandler + ?Sized;
+ /// A type that may be dereferenced to [`Self::CustomOnionMessageHandler`]
+ type CMH: Deref<Target = Self::CustomOnionMessageHandler>;
+ /// Returns a reference to the actual [`OnionMessenger`] object.
+ fn get_om(&self) -> &OnionMessenger<Self::ES, Self::NS, Self::L, Self::NL, Self::MR, Self::OMH, Self::CMH>;
+}
+
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> AOnionMessenger
+for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> where
+ ES::Target: EntropySource,
+ NS::Target: NodeSigner,
+ L::Target: Logger,
+ NL::Target: NodeIdLookUp,
+ MR::Target: MessageRouter,
+ OMH::Target: OffersMessageHandler,
+ CMH::Target: CustomOnionMessageHandler,
+{
+ type EntropySource = ES::Target;
+ type ES = ES;
+ type NodeSigner = NS::Target;
+ type NS = NS;
+ type Logger = L::Target;
+ type L = L;
+ type NodeIdLookUp = NL::Target;
+ type NL = NL;
+ type MessageRouter = MR::Target;
+ type MR = MR;
+ type OffersMessageHandler = OMH::Target;
+ type OMH = OMH;
+ type CustomOnionMessageHandler = CMH::Target;
+ type CMH = CMH;
+ fn get_om(&self) -> &OnionMessenger<ES, NS, L, NL, MR, OMH, CMH> { self }
+}
+
/// A sender, receiver and forwarder of [`OnionMessage`]s.
///
/// # Handling Messages
/// # use bitcoin::hashes::hex::FromHex;
/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey, self};
/// # use lightning::blinded_path::{BlindedPath, EmptyNodeIdLookUp};
+/// # use lightning::blinded_path::message::ForwardNode;
/// # use lightning::sign::{EntropySource, KeysManager};
/// # use lightning::ln::peer_handler::IgnoringMessageHandler;
/// # use lightning::onion_message::messenger::{Destination, MessageRouter, OnionMessagePath, OnionMessenger};
///
/// // Create a blinded path to yourself, for someone to send an onion message to.
/// # let your_node_id = hop_node_id1;
-/// let hops = [hop_node_id3, hop_node_id4, your_node_id];
-/// let blinded_path = BlindedPath::new_for_message(&hops, &keys_manager, &secp_ctx).unwrap();
+/// let hops = [
+/// ForwardNode { node_id: hop_node_id3, short_channel_id: None },
+/// ForwardNode { node_id: hop_node_id4, short_channel_id: None },
+/// ];
+/// let blinded_path = BlindedPath::new_for_message(&hops, your_node_id, &keys_manager, &secp_ctx).unwrap();
///
/// // Send a custom onion message to a blinded path.
/// let destination = Destination::BlindedPath(blinded_path);
offers_handler: OMH,
custom_handler: CMH,
intercept_messages_for_offline_peers: bool,
- pending_events: Mutex<Vec<Event>>,
+ pending_events: Mutex<PendingEvents>,
+}
+
+struct PendingEvents {
+ intercepted_msgs: Vec<Event>,
+ peer_connecteds: Vec<Event>,
}
/// [`OnionMessage`]s buffered to be sent.
impl Responder {
/// Creates a new [`Responder`] instance with the provided reply path.
- fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
+ pub(super) fn new(reply_path: BlindedPath, path_id: Option<[u8; 32]>) -> Self {
Responder {
reply_path,
path_id,
}
}
- /// Creates the appropriate [`ResponseInstruction`] for a given response.
+ /// Creates a [`ResponseInstruction::WithoutReplyPath`] for a given response.
+ ///
+ /// Use when the recipient doesn't need to send back a reply to us.
pub fn respond<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
ResponseInstruction::WithoutReplyPath(OnionMessageResponse {
message: response,
path_id: self.path_id,
})
}
+
+ /// Creates a [`ResponseInstruction::WithReplyPath`] for a given response.
+ ///
+ /// Use when the recipient needs to send back a reply to us.
+ pub fn respond_with_reply_path<T: OnionMessageContents>(self, response: T) -> ResponseInstruction<T> {
+ ResponseInstruction::WithReplyPath(OnionMessageResponse {
+ message: response,
+ reply_path: self.reply_path,
+ path_id: self.path_id,
+ })
+ }
}
/// This struct contains the information needed to reply to a received message.
/// `ResponseInstruction` represents instructions for responding to received messages.
pub enum ResponseInstruction<T: OnionMessageContents> {
+ /// Indicates that a response should be sent including a reply path for
+ /// the recipient to respond back.
+ WithReplyPath(OnionMessageResponse<T>),
/// Indicates that a response should be sent without including a reply path
/// for the recipient to respond back.
WithoutReplyPath(OnionMessageResponse<T>),
>(
&self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
) -> Result<Vec<BlindedPath>, ()>;
+
+ /// Creates compact [`BlindedPath`]s to the `recipient` node. The nodes in `peers` are assumed
+ /// to be direct peers with the `recipient`.
+ ///
+ /// Compact blinded paths use short channel ids instead of pubkeys for a smaller serialization,
+ /// which is beneficial when a QR code is used to transport the data. The SCID is passed using a
+ /// [`ForwardNode`] but may be `None` for graceful degradation.
+ ///
+ /// Implementations using additional intermediate nodes are responsible for using a
+ /// [`ForwardNode`] with `Some` short channel id, if possible. Similarly, implementations should
+ /// call [`BlindedPath::use_compact_introduction_node`].
+ ///
+ /// The provided implementation simply delegates to [`MessageRouter::create_blinded_paths`],
+ /// ignoring the short channel ids.
+ fn create_compact_blinded_paths<
+ T: secp256k1::Signing + secp256k1::Verification
+ >(
+ &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+ ) -> Result<Vec<BlindedPath>, ()> {
+ let peers = peers
+ .into_iter()
+ .map(|ForwardNode { node_id, short_channel_id: _ }| node_id)
+ .collect();
+ self.create_blinded_paths(recipient, peers, secp_ctx)
+ }
}
/// A [`MessageRouter`] that can only route to a directly connected [`Destination`].
+///
+/// # Privacy
+///
+/// Creating [`BlindedPath`]s may affect privacy since, if a suitable path cannot be found, it will
+/// create a one-hop path using the recipient as the introduction node if it is a announced node.
+/// Otherwise, there is no way to find a path to the introduction node in order to send a message,
+/// and thus an `Err` is returned.
pub struct DefaultMessageRouter<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref>
where
L::Target: Logger,
pub fn new(network_graph: G, entropy_source: ES) -> Self {
Self { network_graph, entropy_source }
}
-}
-impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
-where
- L::Target: Logger,
- ES::Target: EntropySource,
-{
- fn find_path(
- &self, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination
- ) -> Result<OnionMessagePath, ()> {
- let network_graph = self.network_graph.deref().read_only();
- destination.resolve(&network_graph);
-
- let first_node = match destination.first_node() {
- Some(first_node) => first_node,
- None => return Err(()),
- };
-
- if peers.contains(&first_node) || sender == first_node {
- Ok(OnionMessagePath {
- intermediate_nodes: vec![], destination, first_node_addresses: None
- })
- } else {
- let node_announcement = network_graph
- .node(&NodeId::from_pubkey(&first_node))
- .and_then(|node_info| node_info.announcement_info.as_ref())
- .and_then(|announcement_info| announcement_info.announcement_message.as_ref())
- .map(|node_announcement| &node_announcement.contents);
-
- match node_announcement {
- Some(node_announcement) if node_announcement.features.supports_onion_messages() => {
- let first_node_addresses = Some(node_announcement.addresses.clone());
- Ok(OnionMessagePath {
- intermediate_nodes: vec![], destination, first_node_addresses
- })
- },
- _ => Err(()),
- }
- }
- }
-
- fn create_blinded_paths<
+ fn create_blinded_paths_from_iter<
+ I: Iterator<Item = ForwardNode>,
T: secp256k1::Signing + secp256k1::Verification
>(
- &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
+ &self, recipient: PublicKey, peers: I, secp_ctx: &Secp256k1<T>, compact_paths: bool
) -> Result<Vec<BlindedPath>, ()> {
// Limit the number of blinded paths that are computed.
const MAX_PATHS: usize = 3;
let is_recipient_announced =
network_graph.nodes().contains_key(&NodeId::from_pubkey(&recipient));
- let mut peer_info = peers.iter()
+ let mut peer_info = peers
// Limit to peers with announced channels
- .filter_map(|pubkey|
+ .filter_map(|peer|
network_graph
- .node(&NodeId::from_pubkey(pubkey))
+ .node(&NodeId::from_pubkey(&peer.node_id))
.filter(|info| info.channels.len() >= MIN_PEER_CHANNELS)
- .map(|info| (*pubkey, info.is_tor_only(), info.channels.len()))
+ .map(|info| (peer, info.is_tor_only(), info.channels.len()))
)
// Exclude Tor-only nodes when the recipient is announced.
.filter(|(_, is_tor_only, _)| !(*is_tor_only && is_recipient_announced))
});
let paths = peer_info.into_iter()
- .map(|(pubkey, _, _)| vec![pubkey, recipient])
- .map(|node_pks| BlindedPath::new_for_message(&node_pks, &*self.entropy_source, secp_ctx))
+ .map(|(peer, _, _)| {
+ BlindedPath::new_for_message(&[peer], recipient, &*self.entropy_source, secp_ctx)
+ })
.take(MAX_PATHS)
.collect::<Result<Vec<_>, _>>();
- match paths {
+ let mut paths = match paths {
Ok(paths) if !paths.is_empty() => Ok(paths),
_ => {
if is_recipient_announced {
Err(())
}
},
+ }?;
+
+ if compact_paths {
+ for path in &mut paths {
+ path.use_compact_introduction_node(&network_graph);
+ }
}
+
+ Ok(paths)
+ }
+}
+
+impl<G: Deref<Target=NetworkGraph<L>>, L: Deref, ES: Deref> MessageRouter for DefaultMessageRouter<G, L, ES>
+where
+ L::Target: Logger,
+ ES::Target: EntropySource,
+{
+ fn find_path(
+ &self, sender: PublicKey, peers: Vec<PublicKey>, mut destination: Destination
+ ) -> Result<OnionMessagePath, ()> {
+ let network_graph = self.network_graph.deref().read_only();
+ destination.resolve(&network_graph);
+
+ let first_node = match destination.first_node() {
+ Some(first_node) => first_node,
+ None => return Err(()),
+ };
+
+ if peers.contains(&first_node) || sender == first_node {
+ Ok(OnionMessagePath {
+ intermediate_nodes: vec![], destination, first_node_addresses: None
+ })
+ } else {
+ let node_details = network_graph
+ .node(&NodeId::from_pubkey(&first_node))
+ .and_then(|node_info| node_info.announcement_info.as_ref())
+ .map(|announcement_info| (announcement_info.features(), announcement_info.addresses()));
+
+ match node_details {
+ Some((features, addresses)) if features.supports_onion_messages() && addresses.len() > 0 => {
+ let first_node_addresses = Some(addresses.clone());
+ Ok(OnionMessagePath {
+ intermediate_nodes: vec![], destination, first_node_addresses
+ })
+ },
+ _ => Err(()),
+ }
+ }
+ }
+
+ fn create_blinded_paths<
+ T: secp256k1::Signing + secp256k1::Verification
+ >(
+ &self, recipient: PublicKey, peers: Vec<PublicKey>, secp_ctx: &Secp256k1<T>,
+ ) -> Result<Vec<BlindedPath>, ()> {
+ let peers = peers
+ .into_iter()
+ .map(|node_id| ForwardNode { node_id, short_channel_id: None });
+ self.create_blinded_paths_from_iter(recipient, peers, secp_ctx, false)
+ }
+
+ fn create_compact_blinded_paths<
+ T: secp256k1::Signing + secp256k1::Verification
+ >(
+ &self, recipient: PublicKey, peers: Vec<ForwardNode>, secp_ctx: &Secp256k1<T>,
+ ) -> Result<Vec<BlindedPath>, ()> {
+ self.create_blinded_paths_from_iter(recipient, peers.into_iter(), secp_ctx, true)
}
}
TooFewBlindedHops,
/// The first hop is not a peer and doesn't have a known [`SocketAddress`].
InvalidFirstHop(PublicKey),
- /// A path from the sender to the destination could not be found by the [`MessageRouter`].
+ /// Indicates that a path could not be found by the [`MessageRouter`].
+ ///
+ /// This occurs when either:
+ /// - No path from the sender to the destination was found to send the onion message
+ /// - No reply path to the sender could be created when responding to an onion message
PathNotFound,
/// Onion message contents must have a TLV type >= 64.
InvalidMessage,
offers_handler,
custom_handler,
intercept_messages_for_offline_peers,
- pending_events: Mutex::new(Vec::new()),
+ pending_events: Mutex::new(PendingEvents {
+ intercepted_msgs: Vec::new(),
+ peer_connecteds: Vec::new(),
+ }),
}
}
&self, contents: T, destination: Destination, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
) -> Result<SendSuccess, SendError> {
- let mut logger = WithContext::from(&self.logger, None, None);
- let result = self.find_path(destination)
- .and_then(|path| {
- let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
- logger = WithContext::from(&self.logger, first_hop, None);
- self.enqueue_onion_message(path, contents, reply_path, log_suffix)
- });
+ let mut logger = WithContext::from(&self.logger, None, None, None);
+ let result = self.find_path(destination).and_then(|path| {
+ let first_hop = path.intermediate_nodes.get(0).map(|p| *p);
+ logger = WithContext::from(&self.logger, first_hop, None, None);
+ self.enqueue_onion_message(path, contents, reply_path, log_suffix)
+ });
match result.as_ref() {
Err(SendError::GetNodeIdFailed) => {
.map_err(|_| SendError::PathNotFound)
}
+ fn create_blinded_path(&self) -> Result<BlindedPath, SendError> {
+ let recipient = self.node_signer
+ .get_node_id(Recipient::Node)
+ .map_err(|_| SendError::GetNodeIdFailed)?;
+ let secp_ctx = &self.secp_ctx;
+
+ let peers = self.message_recipients.lock().unwrap()
+ .iter()
+ .filter(|(_, peer)| matches!(peer, OnionMessageRecipient::ConnectedPeer(_)))
+ .map(|(node_id, _ )| *node_id)
+ .collect::<Vec<_>>();
+
+ self.message_router
+ .create_blinded_paths(recipient, peers, secp_ctx)
+ .and_then(|paths| paths.into_iter().next().ok_or(()))
+ .map_err(|_| SendError::PathNotFound)
+ }
+
fn enqueue_onion_message<T: OnionMessageContents>(
&self, path: OnionMessagePath, contents: T, reply_path: Option<BlindedPath>,
log_suffix: fmt::Arguments
)
}
- fn handle_onion_message_response<T: OnionMessageContents>(
+ /// Handles the response to an [`OnionMessage`] based on its [`ResponseInstruction`],
+ /// enqueueing any response for sending.
+ ///
+ /// This function is useful for asynchronous handling of [`OnionMessage`]s.
+ /// Handlers have the option to return [`ResponseInstruction::NoResponse`], indicating that
+ /// no immediate response should be sent. Then, they can transfer the associated [`Responder`]
+ /// to another task responsible for generating the response asynchronously. Subsequently, when
+ /// the response is prepared and ready for sending, that task can invoke this method to enqueue
+ /// the response for delivery.
+ pub fn handle_onion_message_response<T: OnionMessageContents>(
&self, response: ResponseInstruction<T>
- ) {
- if let ResponseInstruction::WithoutReplyPath(response) = response {
- let message_type = response.message.msg_type();
- let _ = self.find_path_and_enqueue_onion_message(
- response.message, Destination::BlindedPath(response.reply_path), None,
- format_args!(
- "when responding with {} to an onion message with path_id {:02x?}",
- message_type,
- response.path_id
- )
- );
- }
+ ) -> Result<Option<SendSuccess>, SendError> {
+ let (response, create_reply_path) = match response {
+ ResponseInstruction::WithReplyPath(response) => (response, true),
+ ResponseInstruction::WithoutReplyPath(response) => (response, false),
+ ResponseInstruction::NoResponse => return Ok(None),
+ };
+
+ let message_type = response.message.msg_type();
+ let reply_path = if create_reply_path {
+ match self.create_blinded_path() {
+ Ok(reply_path) => Some(reply_path),
+ Err(err) => {
+ log_trace!(
+ self.logger,
+ "Failed to create reply path when responding with {} to an onion message \
+ with path_id {:02x?}: {:?}",
+ message_type, response.path_id, err
+ );
+ return Err(err);
+ }
+ }
+ } else { None };
+
+ self.find_path_and_enqueue_onion_message(
+ response.message, Destination::BlindedPath(response.reply_path), reply_path,
+ format_args!(
+ "when responding with {} to an onion message with path_id {:02x?}",
+ message_type,
+ response.path_id
+ )
+ ).map(|result| Some(result))
}
#[cfg(test)]
msgs
}
- fn enqueue_event(&self, event: Event) {
+ fn enqueue_intercepted_event(&self, event: Event) {
const MAX_EVENTS_BUFFER_SIZE: usize = (1 << 10) * 256;
let mut pending_events = self.pending_events.lock().unwrap();
- let total_buffered_bytes: usize = pending_events
- .iter()
- .map(|ev| ev.serialized_length())
- .sum();
+ let total_buffered_bytes: usize =
+ pending_events.intercepted_msgs.iter().map(|ev| ev.serialized_length()).sum();
if total_buffered_bytes >= MAX_EVENTS_BUFFER_SIZE {
log_trace!(self.logger, "Dropping event {:?}: buffer full", event);
return
}
- pending_events.push(event);
+ pending_events.intercepted_msgs.push(event);
+ }
+
+ /// Processes any events asynchronously using the given handler.
+ ///
+ /// Note that the event handler is called in the order each event was generated, however
+ /// futures are polled in parallel for some events to allow for parallelism where events do not
+ /// have an ordering requirement.
+ ///
+ /// See the trait-level documentation of [`EventsProvider`] for requirements.
+ pub async fn process_pending_events_async<Future: core::future::Future<Output = ()> + core::marker::Unpin, H: Fn(Event) -> Future>(
+ &self, handler: H
+ ) {
+ let mut intercepted_msgs = Vec::new();
+ let mut peer_connecteds = Vec::new();
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut intercepted_msgs);
+ core::mem::swap(&mut pending_events.peer_connecteds, &mut peer_connecteds);
+ }
+
+ let mut futures = Vec::with_capacity(intercepted_msgs.len());
+ for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
+ if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
+ if let Some(addresses) = addresses.take() {
+ futures.push(Some(handler(Event::ConnectionNeeded { node_id: *node_id, addresses })));
+ }
+ }
+ }
+
+ for ev in intercepted_msgs {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { debug_assert!(false); }
+ futures.push(Some(handler(ev)));
+ }
+ // Let the `OnionMessageIntercepted` events finish before moving on to peer_connecteds
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+
+ if peer_connecteds.len() <= 1 {
+ for event in peer_connecteds { handler(event).await; }
+ } else {
+ let mut futures = Vec::new();
+ for event in peer_connecteds {
+ futures.push(Some(handler(event)));
+ }
+ crate::util::async_poll::MultiFuturePoller(futures).await;
+ }
}
}
}
}
let mut events = Vec::new();
- core::mem::swap(&mut *self.pending_events.lock().unwrap(), &mut events);
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ #[cfg(debug_assertions)] {
+ for ev in pending_events.intercepted_msgs.iter() {
+ if let Event::OnionMessageIntercepted { .. } = ev {} else { panic!(); }
+ }
+ for ev in pending_events.peer_connecteds.iter() {
+ if let Event::OnionMessagePeerConnected { .. } = ev {} else { panic!(); }
+ }
+ }
+ core::mem::swap(&mut pending_events.intercepted_msgs, &mut events);
+ events.append(&mut pending_events.peer_connecteds);
+ pending_events.peer_connecteds.shrink_to(10); // Limit total heap usage
+ }
for ev in events {
handler.handle_event(ev);
}
CMH::Target: CustomOnionMessageHandler,
{
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
- let logger = WithContext::from(&self.logger, Some(*peer_node_id), None);
+ let logger = WithContext::from(&self.logger, Some(*peer_node_id), None, None);
match self.peel_onion_message(msg) {
Ok(PeeledOnion::Receive(message, path_id, reply_path)) => {
log_trace!(
|reply_path| Responder::new(reply_path, path_id)
);
let response_instructions = self.offers_handler.handle_message(msg, responder);
- self.handle_onion_message_response(response_instructions);
+ let _ = self.handle_onion_message_response(response_instructions);
},
ParsedOnionMessageContents::Custom(msg) => {
let responder = reply_path.map(
|reply_path| Responder::new(reply_path, path_id)
);
let response_instructions = self.custom_handler.handle_custom_message(msg, responder);
- self.handle_onion_message_response(response_instructions);
+ let _ = self.handle_onion_message_response(response_instructions);
},
}
},
log_trace!(logger, "Forwarding an onion message to peer {}", next_node_id);
},
_ if self.intercept_messages_for_offline_peers => {
- self.enqueue_event(
+ self.enqueue_intercepted_event(
Event::OnionMessageIntercepted {
peer_node_id: next_node_id, message: onion_message
}
.or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new()))
.mark_connected();
if self.intercept_messages_for_offline_peers {
- self.enqueue_event(
+ self.pending_events.lock().unwrap().peer_connecteds.push(
Event::OnionMessagePeerConnected { peer_node_id: *their_node_id }
);
}