X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=449891ba07f2d09e1dd4fa155f8322591e15b3dd;hb=7dcee4f2e54988dbaceefad7a352bbd15263622b;hp=fb20514efd160185745438b03d44f5faeedf7bd8;hpb=0ce1c5a674f4ba27345634d529db5cdd758b8319;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index fb20514e..449891ba 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; @@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30; #[cfg(test)] const PING_TIMER: u64 = 1; +#[cfg(not(test))] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 10; +#[cfg(test)] +const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; + /// Prune the network graph of stale entries hourly. const NETWORK_PRUNE_TIMER: u64 = 60 * 60; @@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + Wri } macro_rules! define_run_body { - ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, - $channel_manager: ident, $process_channel_manager_events: expr, - $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, - $check_slow_await: expr) - => { { + ( + $persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, + $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, + $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, + $timer_elapsed: expr, $check_slow_await: expr + ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); let mut last_freshness_call = $get_timer(FRESHNESS_TIMER); + let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); let mut last_ping_call = $get_timer(PING_TIMER); let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER); let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); @@ -291,6 +299,7 @@ macro_rules! define_run_body { loop { $process_channel_manager_events; $process_chain_monitor_events; + $process_onion_message_handler_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -334,6 +343,11 @@ macro_rules! define_run_body { $channel_manager.timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } + if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { + log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred"); + $peer_manager.onion_message_handler().timer_tick_occurred(); + last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER); + } if await_slow { // On various platforms, we may be starved of CPU cycles for several reasons. // E.g. on iOS, if we've been in the background, we will be entirely paused. @@ -528,7 +542,7 @@ use core::task; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; @@ -599,12 +613,11 @@ pub async fn process_events_async< EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -617,7 +630,7 @@ pub async fn process_events_async< where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -625,8 +638,9 @@ where F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let mut should_break = false; let async_event_handler = |event| { @@ -650,10 +664,12 @@ where event_handler(event).await; } }; - define_run_body!(persister, - chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + define_run_body!( + persister, chain_monitor, + chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, - gossip_sync, peer_manager, logger, scorer, should_break, { + peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, + gossip_sync, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), @@ -673,7 +689,29 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform) + }, mobile_interruptable_platform + ) +} + +#[cfg(feature = "futures")] +async fn process_onion_message_handler_events_async< + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, + PM: 'static + Deref + Send + Sync, +>( + peer_manager: &PM, handler: EventHandler +) +where + PM::Target: APeerManager + Send + Sync, +{ + use lightning::events::EventsProvider; + + let events = core::cell::RefCell::new(Vec::new()); + peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); + + for event in events.into_inner() { + handler(event).await + } } #[cfg(feature = "std")] @@ -738,12 +776,11 @@ impl BackgroundProcessor { P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - APM: APeerManager + Send + Sync, - PM: 'static + Deref + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -753,7 +790,7 @@ impl BackgroundProcessor { where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, ES::Target: 'static + EntropySource, NS::Target: 'static + NodeSigner, @@ -761,8 +798,9 @@ impl BackgroundProcessor { F::Target: 'static + FeeEstimator, R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::EcdsaSigner>, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -782,14 +820,18 @@ impl BackgroundProcessor { } event_handler.handle_event(event); }; - define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + define_run_body!( + persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), - gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), + peer_manager, + peer_manager.onion_message_handler().process_pending_events(&event_handler), + gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -845,7 +887,7 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { use bitcoin::blockdata::constants::{genesis_block, ChainHash}; - use bitcoin::blockdata::locktime::PackedLockTime; + use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; @@ -863,8 +905,8 @@ mod tests { use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; - use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; + use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; @@ -894,6 +936,11 @@ mod tests { fn disconnect_socket(&mut self) {} } + #[cfg(c_bindings)] + type LockingWrapper = lightning::routing::scoring::MultiThreadedLockableScore; + #[cfg(not(c_bindings))] + type LockingWrapper = Mutex; + type ChannelManager = channelmanager::ChannelManager< Arc, @@ -905,7 +952,7 @@ mod tests { Arc>>, Arc, - Arc>, + Arc>, (), TestScorer> >, @@ -927,7 +974,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>, } impl Node { @@ -1066,7 +1113,7 @@ mod tests { impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( - &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams + &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } } @@ -1148,6 +1195,9 @@ mod tests { } } + #[cfg(c_bindings)] + impl lightning::routing::scoring::Score for TestScorer {} + impl Drop for TestScorer { fn drop(&mut self) { if std::thread::panicking() { @@ -1179,7 +1229,7 @@ mod tests { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); - let scorer = Arc::new(Mutex::new(TestScorer::new())); + let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); let seed = [i as u8; 32]; let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); @@ -1233,7 +1283,7 @@ mod tests { macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ - $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); + $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap(); $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); }} @@ -1246,7 +1296,7 @@ mod tests { assert_eq!(channel_value_satoshis, $channel_value); assert_eq!(user_channel_id, 42); - let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut { + let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut { value: channel_value_satoshis, script_pubkey: output_script.clone(), }]}; (temporary_channel_id, tx) @@ -1354,9 +1404,11 @@ mod tests { #[test] fn test_timer_tick_called() { - // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, - // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and - // `PeerManager::timer_tick_occurred` every `PING_TIMER`. + // Test that: + // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, + // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, + // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and + // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); @@ -1367,9 +1419,11 @@ mod tests { let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string(); let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string(); let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() { + let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string(); + if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() && + log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() { break } } @@ -1548,7 +1602,7 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() { + if log_entries.get(&("lightning_background_processor", expected_log)).is_some() { break } } @@ -1572,7 +1626,7 @@ mod tests { $sleep; let log_entries = $nodes[0].logger.lines.lock().unwrap(); let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); - if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + if *log_entries.get(&("lightning_background_processor", loop_counter)) .unwrap_or(&0) > 1 { // Wait until the loop has gone around at least twice. @@ -1689,7 +1743,7 @@ mod tests { maybe_announced_channel: true, }], blinded_tail: None }; - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1706,7 +1760,7 @@ mod tests { // Ensure we'll score payments that were explicitly failed back by the destination as // ProbeSuccess. - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1721,7 +1775,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { payment_id: PaymentId([42; 32]), payment_hash: None, @@ -1733,7 +1787,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeSuccessful { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1745,7 +1799,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeFailed { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1784,7 +1838,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); } #[tokio::test] @@ -1830,7 +1884,7 @@ mod tests { let log_entries = nodes[0].logger.lines.lock().unwrap(); let expected_log = "Persisting scorer after update".to_string(); - assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); + assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5); }); let (r1, r2) = tokio::join!(t1, t2);