X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;ds=sidebyside;f=lightning-background-processor%2Fsrc%2Flib.rs;h=449891ba07f2d09e1dd4fa155f8322591e15b3dd;hb=7dcee4f2e54988dbaceefad7a352bbd15263622b;hp=f953ba1c753750133b6e79a3cc43eb1e05c81962;hpb=36ecc8e729775be2a12e1392066926bed760524a;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f953ba1c..449891ba 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; #[cfg(test)] const PING_TIMER: u64 = 1; +#[cfg(not(test))] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; +#[cfg(test)] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; + /// Prune the network graph of stale entries hourly. const NETWORK_PRUNE_TIMER: u64 = 60 * 60; @@ -273,9 +279,9 @@ macro_rules! define_run_body { ( $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr + $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, + $timer_elapsed: expr, $check_slow_await: expr ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -283,6 +289,7 @@ macro_rules! define_run_body { $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); @@ -292,6 +299,7 @@ macro_rules! define_run_body { loop { $process_channel_manager_events; $process_chain_monitor_events; + $process_onion_message_handler_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -335,6 +343,11 @@ macro_rules! define_run_body { $channel_manager.timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } + if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + $peer_manager.onion_message_handler().timer_tick_occurred(); + last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); + } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. @@ -655,7 +668,8 @@ where persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - gossip_sync, peer_manager, logger, scorer, should_break, { + peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, + gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -679,6 +693,27 @@ where ) } +#[cfg(feature = "futures")] +async fn process_onion_message_handler_events_async< + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, + PM: 'static + Deref + Send + Sync, +>( + peer_manager: &PM, handler: EventHandler +) +where + PM::Target: APeerManager + Send + Sync, +{ + use lightning::events::EventsProvider; + + let events = core::cell::RefCell::new(Vec::new()); + peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); + + for event in events.into_inner() { + handler(event).await + } +} + #[cfg(feature = "std")] impl BackgroundProcessor { /// Start a background thread that takes care of responsibilities enumerated in the [top-level @@ -788,7 +823,9 @@ impl BackgroundProcessor { define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, + peer_manager.onion_message_handler().process_pending_events(&event_handler), + gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() @@ -868,8 +905,8 @@ mod tests { use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; - use lightning::routing::router::{DefaultRouter, Path, RouteHop}; use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; + use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; @@ -1076,7 +1113,7 @@ mod tests { impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( - &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams + &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } } @@ -1367,9 +1404,11 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, - // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and - // `PeerManager::timer_tick_occurred` every `PING_TIMER`. + // Test that: + // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, + // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and + // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1380,9 +1419,11 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); + let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() { + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { break } }