From 5eeb254b8285adf4b741a7bced88956bcff208bc Mon Sep 17 00:00:00 2001 From: Jurvis Tan <5944973+jurvis@users.noreply.github.com> Date: Wed, 27 Apr 2022 22:16:38 -0700 Subject: [PATCH] Add utils to persist scorer in BackgroundProcessor move last_prune_call back --- lightning-background-processor/src/lib.rs | 77 +++++++++++++++++++---- lightning/src/routing/scoring.rs | 8 +++ lightning/src/util/persist.rs | 21 +++++-- 3 files changed, 88 insertions(+), 18 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index e23737c0a..e306aeb3c 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler}; +use lightning::routing::scoring::WriteableScore; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; @@ -151,6 +152,7 @@ impl BackgroundProcessor { /// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph /// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable pub fn start< + 'a, Signer: 'static + Sign, CA: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, @@ -171,9 +173,11 @@ impl BackgroundProcessor { NG: 'static + Deref> + Send + Sync, UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, + S: 'static + Deref + Send + Sync, + SC: WriteableScore<'a>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, - net_graph_msg_handler: Option, peer_manager: PM, logger: L + net_graph_msg_handler: Option, peer_manager: PM, logger: L, scorer: Option ) -> Self where CA::Target: 'static + chain::Access, @@ -187,7 +191,7 @@ impl BackgroundProcessor { CMH::Target: 'static + ChannelMessageHandler, RMH::Target: 'static + RoutingMessageHandler, UMH::Target: 'static + CustomMessageHandler, - PS::Target: 'static + Persister + PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>, { let stop_thread = Arc::new(AtomicBool::new(false)); let stop_thread_clone = stop_thread.clone(); @@ -277,6 +281,12 @@ impl BackgroundProcessor { last_prune_call = Instant::now(); have_pruned = true; } + if let Some(ref scorer) = scorer { + log_trace!(logger, "Persisting scorer"); + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } + } } } @@ -285,10 +295,16 @@ impl BackgroundProcessor { // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. persister.persist_manager(&*channel_manager)?; + // Persist Scorer on exit + if let Some(ref scorer) = scorer { + persister.persist_scorer(&scorer)?; + } + // Persist NetworkGraph on exit if let Some(ref handler) = net_graph_msg_handler { persister.persist_graph(handler.network_graph())?; } + Ok(()) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -369,6 +385,7 @@ mod tests { use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; + use lightning::routing::scoring::{FixedPenaltyScorer}; use super::{BackgroundProcessor, FRESHNESS_TIMER}; const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER; @@ -395,6 +412,7 @@ mod tests { network_graph: Arc, logger: Arc, best_block: BestBlock, + scorer: Arc>, } impl Drop for Node { @@ -410,13 +428,14 @@ mod tests { struct Persister { graph_error: Option<(std::io::ErrorKind, &'static str)>, manager_error: Option<(std::io::ErrorKind, &'static str)>, + scorer_error: Option<(std::io::ErrorKind, &'static str)>, filesystem_persister: FilesystemPersister, } impl Persister { fn new(data_dir: String) -> Self { let filesystem_persister = FilesystemPersister::new(data_dir.clone()); - Self { graph_error: None, manager_error: None, filesystem_persister } + Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister } } fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { @@ -426,6 +445,10 @@ mod tests { fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { Self { manager_error: Some((error, message)), ..self } } + + fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self { + Self { scorer_error: Some((error, message)), ..self } + } } impl KVStorePersister for Persister { @@ -442,6 +465,12 @@ mod tests { } } + if key == "scorer" { + if let Some((error, message)) = self.scorer_error { + return Err(std::io::Error::new(error, message)) + } + } + self.filesystem_persister.persist(key, object) } } @@ -473,7 +502,8 @@ mod tests { let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()))); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )}; let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{})); - let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block }; + let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); + let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; nodes.push(node); } @@ -571,7 +601,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: &_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); macro_rules! check_persisted_data { ($node: expr, $filepath: expr) => { @@ -621,6 +651,10 @@ mod tests { check_persisted_data!(network_graph, filepath.clone()); } + // Check scorer is persisted + let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string()); + check_persisted_data!(nodes[0].scorer, filepath.clone()); + assert!(bg_processor.stop().is_ok()); } @@ -632,7 +666,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); let event_handler = |_: &_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string(); @@ -655,7 +689,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: &_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.join() { Ok(_) => panic!("Expected error persisting manager"), Err(e) => { @@ -672,7 +706,7 @@ mod tests { let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test")); let event_handler = |_: &_| {}; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); match bg_processor.stop() { Ok(_) => panic!("Expected error persisting network graph"), @@ -683,6 +717,24 @@ mod tests { } } + #[test] + fn test_scorer_persist_error() { + // Test that if we encounter an error during scorer persistence, an error gets returned. + let nodes = create_nodes(2, "test_persist_scorer_error".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test")); + let event_handler = |_: &_| {}; + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + + match bg_processor.stop() { + Ok(_) => panic!("Expected error persisting scorer"), + Err(e) => { + assert_eq!(e.kind(), std::io::ErrorKind::Other); + assert_eq!(e.get_ref().unwrap().to_string(), "test"); + }, + } + } + #[test] fn test_background_event_handling() { let mut nodes = create_nodes(2, "test_background_event_handling".to_string()); @@ -695,7 +747,7 @@ mod tests { let event_handler = move |event: &Event| { sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(); }; - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Open a channel and check that the FundingGenerationReady event was handled. begin_open_channel!(nodes[0], nodes[1], channel_value); @@ -720,7 +772,7 @@ mod tests { let (sender, receiver) = std::sync::mpsc::sync_channel(1); let event_handler = move |event: &Event| sender.send(event.clone()).unwrap(); let persister = Arc::new(Persister::new(data_dir)); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); // Force close the channel and check that the SpendableOutputs event was handled. nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap(); @@ -747,11 +799,10 @@ mod tests { // Initiate the background processors to watch each node. let data_dir = nodes[0].persister.get_data_dir(); let persister = Arc::new(Persister::new(data_dir)); - let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0))); let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes); - let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); + let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2))); let event_handler = Arc::clone(&invoice_payer); - let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone()); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); assert!(bg_processor.stop().is_ok()); } } diff --git a/lightning/src/routing/scoring.rs b/lightning/src/routing/scoring.rs index 0e04c6390..ce1149e0f 100644 --- a/lightning/src/routing/scoring.rs +++ b/lightning/src/routing/scoring.rs @@ -137,6 +137,14 @@ pub trait LockableScore<'a> { fn lock(&'a self) -> Self::Locked; } +/// Refers to a scorer that is accessible under lock and also writeable to disk +/// +/// We need this trait to be able to pass in a scorer to `lightning-background-processor` that will enable us to +/// use the Persister to persist it. +pub trait WriteableScore<'a>: LockableScore<'a> + Writeable {} + +impl<'a, T> WriteableScore<'a> for T where T: LockableScore<'a> + Writeable {} + /// (C-not exported) impl<'a, T: 'a + Score> LockableScore<'a> for Mutex { type Locked = MutexGuard<'a, T>; diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 9476331c1..5c124c21a 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -11,6 +11,7 @@ use core::ops::Deref; use bitcoin::hashes::hex::ToHex; use io::{self}; +use routing::scoring::WriteableScore; use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph}; use super::{logger::Logger, ser::Writeable}; @@ -24,37 +25,47 @@ pub trait KVStorePersister { fn persist(&self, key: &str, object: &W) -> io::Result<()>; } -/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk. -pub trait Persister +/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. +pub trait Persister<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S> where M::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, + S: WriteableScore<'a>, { /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error>; /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; + + /// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed. + fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>; } -impl Persister for A +impl<'a, A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S> Persister<'a, Signer, M, T, K, F, L, S> for A where M::Target: 'static + chain::Watch, T::Target: 'static + BroadcasterInterface, K::Target: 'static + KeysInterface, F::Target: 'static + FeeEstimator, L::Target: 'static + Logger, + S: WriteableScore<'a>, { - /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. + /// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed. fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error> { self.persist("manager", channel_manager) } - /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. + /// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed. fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { self.persist("network_graph", network_graph) } + + /// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed. + fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> { + self.persist("scorer", &scorer) + } } impl Persist for K { -- 2.39.5