X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=aa6d0b0615ef21e221e745e3563f1ddc9c2bf620;hb=refs%2Fheads%2F2023-11-less-graph-memory-frag;hp=539a6bf14880397e580c8431fd2ceaf1d1cf43ab;hpb=21b0818be7c3f4b3fce24c2bd3ac0c156549f8bc;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 539a6bf1..aa6d0b06 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -34,7 +34,7 @@ use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; -use lightning::routing::scoring::{Score, WriteableScore}; +use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; #[cfg(feature = "std")] @@ -108,7 +108,7 @@ const PING_TIMER: u64 = 1; const NETWORK_PRUNE_TIMER: u64 = 60 * 60; #[cfg(not(test))] -const SCORER_PERSIST_TIMER: u64 = 30; +const SCORER_PERSIST_TIMER: u64 = 60 * 60; #[cfg(test)] const SCORER_PERSIST_TIMER: u64 = 1; @@ -236,30 +236,37 @@ fn handle_network_graph_update( } } +/// Updates scorer based on event and returns whether an update occurred so we can decide whether +/// to persist. fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( scorer: &'a S, event: &Event -) { - let mut score = scorer.lock(); +) -> bool { match event { Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => { + let mut score = scorer.write_lock(); score.payment_path_failed(path, *scid); }, Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => { // Reached if the destination explicitly failed it back. We treat this as a successful probe // because the payment made it all the way to the destination with sufficient liquidity. + let mut score = scorer.write_lock(); score.probe_successful(path); }, Event::PaymentPathSuccessful { path, .. } => { + let mut score = scorer.write_lock(); score.payment_path_successful(path); }, Event::ProbeSuccessful { path, .. } => { + let mut score = scorer.write_lock(); score.probe_successful(path); }, Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => { + let mut score = scorer.write_lock(); score.probe_failed(path, *scid); }, - _ => {}, + _ => return false, } + true } macro_rules! define_run_body { @@ -308,7 +315,7 @@ macro_rules! define_run_body { // see `await_start`'s use below. let mut await_start = None; if $check_slow_await { await_start = Some($get_timer(1)); } - let updates_available = $await; + $await; let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; // Exit the loop if the background processor was requested to stop. @@ -317,7 +324,7 @@ macro_rules! define_run_body { break; } - if updates_available { + if $channel_manager.get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); $persister.persist_manager(&*$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); @@ -352,9 +359,15 @@ macro_rules! define_run_body { // Note that we want to run a graph prune once not long after startup before // falling back to our usual hourly prunes. This avoids short-lived clients never // pruning their network graph. We run once 60 seconds after startup before - // continuing our normal cadence. + // continuing our normal cadence. For RGS, since 60 seconds is likely too long, + // we prune after an initial sync completes. let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }; - if $timer_elapsed(&mut last_prune_call, prune_timer) { + let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer); + let should_prune = match $gossip_sync { + GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed, + _ => prune_timer_elapsed, + }; + if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = $gossip_sync.prunable_network_graph() { #[cfg(feature = "std")] { @@ -487,9 +500,16 @@ use core::task; /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you /// could setup `process_events_async` like this: /// ``` -/// # struct MyPersister {} -/// # impl lightning::util::persist::KVStorePersister for MyPersister { -/// # fn persist(&self, key: &str, object: &W) -> lightning::io::Result<()> { Ok(()) } +/// # use lightning::io; +/// # use std::sync::{Arc, RwLock}; +/// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use lightning_background_processor::{process_events_async, GossipSync}; +/// # struct MyStore {} +/// # impl lightning::util::persist::KVStore for MyStore { +/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } +/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } +/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } /// # struct MyEventHandler {} /// # impl MyEventHandler { @@ -501,23 +521,20 @@ use core::task; /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # use std::sync::{Arc, Mutex}; -/// # use std::sync::atomic::{AtomicBool, Ordering}; -/// # use lightning_background_processor::{process_events_async, GossipSync}; /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; -/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; +/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = Mutex, Arc>>; +/// # type MyScorer = RwLock, Arc>>; /// -/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { +/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { /// let background_persister = Arc::clone(&my_persister); /// let background_event_handler = Arc::clone(&my_event_handler); /// let background_chain_mon = Arc::clone(&my_chain_monitor); @@ -616,12 +633,19 @@ where let network_graph = gossip_sync.network_graph(); let event_handler = &event_handler; let scorer = &scorer; + let logger = &logger; + let persister = &persister; async move { if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - update_scorer(scorer, &event); + if update_scorer(scorer, &event) { + log_trace!(logger, "Persisting scorer after update"); + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } + } } event_handler(event).await; } @@ -631,16 +655,14 @@ where channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { - a: channel_manager.get_persistable_update_future(), + a: channel_manager.get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; match fut.await { - SelectorOutput::A => true, - SelectorOutput::B => false, + SelectorOutput::A|SelectorOutput::B => {}, SelectorOutput::C(exit) => { should_break = exit; - false } } }, |t| sleeper(Duration::from_secs(t)), @@ -751,17 +773,22 @@ impl BackgroundProcessor { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - update_scorer(scorer, &event); + if update_scorer(scorer, &event) { + log_trace!(logger, "Persisting scorer after update"); + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } + } } event_handler.handle_event(event); }; define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - Sleeper::from_two_futures( - channel_manager.get_persistable_update_future(), + { Sleeper::from_two_futures( + channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() - ).wait_timeout(Duration::from_millis(100)), + ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -817,8 +844,7 @@ impl Drop for BackgroundProcessor { #[cfg(all(feature = "std", test))] mod tests { - use bitcoin::blockdata::block::BlockHeader; - use bitcoin::blockdata::constants::genesis_block; + use bitcoin::blockdata::constants::{genesis_block, ChainHash}; use bitcoin::blockdata::locktime::PackedLockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; @@ -833,24 +859,26 @@ mod tests { use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; use lightning::ln::features::{ChannelFeatures, NodeFeatures}; + use lightning::ln::functional_test_utils::*; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::routing::router::{DefaultRouter, Path, RouteHop}; - use lightning::routing::scoring::{ChannelUsage, Score}; + use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore}; use lightning::util::config::UserConfig; use lightning::util::ser::Writeable; use lightning::util::test_utils; - use lightning::util::persist::KVStorePersister; - use lightning_persister::FilesystemPersister; + use lightning::util::persist::{KVStore, + CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, + NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY, + SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY}; + use lightning_persister::fs_store::FilesystemStore; use std::collections::VecDeque; use std::{fs, env}; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::sync::mpsc::SyncSender; use std::time::Duration; - use bitcoin::hashes::Hash; - use bitcoin::TxMerkleNode; use lightning_rapid_gossip_sync::RapidGossipSync; use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER}; @@ -866,9 +894,29 @@ mod tests { fn disconnect_socket(&mut self) {} } - type ChannelManager = channelmanager::ChannelManager, Arc, Arc, Arc, Arc, Arc, Arc>>, Arc, Arc>, (), TestScorer>>, Arc>; - - type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; + #[cfg(c_bindings)] + type LockingWrapper = lightning::routing::scoring::MultiThreadedLockableScore; + #[cfg(not(c_bindings))] + type LockingWrapper = Mutex; + + type ChannelManager = + channelmanager::ChannelManager< + Arc, + Arc, + Arc, + Arc, + Arc, + Arc, + Arc>>, + Arc, + Arc>, + (), + TestScorer> + >, + Arc>; + + type ChainMonitor = chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; type PGS = Arc>>, Arc, Arc>>; type RGS = Arc>>, Arc>>; @@ -879,12 +927,12 @@ mod tests { rapid_gossip_sync: RGS, peer_manager: Arc, Arc, IgnoringMessageHandler, Arc, IgnoringMessageHandler, Arc>>, chain_monitor: Arc, - persister: Arc, + kv_store: Arc, tx_broadcaster: Arc, network_graph: Arc>>, logger: Arc, best_block: BestBlock, - scorer: Arc>, + scorer: Arc>, } impl Node { @@ -903,9 +951,9 @@ mod tests { impl Drop for Node { fn drop(&mut self) { - let data_dir = self.persister.get_data_dir(); + let data_dir = self.kv_store.get_data_dir(); match fs::remove_dir_all(data_dir.clone()) { - Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e), + Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e), _ => {} } } @@ -916,13 +964,13 @@ mod tests { graph_persistence_notifier: Option>, manager_error: Option<(std::io::ErrorKind, &'static str)>, scorer_error: Option<(std::io::ErrorKind, &'static str)>, - filesystem_persister: FilesystemPersister, + kv_store: FilesystemStore, } impl Persister { - fn new(data_dir: String) -> Self { - let filesystem_persister = FilesystemPersister::new(data_dir); - Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister } + fn new(data_dir: PathBuf) -> Self { + let kv_store = FilesystemStore::new(data_dir); + Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store } } fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { @@ -942,15 +990,25 @@ mod tests { } } - impl KVStorePersister for Persister { - fn persist(&self, key: &str, object: &W) -> std::io::Result<()> { - if key == "manager" { + impl KVStore for Persister { + fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result> { + self.kv_store.read(primary_namespace, secondary_namespace, key) + } + + fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> { + if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE && + key == CHANNEL_MANAGER_PERSISTENCE_KEY + { if let Some((error, message)) = self.manager_error { return Err(std::io::Error::new(error, message)) } } - if key == "network_graph" { + if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE && + key == NETWORK_GRAPH_PERSISTENCE_KEY + { if let Some(sender) = &self.graph_persistence_notifier { match sender.send(()) { Ok(()) => {}, @@ -963,13 +1021,24 @@ mod tests { } } - if key == "scorer" { + if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE && + secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE && + key == SCORER_PERSISTENCE_KEY + { if let Some((error, message)) = self.scorer_error { return Err(std::io::Error::new(error, message)) } } - self.filesystem_persister.persist(key, object) + self.kv_store.write(primary_namespace, secondary_namespace, key, buf) + } + + fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> { + self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy) + } + + fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result> { + self.kv_store.list(primary_namespace, secondary_namespace) } } @@ -999,12 +1068,14 @@ mod tests { fn write(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) } } - impl Score for TestScorer { + impl ScoreLookUp for TestScorer { type ScoreParams = (); fn channel_penalty_msat( &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage, _score_params: &Self::ScoreParams ) -> u64 { unimplemented!(); } + } + impl ScoreUpdate for TestScorer { fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { @@ -1082,6 +1153,9 @@ mod tests { } } + #[cfg(c_bindings)] + impl lightning::routing::scoring::Score for TestScorer {} + impl Drop for TestScorer { fn drop(&mut self) { if std::thread::panicking() { @@ -1105,7 +1179,7 @@ mod tests { fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec) { let persist_temp_path = env::temp_dir().join(persist_dir); let persist_dir = persist_temp_path.to_string_lossy().to_string(); - let network = Network::Testnet; + let network = Network::Bitcoin; let mut nodes = Vec::new(); for i in 0..num_nodes { let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network)); @@ -1113,33 +1187,37 @@ mod tests { let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i))); let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); - let scorer = Arc::new(Mutex::new(TestScorer::new())); + let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), ())); - let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet)); - let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", &persist_dir, i))); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); + let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); + let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); let now = Duration::from_secs(genesis_block.header.time as u64); let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); - let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone())); + let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone())); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; - let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params)); + let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time)); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone())); let msg_handler = MessageHandler { - chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), + chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{} }; let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone())); - let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; + let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); } for i in 0..num_nodes { for j in (i+1)..num_nodes { - nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap(); - nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap(); + nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { + features: nodes[j].node.init_features(), networks: None, remote_network_address: None + }, true).unwrap(); + nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { + features: nodes[i].node.init_features(), networks: None, remote_network_address: None + }, false).unwrap(); } } @@ -1190,7 +1268,7 @@ mod tests { for i in 1..=depth { let prev_blockhash = node.best_block.block_hash(); let height = node.best_block.height() + 1; - let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 }; + let header = create_dummy_header(prev_blockhash, height); let txdata = vec![(0, tx)]; node.best_block = BestBlock::new(header.block_hash(), height); match i { @@ -1223,7 +1301,7 @@ mod tests { let tx = open_channel!(nodes[0], nodes[1], 100000); // Initiate the background processors to watch each node. - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1257,7 +1335,7 @@ mod tests { check_persisted_data!(nodes[0].node, filepath.clone()); loop { - if !nodes[0].node.get_persistence_condvar_value() { break } + if !nodes[0].node.get_event_or_persist_condvar_value() { break } } // Force-close the channel. @@ -1266,7 +1344,7 @@ mod tests { // Check that the force-close updates are persisted. check_persisted_data!(nodes[0].node, filepath.clone()); loop { - if !nodes[0].node.get_persistence_condvar_value() { break } + if !nodes[0].node.get_event_or_persist_condvar_value() { break } } // Check network graph is persisted @@ -1288,7 +1366,7 @@ mod tests { // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and // `PeerManager::timer_tick_occurred` every `PING_TIMER`. let (_, nodes) = create_nodes(1, "test_timer_tick_called"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1315,7 +1393,7 @@ mod tests { let (_, nodes) = create_nodes(2, "test_persist_error"); open_channel!(nodes[0], nodes[1], 100000); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1335,7 +1413,7 @@ mod tests { let (_, nodes) = create_nodes(2, "test_persist_error_sync"); open_channel!(nodes[0], nodes[1], 100000); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let bp_future = super::process_events_async( @@ -1361,7 +1439,7 @@ mod tests { fn test_network_graph_persist_error() { // Test that if we encounter an error during network graph persistence, an error gets returned. let (_, nodes) = create_nodes(2, "test_persist_network_graph_error"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1379,7 +1457,7 @@ mod tests { fn test_scorer_persist_error() { // Test that if we encounter an error during scorer persistence, an error gets returned. let (_, nodes) = create_nodes(2, "test_persist_scorer_error"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1397,7 +1475,7 @@ mod tests { fn test_background_event_handling() { let (_, mut nodes) = create_nodes(2, "test_background_event_handling"); let channel_value = 100000; - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir.clone())); // Set up a background event handler for FundingGenerationReady events. @@ -1470,7 +1548,7 @@ mod tests { #[test] fn test_scorer_persistence() { let (_, nodes) = create_nodes(2, "test_scorer_persistence"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: _| {}; let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1542,7 +1620,7 @@ mod tests { let (sender, receiver) = std::sync::mpsc::sync_channel(1); let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let event_handler = |_: _| {}; @@ -1561,7 +1639,7 @@ mod tests { let (sender, receiver) = std::sync::mpsc::sync_channel(1); let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); @@ -1616,9 +1694,10 @@ mod tests { channel_features: ChannelFeatures::empty(), fee_msat: 0, cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, + maybe_announced_channel: true, }], blinded_tail: None }; - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1635,7 +1714,7 @@ mod tests { // Ensure we'll score payments that were explicitly failed back by the destination as // ProbeSuccess. - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathFailed { payment_id: None, payment_hash: PaymentHash([42; 32]), @@ -1650,7 +1729,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { payment_id: PaymentId([42; 32]), payment_hash: None, @@ -1662,7 +1741,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeSuccessful { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1674,7 +1753,7 @@ mod tests { _ => panic!("Unexpected event"), } - $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() }); $nodes[0].node.push_pending_event(Event::ProbeFailed { payment_id: PaymentId([42; 32]), payment_hash: PaymentHash([42; 32]), @@ -1701,7 +1780,7 @@ mod tests { }; let (_, nodes) = create_nodes(1, "test_payment_path_scoring"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); @@ -1710,6 +1789,10 @@ mod tests { if !std::thread::panicking() { bg_processor.stop().unwrap(); } + + let log_entries = nodes[0].logger.lines.lock().unwrap(); + let expected_log = "Persisting scorer after update".to_string(); + assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); } #[tokio::test] @@ -1730,7 +1813,7 @@ mod tests { }; let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async"); - let data_dir = nodes[0].persister.get_data_dir(); + let data_dir = nodes[0].kv_store.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); @@ -1752,6 +1835,10 @@ mod tests { let t2 = tokio::spawn(async move { do_test_payment_path_scoring!(nodes, receiver.recv().await); exit_sender.send(()).unwrap(); + + let log_entries = nodes[0].logger.lines.lock().unwrap(); + let expected_log = "Persisting scorer after update".to_string(); + assert_eq!(*log_entries.get(&("lightning_background_processor".to_string(), expected_log)).unwrap(), 5); }); let (r1, r2) = tokio::join!(t1, t2);