AsyncPaymentsMessageHandler trait for OnionMessenger
[rust-lightning] / lightning-background-processor / src / lib.rs
index 8b55913728f4fefe46c59228b0f833565881de39..aae64e981bb54271d0279c26db666bdee77f4d10 100644 (file)
@@ -27,11 +27,12 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
 use lightning::events::EventHandler;
-#[cfg(any(feature = "std", feature = "futures"))]
+#[cfg(feature = "std")]
 use lightning::events::EventsProvider;
 
 use lightning::ln::channelmanager::AChannelManager;
 use lightning::ln::msgs::OnionMessageHandler;
+use lightning::onion_message::messenger::AOnionMessenger;
 use lightning::ln::peer_handler::APeerManager;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::UtxoLookup;
@@ -55,7 +56,7 @@ use std::thread::{self, JoinHandle};
 use std::time::Instant;
 
 #[cfg(not(feature = "std"))]
-use alloc::vec::Vec;
+use alloc::boxed::Box;
 
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -281,7 +282,8 @@ macro_rules! define_run_body {
        (
                $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
                $channel_manager: ident, $process_channel_manager_events: expr,
-               $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
+               $onion_messenger: ident, $process_onion_message_handler_events: expr,
+               $peer_manager: ident, $gossip_sync: ident,
                $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
                $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
        ) => { {
@@ -347,8 +349,10 @@ macro_rules! define_run_body {
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
                        if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
-                               log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
-                               $peer_manager.onion_message_handler().timer_tick_occurred();
+                               if let Some(om) = &$onion_messenger {
+                                       log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
+                                       om.get_om().timer_tick_occurred();
+                               }
                                last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
                        }
                        if await_slow {
@@ -564,6 +568,7 @@ use core::task;
 /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
 /// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
 /// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
+/// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler, lightning::ln::peer_handler::IgnoringMessageHandler>;
 /// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
 /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
 /// #
@@ -576,6 +581,7 @@ use core::task;
 /// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
 /// #     event_handler: Arc<EventHandler>,
 /// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
+/// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
 /// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
 /// #     gossip_sync: Arc<P2PGossipSync<UL>>,
 /// #     persister: Arc<Store>,
@@ -595,6 +601,7 @@ use core::task;
 ///    let background_chan_man = Arc::clone(&node.channel_manager);
 ///    let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
 ///    let background_peer_man = Arc::clone(&node.peer_manager);
+///    let background_onion_messenger = Arc::clone(&node.onion_messenger);
 ///    let background_logger = Arc::clone(&node.logger);
 ///    let background_scorer = Arc::clone(&node.scorer);
 ///
@@ -619,6 +626,7 @@ use core::task;
 ///                    |e| background_event_handler.handle_event(e),
 ///                    background_chain_mon,
 ///                    background_chan_man,
+///                    Some(background_onion_messenger),
 ///                    background_gossip_sync,
 ///                    background_peer_man,
 ///                    background_logger,
@@ -651,6 +659,7 @@ pub async fn process_events_async<
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref + Send + Sync,
+       OM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        PM: 'static + Deref + Send + Sync,
@@ -661,6 +670,7 @@ pub async fn process_events_async<
        FetchTime: Fn() -> Option<Duration>,
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
+       onion_messenger: Option<OM>,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
@@ -673,6 +683,7 @@ where
        P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
        PS::Target: 'static + Persister<'a, CM, L, SC>,
        CM::Target: AChannelManager + Send + Sync,
+       OM::Target: AOnionMessenger + Send + Sync,
        PM::Target: APeerManager + Send + Sync,
 {
        let mut should_break = false;
@@ -683,7 +694,7 @@ where
                let logger = &logger;
                let persister = &persister;
                let fetch_time = &fetch_time;
-               async move {
+               Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
@@ -698,14 +709,14 @@ where
                                }
                        }
                        event_handler(event).await;
-               }
+               })
        };
        define_run_body!(
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
-               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
-               gossip_sync, logger, scorer, should_break, {
+               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await },
+               peer_manager, gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
                                a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
@@ -729,25 +740,6 @@ where
        )
 }
 
