X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=1c720921970095d4211b4b8113b113926db3854f;hb=f4ab077a6938e5a2a28e33c25011cb80ce0a6611;hp=b394a2311c3b7943d5d2c81a5308773875bbb468;hpb=05cb46723489a91cf14e6e1ada27589a30bd9040;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index b394a231..1c720921 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -17,7 +17,7 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::chain::keysinterface::{Sign, KeysInterface}; +use lightning::chain::keysinterface::KeysInterface; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; @@ -202,49 +202,12 @@ fn handle_network_graph_update( } } -/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s. -struct DecoratingEventHandler< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> -where A::Target: chain::Access, L::Target: Logger { - event_handler: E, - gossip_sync: &'a GossipSync, -} - -impl< - 'a, - E: EventHandler, - PGS: Deref>, - RGS: Deref>, - G: Deref>, - A: Deref, - L: Deref, -> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L> -where A::Target: chain::Access, L::Target: Logger { - fn handle_event(&self, event: Event) { - if let Some(network_graph) = self.gossip_sync.network_graph() { - handle_network_graph_update(network_graph, &event) - } - self.event_handler.handle_event(event); - } -} - macro_rules! define_run_body { - ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident, + ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, + $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr) => { { - let event_handler = DecoratingEventHandler { - event_handler: $event_handler, - gossip_sync: &$gossip_sync, - }; - log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -255,8 +218,8 @@ macro_rules! define_run_body { let mut have_pruned = false; loop { - $channel_manager.process_pending_events(&event_handler); - $chain_monitor.process_pending_events(&event_handler); + $process_channel_manager_events; + $process_chain_monitor_events; // Note that the PeerManager::process_events may block on ChannelManager's locks, // hence it comes last here. When the ChannelManager finishes whatever it's doing, @@ -320,8 +283,8 @@ macro_rules! define_run_body { // continuing our normal cadence. if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { // The network graph must not be pruned while rapid sync completion is pending - log_trace!($logger, "Assessing prunability of network graph"); if let Some(network_graph) = $gossip_sync.prunable_network_graph() { + log_trace!($logger, "Pruning and persisting network graph."); network_graph.remove_stale_channels_and_tracking(); if let Err(e) = $persister.persist_graph(network_graph) { @@ -330,8 +293,6 @@ macro_rules! define_run_body { last_prune_call = Instant::now(); have_pruned = true; - } else { - log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); } } @@ -375,7 +336,6 @@ macro_rules! define_run_body { #[cfg(feature = "futures")] pub async fn process_events_async< 'a, - Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, @@ -389,9 +349,10 @@ pub async fn process_events_async< CMH: 'static + Deref + Send + Sync, RMH: 'static + Deref + Send + Sync, OMH: 'static + Deref + Send + Sync, - EH: 'static + EventHandler + Send, + EventHandlerFuture: core::future::Future, + EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, @@ -402,27 +363,39 @@ pub async fn process_events_async< SleepFuture: core::future::Future, Sleeper: Fn(Duration) -> SleepFuture >( - persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, + persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, sleeper: Sleeper, ) -> Result<(), std::io::Error> where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, - P::Target: 'static + Persist, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, { let mut should_break = true; - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let async_event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + let event_handler = &event_handler; + async move { + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler(event).await; + } + }; + define_run_body!(persister, + chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { select_biased! { _ = channel_manager.get_persistable_update_future().fuse() => true, @@ -481,7 +454,6 @@ impl BackgroundProcessor { /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable pub fn start< 'a, - Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, @@ -497,7 +469,7 @@ impl BackgroundProcessor { RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, @@ -512,22 +484,30 @@ impl BackgroundProcessor { where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, - P::Target: 'static + Persist, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); let handle = thread::spawn(move || -> Result<(), std::io::Error> { - define_run_body!(persister, event_handler, chain_monitor, channel_manager, + let event_handler = |event| { + let network_graph = gossip_sync.network_graph(); + if let Some(network_graph) = network_graph { + handle_network_graph_update(network_graph, &event) + } + event_handler.handle_event(event); + }; + define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), + channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), channel_manager.await_persistable_update_timeout(Duration::from_millis(100))) }); @@ -598,13 +578,13 @@ mod tests { use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; + use lightning::routing::router::DefaultRouter; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning::util::persist::KVStorePersister; use lightning_invoice::payment::{InvoicePayer, Retry}; - use lightning_invoice::utils::DefaultRouter; use lightning_persister::FilesystemPersister; use std::fs; use std::path::PathBuf; @@ -1090,10 +1070,11 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let expected_log_a = "Assessing prunability of network graph".to_string(); - let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() { + let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); + if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + .unwrap_or(&0) > 1 + { + // Wait until the loop has gone around at least twice. break } }