X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=fe4496afa2681c06ed440af85e5aae1804154ed4;hb=bcf174034a23f93a56aa20001602cd0645361b16;hp=ece82bcaa55a2299fd51f738050f55c52926ad78;hpb=838d48698384d41e8a9114f4f644dbb83ce5d6fa;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index ece82bca..fe4496af 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -17,11 +17,12 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::chain::keysinterface::KeysInterface; +use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::router::Router; use lightning::routing::scoring::WriteableScore; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; @@ -283,8 +284,8 @@ macro_rules! define_run_body { // continuing our normal cadence. if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } { // The network graph must not be pruned while rapid sync completion is pending - log_trace!($logger, "Assessing prunability of network graph"); if let Some(network_graph) = $gossip_sync.prunable_network_graph() { + log_trace!($logger, "Pruning and persisting network graph."); network_graph.remove_stale_channels_and_tracking(); if let Err(e) = $persister.persist_graph(network_graph) { @@ -293,8 +294,6 @@ macro_rules! define_run_body { last_prune_call = Instant::now(); have_pruned = true; - } else { - log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph."); } } @@ -342,8 +341,11 @@ pub async fn process_events_async< CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - K: 'static + Deref + Send + Sync, + ES: 'static + Deref + Send + Sync, + NS: 'static + Deref + Send + Sync, + SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, + R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, @@ -354,8 +356,8 @@ pub async fn process_events_async< EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, @@ -372,17 +374,20 @@ pub async fn process_events_async< where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + ES::Target: 'static + EntropySource, + NS::Target: 'static + NodeSigner, + SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, + R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { let mut should_break = true; let async_event_handler = |event| { @@ -460,8 +465,11 @@ impl BackgroundProcessor { CF: 'static + Deref + Send + Sync, CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - K: 'static + Deref + Send + Sync, + ES: 'static + Deref + Send + Sync, + NS: 'static + Deref + Send + Sync, + SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, + R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, @@ -471,8 +479,8 @@ impl BackgroundProcessor { RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, UMH: 'static + Deref + Send + Sync, @@ -486,17 +494,20 @@ impl BackgroundProcessor { where CA::Target: 'static + chain::Access, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::Signer>, + CW::Target: 'static + chain::Watch<::Signer>, T::Target: 'static + BroadcasterInterface, - K::Target: 'static + KeysInterface, + ES::Target: 'static + EntropySource, + NS::Target: 'static + NodeSigner, + SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, + R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, + P::Target: 'static + Persist<::Signer>, CMH::Target: 'static + ChannelMessageHandler, OMH::Target: 'static + OnionMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>, + PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -572,21 +583,22 @@ mod tests { use bitcoin::network::constants::Network; use lightning::chain::{BestBlock, Confirm, chainmonitor}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; - use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager}; + use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; - use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; + use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager}; use lightning::ln::features::ChannelFeatures; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; + use lightning::routing::router::DefaultRouter; + use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer}; use lightning::util::config::UserConfig; use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning::util::persist::KVStorePersister; use lightning_invoice::payment::{InvoicePayer, Retry}; - use lightning_invoice::utils::DefaultRouter; use lightning_persister::FilesystemPersister; use std::fs; use std::path::PathBuf; @@ -595,7 +607,6 @@ mod tests { use std::time::Duration; use bitcoin::hashes::Hash; use bitcoin::TxMerkleNode; - use lightning::routing::scoring::{FixedPenaltyScorer}; use lightning_rapid_gossip_sync::RapidGossipSync; use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; @@ -627,7 +638,7 @@ mod tests { network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>>, Arc>>>, } impl Node { @@ -724,32 +735,34 @@ mod tests { for i in 0..num_nodes { let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))}); let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }); - let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); - let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); - let seed = [i as u8; 32]; let network = Network::Testnet; let genesis_block = genesis_block(network); + let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); + let params = ProbabilisticScoringParameters::default(); + let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone()))); + let seed = [i as u8; 32]; + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone())); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); + let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); let best_block = BestBlock::from_genesis(network); let params = ChainParameters { network, best_block }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params)); - let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone())); + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params)); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{})); - let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); } for i in 0..num_nodes { for j in (i+1)..num_nodes { - nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); - nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap(); + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap(); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap(); } } @@ -770,8 +783,8 @@ mod tests { macro_rules! begin_open_channel { ($node_a: expr, $node_b: expr, $channel_value: expr) => {{ $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap(); - $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); - $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); + $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id())); + $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id())); }} } @@ -1072,10 +1085,11 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let expected_log_a = "Assessing prunability of network graph".to_string(); - let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string(); - if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() && - log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() { + let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string(); + if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter)) + .unwrap_or(&0) > 1 + { + // Wait until the loop has gone around at least twice. break } }