-#[cfg(feature = "futures")]
-async fn process_onion_message_handler_events_async<
-       EventHandlerFuture: core::future::Future<Output = ()>,
-       EventHandler: Fn(Event) -> EventHandlerFuture,
-       PM: 'static + Deref + Send + Sync,
->(
-       peer_manager: &PM, handler: EventHandler
-)
-where
-       PM::Target: APeerManager + Send + Sync,
-{
-       let events = core::cell::RefCell::new(Vec::new());
-       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
-
-       for event in events.into_inner() {
-               handler(event).await
-       }
-}
-
 #[cfg(feature = "std")]
 impl BackgroundProcessor {
        /// Start a background thread that takes care of responsibilities enumerated in the [top-level
@@ -807,6 +799,7 @@ impl BackgroundProcessor {
                PS: 'static + Deref + Send,
                M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref + Send + Sync,
+               OM: 'static + Deref + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                PM: 'static + Deref + Send + Sync,
@@ -814,6 +807,7 @@ impl BackgroundProcessor {
                SC: for <'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
+               onion_messenger: Option<OM>,
                gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        ) -> Self
        where
@@ -825,6 +819,7 @@ impl BackgroundProcessor {
                P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
                PS::Target: 'static + Persister<'a, CM, L, SC>,
                CM::Target: AChannelManager + Send + Sync,
+               OM::Target: AOnionMessenger + Send + Sync,
                PM::Target: APeerManager + Send + Sync,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
@@ -851,9 +846,8 @@ impl BackgroundProcessor {
                        define_run_body!(
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
-                               peer_manager,
-                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
-                               gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
+                               onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&event_handler) },
+                               peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
                                        &channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                        &chain_monitor.get_update_future()
@@ -940,6 +934,7 @@ mod tests {
        use lightning::ln::functional_test_utils::*;
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
+       use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
        use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
        use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
        use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
@@ -1001,11 +996,14 @@ mod tests {
        type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
        type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
 
+       type OM = OnionMessenger<Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestLogger>, Arc<ChannelManager>, Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<KeysManager>>>, IgnoringMessageHandler, IgnoringMessageHandler, IgnoringMessageHandler>;
+
        struct Node {
                node: Arc<ChannelManager>,
+               messenger: Arc<OM>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
-               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
+               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<OM>, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
                chain_monitor: Arc<ChainMonitor>,
                kv_store: Arc<FilesystemStore>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -1284,6 +1282,7 @@ mod tests {
                        let seed = [i as u8; 32];
                        let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
                        let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
+                       let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager)));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
                        let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
                        let now = Duration::from_secs(genesis_block.header.time as u64);
@@ -1292,6 +1291,7 @@ mod tests {
                        let best_block = BestBlock::from_network(network);
                        let params = ChainParameters { network, best_block };
                        let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
+                       let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, IgnoringMessageHandler {}, IgnoringMessageHandler {}));
                        let wallet = Arc::new(TestWallet {});
                        let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
                                None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
@@ -1300,10 +1300,10 @@ mod tests {
                        let msg_handler = MessageHandler {
                                chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
                                route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                               onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
+                               onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{}
                        };
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
-                       let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper };
+                       let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger };
                        nodes.push(node);
                }
 
@@ -1426,7 +1426,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                macro_rules! check_persisted_data {
                        ($node: expr, $filepath: expr) => {
@@ -1494,7 +1494,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
@@ -1523,7 +1523,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                match bg_processor.join() {
                        Ok(_) => panic!("Expected error persisting manager"),
                        Err(e) => {
@@ -1544,7 +1544,7 @@ mod tests {
                let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
 
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                Box::pin(async move {
@@ -1569,7 +1569,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting network graph"),
@@ -1587,7 +1587,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                match bg_processor.stop() {
                        Ok(_) => panic!("Expected error persisting scorer"),
@@ -1615,7 +1615,7 @@ mod tests {
                        _ => panic!("Unexpected event: {:?}", event),
                };
 
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
@@ -1655,7 +1655,7 @@ mod tests {
                        _ => panic!("Unexpected event: {:?}", event),
                };
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                // Force close the channel and check that the SpendableOutputs event was handled.
                let error_message = "Channel force-closed";
@@ -1767,7 +1767,7 @@ mod tests {
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: _| {};
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
@@ -1840,7 +1840,7 @@ mod tests {
                let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
 
                let event_handler = |_: _| {};
-               let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
                        receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
@@ -1860,7 +1860,7 @@ mod tests {
 
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
                let bp_future = super::process_events_async(
-                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();
@@ -1998,7 +1998,7 @@ mod tests {
                let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
                let data_dir = nodes[0].kv_store.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
-               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
 
                do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
 
@@ -2035,7 +2035,7 @@ mod tests {
                let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
 
                let bp_future = super::process_events_async(
-                       persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
+                       persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
                        nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
                        Some(nodes[0].scorer.clone()), move |dur: Duration| {
                                let mut exit_receiver = exit_receiver.clone();