From: Valentine Wallace Date: Fri, 3 Feb 2023 16:25:20 +0000 (-0500) Subject: Score payment paths in BackgroundProcessor X-Git-Tag: v0.0.114-beta~31^2 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=2f9c3e5ea12483af930c1e0cb843dd33348aeb5f;p=rust-lightning Score payment paths in BackgroundProcessor --- diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 9cf46a166..16e06780d 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -16,6 +16,9 @@ #[cfg(any(test, feature = "std"))] extern crate core; +#[cfg(not(feature = "std"))] +extern crate alloc; + #[macro_use] extern crate lightning; extern crate lightning_rapid_gossip_sync; @@ -28,7 +31,7 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::router::Router; -use lightning::routing::scoring::WriteableScore; +use lightning::routing::scoring::{Score, WriteableScore}; use lightning::util::events::{Event, EventHandler, EventsProvider}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; @@ -49,6 +52,8 @@ use std::time::Instant; #[cfg(feature = "futures")] use futures_util::{select_biased, future::FutureExt, task}; +#[cfg(not(feature = "std"))] +use alloc::vec::Vec; /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its @@ -216,6 +221,37 @@ fn handle_network_graph_update( } } +fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( + scorer: &'a S, event: &Event +) { + let mut score = scorer.lock(); + match event { + Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => { + let path = path.iter().collect::>(); + score.payment_path_failed(&path, *scid); + }, + Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => { + // Reached if the destination explicitly failed it back. We treat this as a successful probe + // because the payment made it all the way to the destination with sufficient liquidity. + let path = path.iter().collect::>(); + score.probe_successful(&path); + }, + Event::PaymentPathSuccessful { path, .. } => { + let path = path.iter().collect::>(); + score.payment_path_successful(&path); + }, + Event::ProbeSuccessful { path, .. } => { + let path = path.iter().collect::>(); + score.probe_successful(&path); + }, + Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => { + let path = path.iter().collect::>(); + score.probe_failed(&path, *scid); + }, + _ => {}, + } +} + macro_rules! define_run_body { ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, @@ -387,7 +423,7 @@ pub async fn process_events_async< UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, - SC: WriteableScore<'a>, + SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, Sleeper: Fn(Duration) -> SleepFuture >( @@ -417,10 +453,14 @@ where let async_event_handler = |event| { let network_graph = gossip_sync.network_graph(); let event_handler = &event_handler; + let scorer = &scorer; async move { if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } + if let Some(ref scorer) = scorer { + update_scorer(scorer, &event); + } event_handler(event).await; } }; @@ -516,7 +556,7 @@ impl BackgroundProcessor { UMH: 'static + Deref + Send + Sync, PM: 'static + Deref> + Send + Sync, S: 'static + Deref + Send + Sync, - SC: WriteableScore<'a>, + SC: for <'b> WriteableScore<'b>, >( persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, @@ -547,6 +587,9 @@ impl BackgroundProcessor { if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } + if let Some(ref scorer) = scorer { + update_scorer(scorer, &event); + } event_handler.handle_event(event); }; define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), @@ -613,14 +656,16 @@ mod tests { use bitcoin::blockdata::locktime::PackedLockTime; use bitcoin::blockdata::transaction::{Transaction, TxOut}; use bitcoin::network::constants::Network; + use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1}; use lightning::chain::{BestBlock, Confirm, chainmonitor}; use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::get_event_msg; + use lightning::ln::PaymentHash; use lightning::ln::channelmanager; - use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters}; - use lightning::ln::features::ChannelFeatures; + use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; + use lightning::ln::features::{ChannelFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelMessageHandler, Init}; use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler}; use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; @@ -1296,4 +1341,124 @@ mod tests { let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); assert!(bg_processor.stop().is_ok()); } + + #[test] + fn test_payment_path_scoring() { + // Ensure that we update the scorer when relevant events are processed. In this case, we ensure + // that we update the scorer upon a payment path succeeding (note that the channel must be + // public or else we won't score it). + // Set up a background event handler for FundingGenerationReady events. + let (sender, receiver) = std::sync::mpsc::sync_channel(1); + let event_handler = move |event: Event| match event { + Event::PaymentPathFailed { .. } => sender.send(event).unwrap(), + Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(), + Event::ProbeSuccessful { .. } => sender.send(event).unwrap(), + Event::ProbeFailed { .. } => sender.send(event).unwrap(), + _ => panic!("Unexpected event: {:?}", event), + }; + + let nodes = create_nodes(1, "test_payment_path_scoring".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir.clone())); + let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); + + let scored_scid = 4242; + let secp_ctx = Secp256k1::new(); + let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap(); + let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); + + let path = vec![RouteHop { + pubkey: node_1_id, + node_features: NodeFeatures::empty(), + short_channel_id: scored_scid, + channel_features: ChannelFeatures::empty(), + fee_msat: 0, + cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, + }]; + + nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + nodes[0].node.push_pending_event(Event::PaymentPathFailed { + payment_id: None, + payment_hash: PaymentHash([42; 32]), + payment_failed_permanently: false, + network_update: None, + all_paths_failed: true, + path: path.clone(), + short_channel_id: Some(scored_scid), + retry: None, + }); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("PaymentPathFailed not handled within deadline"); + match event { + Event::PaymentPathFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + + // Ensure we'll score payments that were explicitly failed back by the destination as + // ProbeSuccess. + nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + nodes[0].node.push_pending_event(Event::PaymentPathFailed { + payment_id: None, + payment_hash: PaymentHash([42; 32]), + payment_failed_permanently: true, + network_update: None, + all_paths_failed: true, + path: path.clone(), + short_channel_id: None, + retry: None, + }); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("PaymentPathFailed not handled within deadline"); + match event { + Event::PaymentPathFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + + nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: None, + path: path.clone(), + }); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("PaymentPathSuccessful not handled within deadline"); + match event { + Event::PaymentPathSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + nodes[0].node.push_pending_event(Event::ProbeSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path: path.clone(), + }); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("ProbeSuccessful not handled within deadline"); + match event { + Event::ProbeSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + nodes[0].node.push_pending_event(Event::ProbeFailed { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path: path.clone(), + short_channel_id: Some(scored_scid), + }); + let event = receiver + .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) + .expect("ProbeFailure not handled within deadline"); + match event { + Event::ProbeFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + + assert!(bg_processor.stop().is_ok()); + } } diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 69006bcc8..4ea0a5e3e 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -5520,6 +5520,12 @@ where events.into_inner() } + #[cfg(feature = "_test_utils")] + pub fn push_pending_event(&self, event: events::Event) { + let mut events = self.pending_events.lock().unwrap(); + events.push(event); + } + #[cfg(test)] pub fn pop_pending_event(&self) -> Option { let mut events = self.pending_events.lock().unwrap();