X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=449891ba07f2d09e1dd4fa155f8322591e15b3dd;hb=7dcee4f2e54988dbaceefad7a352bbd15263622b;hp=24d39bf50c1991357859c0ead0b9ecdc15247a57;hpb=5d187f65b993cc4862f37f6fceaa6cccd6561dc9;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 24d39bf5..449891ba 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; #[cfg(test)] const PING_TIMER: u64 = 1; +#[cfg(not(test))] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; +#[cfg(test)] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; + /// Prune the network graph of stale entries hourly. const NETWORK_PRUNE_TIMER: u64 = 60 * 60; @@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri } macro_rules! define_run_body { - ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr) - => { { + ( + $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, + $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, + $timer_elapsed: expr, $check_slow_await: expr + ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); @@ -291,6 +299,7 @@ macro_rules! define_run_body { loop { $process_channel_manager_events; $process_chain_monitor_events; + $process_onion_message_handler_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -334,6 +343,11 @@ macro_rules! define_run_body { $channel_manager.timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } + if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + $peer_manager.onion_message_handler().timer_tick_occurred(); + last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); + } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. @@ -599,12 +613,11 @@ pub async fn process_events_async< EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -617,7 +630,7 @@ pub async fn process_events_async< where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -625,8 +638,9 @@ where F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let mut should_break = false; let async_event_handler = |event| { @@ -650,10 +664,12 @@ where event_handler(event).await; } }; - define_run_body!(persister, - chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + define_run_body!( + persister, chain_monitor, + chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - gossip_sync, peer_manager, logger, scorer, should_break, { + peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, + gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -673,7 +689,29 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform) + }, mobile_interruptable_platform + ) +} + +#[cfg(feature = "futures")] +async fn process_onion_message_handler_events_async< + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, + PM: 'static + Deref + Send + Sync, +>( + peer_manager: &PM, handler: EventHandler +) +where + PM::Target: APeerManager + Send + Sync, +{ + use lightning::events::EventsProvider; + + let events = core::cell::RefCell::new(Vec::new()); + peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); + + for event in events.into_inner() { + handler(event).await + } } #[cfg(feature = "std")] @@ -738,12 +776,11 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -753,7 +790,7 @@ impl BackgroundProcessor { where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -761,8 +798,9 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -782,14 +820,18 @@ impl BackgroundProcessor { } event_handler.handle_event(event); }; - define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + define_run_body!( + persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, + peer_manager.onion_message_handler().process_pending_events(&event_handler), + gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -845,7 +887,7 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { use bitcoin::blockdata::constants::{genesis_block, ChainHash}; - use bitcoin::blockdata::locktime::PackedLockTime; + use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; @@ -863,8 +905,8 @@ mod tests { use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; - use lightning::routing::router::{DefaultRouter, Path, RouteHop}; use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; + use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; @@ -1071,7 +1113,7 @@ mod tests { impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( - &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams + &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } } @@ -1254,7 +1296,7 @@ mod tests { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); - let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut { + let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut { value: channel_value_satoshis, script_pubkey: output_script.clone(), }]}; (temporary_channel_id, tx) @@ -1362,9 +1404,11 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, - // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and - // `PeerManager::timer_tick_occurred` every `PING_TIMER`. + // Test that: + // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, + // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and + // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1375,9 +1419,11 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() { + let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); + if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { break } } @@ -1556,7 +1602,7 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() { + if log_entries.get(&("lightning_background_processor", expected_log)).is_some() { break } } @@ -1580,7 +1626,7 @@ mod tests { $sleep; let log_entries = $nodes[0].logger.lines.lock().unwrap(); let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); - if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + if *log_entries.get(&("lightning_background_processor", loop_counter)) .unwrap_or(&0) > 1 { // Wait until the loop has gone around at least twice. @@ -1792,7 +1838,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); } #[tokio::test] @@ -1838,7 +1884,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); }); let (r1, r2) = tokio::join!(t1, t2);