persister, chain_monitor,
chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
- peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+ peer_manager,
+ for event in onion_message_handler_events(peer_manager) {
+ handler(event).await
+ },
gossip_sync, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
)
}
-#[cfg(feature = "futures")]
-async fn process_onion_message_handler_events_async<
- EventHandlerFuture: core::future::Future<Output = ()>,
- EventHandler: Fn(Event) -> EventHandlerFuture,
- PM: 'static + Deref + Send + Sync,
->(
- peer_manager: &PM, handler: EventHandler
-)
-where
- PM::Target: APeerManager + Send + Sync,
-{
- let events = core::cell::RefCell::new(Vec::new());
- peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
-
- for event in events.into_inner() {
- handler(event).await
- }
+fn onion_message_handler_events<PM: 'static + Deref + Send + Sync>(
+ peer_manager: &PM
+) -> impl Iterator<Item=Event> where PM::Target: APeerManager + Send + Sync {
+ peer_manager.onion_message_handler().get_and_clear_connections_needed()
+ .into_iter().map(|(node_id, addresses)| Event::ConnectionNeeded { node_id, addresses })
}
#[cfg(feature = "std")]
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
peer_manager,
- peer_manager.onion_message_handler().process_pending_events(&event_handler),
+ for event in onion_message_handler_events(&peer_manager) {
+ event_handler.handle_event(event);
+ },
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
{ Sleeper::from_two_futures(
&channel_manager.get_cm().get_event_or_persistence_needed_future(),
}
/// A handler for received [`OnionMessage`]s and for providing generated ones to send.
-pub trait OnionMessageHandler: EventsProvider {
+pub trait OnionMessageHandler {
+ /// Because much of the lightning network does not yet support forwarding onion messages, we
+ /// may need to directly connect to a node which will forward a message for us. In such a case,
+ /// this method will return the set of nodes which need connection by node_id and the
+ /// corresponding socket addresses where they may accept incoming connections.
+ ///
+ /// Thus, this method should be polled regularly to detect messages await such a direct
+ /// connection.
+ fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)>;
+
/// Handle an incoming `onion_message` message from the given peer.
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
fn processing_queue_high(&self) -> bool { false }
}
impl OnionMessageHandler for IgnoringMessageHandler {
+ fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)> { Vec::new() }
fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
false
}
-impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> EventsProvider
+impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
where
ES::Target: EntropySource,
OMH::Target: OffersMessageHandler,
CMH::Target: CustomOnionMessageHandler,
{
- fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
+ fn get_and_clear_connections_needed(&self) -> Vec<(PublicKey, Vec<SocketAddress>)> {
+ let mut res = Vec::new();
for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() {
if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient {
if let Some(addresses) = addresses.take() {
- handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses });
+ res.push((*node_id, addresses));
}
}
}
+ res
}
-}
-impl<ES: Deref, NS: Deref, L: Deref, NL: Deref, MR: Deref, OMH: Deref, CMH: Deref> OnionMessageHandler
-for OnionMessenger<ES, NS, L, NL, MR, OMH, CMH>
-where
- ES::Target: EntropySource,
- NS::Target: NodeSigner,
- L::Target: Logger,
- NL::Target: NodeIdLookUp,
- MR::Target: MessageRouter,
- OMH::Target: OffersMessageHandler,
- CMH::Target: CustomOnionMessageHandler,
-{
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage) {
let logger = WithContext::from(&self.logger, Some(*peer_node_id), None);
match self.peel_onion_message(msg) {