X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=0cfa9801badb3a8d31a30cbdce81b93cbaab1f5a;hb=ba342de241f0b5e968d6ad0106e5690e7c9c68bc;hp=f72e9821258bfde25de17acd1441e41147bc5e63;hpb=2afbdf5d1c1b531b08950ddca9143450ba458245;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f72e9821..0cfa9801 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -25,13 +25,12 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider}; +use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; -use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; -use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; +use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; @@ -81,6 +80,8 @@ use alloc::vec::Vec; /// /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor /// [`Event`]: lightning::events::Event +/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred +/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events #[cfg(feature = "std")] #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] pub struct BackgroundProcessor { @@ -298,7 +299,7 @@ macro_rules! define_run_body { // ChannelManager, we want to minimize methods blocking on a ChannelManager // generally, and as a fallback place such blocking only immediately before // persistence. - $peer_manager.process_events(); + $peer_manager.as_ref().process_events(); // Exit the loop if the background processor was requested to stop. if $loop_exit_check { @@ -343,11 +344,11 @@ macro_rules! define_run_body { // more than a handful of seconds to complete, and shouldn't disconnect all our // peers. log_trace!($logger, "100ms sleep took more than a second, disconnecting peers."); - $peer_manager.disconnect_all_peers(); + $peer_manager.as_ref().disconnect_all_peers(); last_ping_call = $get_timer(PING_TIMER); } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) { log_trace!($logger, "Calling PeerManager's timer_tick_occurred"); - $peer_manager.timer_tick_occurred(); + $peer_manager.as_ref().timer_tick_occurred(); last_ping_call = $get_timer(PING_TIMER); } @@ -514,12 +515,13 @@ use core::task; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; -/// # type MyNodeSigner = dyn lightning::chain::keysinterface::NodeSigner + Send + Sync; +/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; -/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyMessageRouter = dyn lightning::onion_message::MessageRouter + Send + Sync; +/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; @@ -587,10 +589,6 @@ pub async fn process_events_async< G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, - Descriptor: 'static + SocketDescriptor + Send + Sync, - CMH: 'static + Deref + Send + Sync, - RMH: 'static + Deref + Send + Sync, - OMH: 'static + Deref + Send + Sync, EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, @@ -598,8 +596,8 @@ pub async fn process_events_async< CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + APM: APeerManager + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -621,10 +619,6 @@ where R::Target: 'static + Router, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - CMH::Target: 'static + ChannelMessageHandler, - OMH::Target: 'static + OnionMessageHandler, - RMH::Target: 'static + RoutingMessageHandler, - UMH::Target: 'static + CustomMessageHandler, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { let mut should_break = false; @@ -737,18 +731,14 @@ impl BackgroundProcessor { G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, - Descriptor: 'static + SocketDescriptor + Send + Sync, - CMH: 'static + Deref + Send + Sync, - OMH: 'static + Deref + Send + Sync, - RMH: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, CM: 'static + Deref> + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, - UMH: 'static + Deref + Send + Sync, - PM: 'static + Deref> + Send + Sync, + APM: APeerManager + Send + Sync, + PM: 'static + Deref + Send + Sync, S: 'static + Deref + Send + Sync, SC: for <'b> WriteableScore<'b>, >( @@ -767,10 +757,6 @@ impl BackgroundProcessor { R::Target: 'static + Router, L::Target: 'static + Logger, P::Target: 'static + Persist<::Signer>, - CMH::Target: 'static + ChannelMessageHandler, - OMH::Target: 'static + OnionMessageHandler, - RMH::Target: 'static + RoutingMessageHandler, - UMH::Target: 'static + CustomMessageHandler, PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); @@ -853,15 +839,14 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { - use bitcoin::blockdata::block::BlockHeader; - use bitcoin::blockdata::constants::genesis_block; + use bitcoin::blockdata::constants::{genesis_block, ChainHash}; use bitcoin::blockdata::locktime::PackedLockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; use lightning::chain::{BestBlock, Confirm, chainmonitor}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; - use lightning::chain::keysinterface::{InMemorySigner, KeysManager}; + use lightning::sign::{InMemorySigner, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::{get_event_msg, get_event}; @@ -869,6 +854,7 @@ mod tests { use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; use lightning::ln::features::{ChannelFeatures, NodeFeatures}; + use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; @@ -880,13 +866,11 @@ mod tests { use lightning::util::persist::KVStorePersister; use lightning_persister::FilesystemPersister; use std::collections::VecDeque; - use std::fs; + use std::{fs, env}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::sync::mpsc::SyncSender; use std::time::Duration; - use bitcoin::hashes::Hash; - use bitcoin::TxMerkleNode; use lightning_rapid_gossip_sync::RapidGossipSync; use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; @@ -902,7 +886,7 @@ mod tests { fn disconnect_socket(&mut self) {} } - type ChannelManager = channelmanager::ChannelManager, Arc, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc>>>, Arc>; + type ChannelManager = channelmanager::ChannelManager, Arc, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc>, (), TestScorer>>, Arc>; type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; @@ -1036,8 +1020,9 @@ mod tests { } impl Score for TestScorer { + type ScoreParams = (); fn channel_penalty_msat( - &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage + &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) { @@ -1137,8 +1122,10 @@ mod tests { path.to_str().unwrap().to_string() } - fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec { - let network = Network::Testnet; + fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec) { + let persist_temp_path = env::temp_dir().join(persist_dir); + let persist_dir = persist_temp_path.to_string_lossy().to_string(); + let network = Network::Bitcoin; let mut nodes = Vec::new(); for i in 0..num_nodes { let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network)); @@ -1148,9 +1135,9 @@ mod tests { let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); let scorer = Arc::new(Mutex::new(TestScorer::new())); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone())); - let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); - let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i))); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ())); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); + let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i))); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); @@ -1159,20 +1146,28 @@ mod tests { let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params)); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone())); - let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; - let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone())); + let msg_handler = MessageHandler { + chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))), + route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), + onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{} + }; + let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone())); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); } for i in 0..num_nodes { for j in (i+1)..num_nodes { - nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap(); - nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap(); + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { + features: nodes[j].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { + features: nodes[i].node.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); } } - nodes + (persist_dir, nodes) } macro_rules! open_channel { @@ -1219,7 +1214,7 @@ mod tests { for i in 1..=depth { let prev_blockhash = node.best_block.block_hash(); let height = node.best_block.height() + 1; - let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 }; + let header = create_dummy_header(prev_blockhash, height); let txdata = vec![(0, tx)]; node.best_block = BestBlock::new(header.block_hash(), height); match i { @@ -1244,7 +1239,7 @@ mod tests { // Test that when a new channel is created, the ChannelManager needs to be re-persisted with // updates. Also test that when new updates are available, the manager signals that it needs // re-persistence and is successfully re-persisted. - let nodes = create_nodes(2, "test_background_processor".to_string()); + let (persist_dir, nodes) = create_nodes(2, "test_background_processor"); // Go through the channel creation process so that each node has something to persist. Since // open_channel consumes events, it must complete before starting BackgroundProcessor to @@ -1282,7 +1277,7 @@ mod tests { } // Check that the initial channel manager data is persisted as expected. - let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string()); + let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string()); check_persisted_data!(nodes[0].node, filepath.clone()); loop { @@ -1299,11 +1294,11 @@ mod tests { } // Check network graph is persisted - let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string()); + let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string()); check_persisted_data!(nodes[0].network_graph, filepath.clone()); // Check scorer is persisted - let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string()); + let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string()); check_persisted_data!(nodes[0].scorer, filepath.clone()); if !std::thread::panicking() { @@ -1316,7 +1311,7 @@ mod tests { // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`, // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and // `PeerManager::timer_tick_occurred` every `PING_TIMER`. - let nodes = create_nodes(1, "test_timer_tick_called".to_string()); + let (_, nodes) = create_nodes(1, "test_timer_tick_called"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; @@ -1341,7 +1336,7 @@ mod tests { #[test] fn test_channel_manager_persist_error() { // Test that if we encounter an error during manager persistence, the thread panics. - let nodes = create_nodes(2, "test_persist_error".to_string()); + let (_, nodes) = create_nodes(2, "test_persist_error"); open_channel!(nodes[0], nodes[1], 100000); let data_dir = nodes[0].persister.get_data_dir(); @@ -1361,7 +1356,7 @@ mod tests { #[cfg(feature = "futures")] async fn test_channel_manager_persist_error_async() { // Test that if we encounter an error during manager persistence, the thread panics. - let nodes = create_nodes(2, "test_persist_error_sync".to_string()); + let (_, nodes) = create_nodes(2, "test_persist_error_sync"); open_channel!(nodes[0], nodes[1], 100000); let data_dir = nodes[0].persister.get_data_dir(); @@ -1389,7 +1384,7 @@ mod tests { #[test] fn test_network_graph_persist_error() { // Test that if we encounter an error during network graph persistence, an error gets returned. - let nodes = create_nodes(2, "test_persist_network_graph_error".to_string()); + let (_, nodes) = create_nodes(2, "test_persist_network_graph_error"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; @@ -1407,7 +1402,7 @@ mod tests { #[test] fn test_scorer_persist_error() { // Test that if we encounter an error during scorer persistence, an error gets returned. - let nodes = create_nodes(2, "test_persist_scorer_error".to_string()); + let (_, nodes) = create_nodes(2, "test_persist_scorer_error"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; @@ -1424,7 +1419,7 @@ mod tests { #[test] fn test_background_event_handling() { - let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); + let (_, mut nodes) = create_nodes(2, "test_background_event_handling"); let channel_value = 100000; let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir.clone())); @@ -1498,7 +1493,7 @@ mod tests { #[test] fn test_scorer_persistence() { - let nodes = create_nodes(2, "test_scorer_persistence".to_string()); + let (_, nodes) = create_nodes(2, "test_scorer_persistence"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; @@ -1570,7 +1565,7 @@ mod tests { fn test_not_pruning_network_graph_until_graph_sync_completion() { let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string()); + let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); @@ -1589,7 +1584,7 @@ mod tests { async fn test_not_pruning_network_graph_until_graph_sync_completion_async() { let (sender, receiver) = std::sync::mpsc::sync_channel(1); - let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async".to_string()); + let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); @@ -1729,7 +1724,7 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), }; - let nodes = create_nodes(1, "test_payment_path_scoring".to_string()); + let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1762,7 +1757,7 @@ mod tests { } }; - let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string()); + let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async"); let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir));