]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Score payment paths in BackgroundProcessor
authorValentine Wallace <vwallace@protonmail.com>
Fri, 3 Feb 2023 16:25:20 +0000 (11:25 -0500)
committerValentine Wallace <vwallace@protonmail.com>
Fri, 3 Feb 2023 16:25:20 +0000 (11:25 -0500)
lightning-background-processor/src/lib.rs
lightning/src/ln/channelmanager.rs

index 9cf46a1663cbd5c9d4e6862c141dc4f24057e85e..16e06780dfce48ade113ddd97aaf06860c6f1596 100644 (file)
@@ -16,6 +16,9 @@
 #[cfg(any(test, feature = "std"))]
 extern crate core;
 
+#[cfg(not(feature = "std"))]
+extern crate alloc;
+
 #[macro_use] extern crate lightning;
 extern crate lightning_rapid_gossip_sync;
 
@@ -28,7 +31,7 @@ use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMes
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::router::Router;
-use lightning::routing::scoring::WriteableScore;
+use lightning::routing::scoring::{Score, WriteableScore};
 use lightning::util::events::{Event, EventHandler, EventsProvider};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
@@ -49,6 +52,8 @@ use std::time::Instant;
 
 #[cfg(feature = "futures")]
 use futures_util::{select_biased, future::FutureExt, task};
+#[cfg(not(feature = "std"))]
+use alloc::vec::Vec;
 
 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
@@ -216,6 +221,37 @@ fn handle_network_graph_update<L: Deref>(
        }
 }
 
+fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
+       scorer: &'a S, event: &Event
+) {
+       let mut score = scorer.lock();
+       match event {
+               Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
+                       let path = path.iter().collect::<Vec<_>>();
+                       score.payment_path_failed(&path, *scid);
+               },
+               Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
+                       // Reached if the destination explicitly failed it back. We treat this as a successful probe
+                       // because the payment made it all the way to the destination with sufficient liquidity.
+                       let path = path.iter().collect::<Vec<_>>();
+                       score.probe_successful(&path);
+               },
+               Event::PaymentPathSuccessful { path, .. } => {
+                       let path = path.iter().collect::<Vec<_>>();
+                       score.payment_path_successful(&path);
+               },
+               Event::ProbeSuccessful { path, .. } => {
+                       let path = path.iter().collect::<Vec<_>>();
+                       score.probe_successful(&path);
+               },
+               Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
+                       let path = path.iter().collect::<Vec<_>>();
+                       score.probe_failed(&path, *scid);
+               },
+               _ => {},
+       }
+}
+
 macro_rules! define_run_body {
        ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
         $channel_manager: ident, $process_channel_manager_events: expr,
@@ -387,7 +423,7 @@ pub async fn process_events_async<
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
-       SC: WriteableScore<'a>,
+       SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
        Sleeper: Fn(Duration) -> SleepFuture
 >(
@@ -417,10 +453,14 @@ where
        let async_event_handler = |event| {
                let network_graph = gossip_sync.network_graph();
                let event_handler = &event_handler;
+               let scorer = &scorer;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
+                       if let Some(ref scorer) = scorer {
+                               update_scorer(scorer, &event);
+                       }
                        event_handler(event).await;
                }
        };
@@ -516,7 +556,7 @@ impl BackgroundProcessor {
                UMH: 'static + Deref + Send + Sync,
                PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
-               SC: WriteableScore<'a>,
+               SC: for <'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
                gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
@@ -547,6 +587,9 @@ impl BackgroundProcessor {
                                if let Some(network_graph) = network_graph {
                                        handle_network_graph_update(network_graph, &event)
                                }
+                               if let Some(ref scorer) = scorer {
+                                       update_scorer(scorer, &event);
+                               }
                                event_handler.handle_event(event);
                        };
                        define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
@@ -613,14 +656,16 @@ mod tests {
        use bitcoin::blockdata::locktime::PackedLockTime;
        use bitcoin::blockdata::transaction::{Transaction, TxOut};
        use bitcoin::network::constants::Network;
+       use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
        use lightning::chain::{BestBlock, Confirm, chainmonitor};
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
        use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
        use lightning::chain::transaction::OutPoint;
        use lightning::get_event_msg;
+       use lightning::ln::PaymentHash;
        use lightning::ln::channelmanager;
-       use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters};
-       use lightning::ln::features::ChannelFeatures;
+       use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
+       use lightning::ln::features::{ChannelFeatures, NodeFeatures};
        use lightning::ln::msgs::{ChannelMessageHandler, Init};
        use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
        use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
@@ -1296,4 +1341,124 @@ mod tests {
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());
        }
+
+       #[test]
+       fn test_payment_path_scoring() {
+               // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
+               // that we update the scorer upon a payment path succeeding (note that the channel must be
+               // public or else we won't score it).
+               // Set up a background event handler for FundingGenerationReady events.
+               let (sender, receiver) = std::sync::mpsc::sync_channel(1);
+               let event_handler = move |event: Event| match event {
+                       Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
+                       Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
+                       Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
+                       Event::ProbeFailed { .. } => sender.send(event).unwrap(),
+                       _ => panic!("Unexpected event: {:?}", event),
+               };
+
+               let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
+               let data_dir = nodes[0].persister.get_data_dir();
+               let persister = Arc::new(Persister::new(data_dir.clone()));
+               let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
+
+               let scored_scid = 4242;
+               let secp_ctx = Secp256k1::new();
+               let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
+
+               let path = vec![RouteHop {
+                       pubkey: node_1_id,
+                       node_features: NodeFeatures::empty(),
+                       short_channel_id: scored_scid,
+                       channel_features: ChannelFeatures::empty(),
+                       fee_msat: 0,
+                       cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
+               }];
+
+               nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
+               nodes[0].node.push_pending_event(Event::PaymentPathFailed {
+                       payment_id: None,
+                       payment_hash: PaymentHash([42; 32]),
+                       payment_failed_permanently: false,
+                       network_update: None,
+                       all_paths_failed: true,
+                       path: path.clone(),
+                       short_channel_id: Some(scored_scid),
+                       retry: None,
+               });
+               let event = receiver
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+                       .expect("PaymentPathFailed not handled within deadline");
+               match event {
+                       Event::PaymentPathFailed { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               // Ensure we'll score payments that were explicitly failed back by the destination as
+               // ProbeSuccess.
+               nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+               nodes[0].node.push_pending_event(Event::PaymentPathFailed {
+                       payment_id: None,
+                       payment_hash: PaymentHash([42; 32]),
+                       payment_failed_permanently: true,
+                       network_update: None,
+                       all_paths_failed: true,
+                       path: path.clone(),
+                       short_channel_id: None,
+                       retry: None,
+               });
+               let event = receiver
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+                       .expect("PaymentPathFailed not handled within deadline");
+               match event {
+                       Event::PaymentPathFailed { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
+               nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
+                       payment_id: PaymentId([42; 32]),
+                       payment_hash: None,
+                       path: path.clone(),
+               });
+               let event = receiver
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+                       .expect("PaymentPathSuccessful not handled within deadline");
+               match event {
+                       Event::PaymentPathSuccessful { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
+               nodes[0].node.push_pending_event(Event::ProbeSuccessful {
+                       payment_id: PaymentId([42; 32]),
+                       payment_hash: PaymentHash([42; 32]),
+                       path: path.clone(),
+               });
+               let event = receiver
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+                       .expect("ProbeSuccessful not handled within deadline");
+               match event {
+                       Event::ProbeSuccessful  { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
+               nodes[0].node.push_pending_event(Event::ProbeFailed {
+                       payment_id: PaymentId([42; 32]),
+                       payment_hash: PaymentHash([42; 32]),
+                       path: path.clone(),
+                       short_channel_id: Some(scored_scid),
+               });
+               let event = receiver
+                       .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
+                       .expect("ProbeFailure not handled within deadline");
+               match event {
+                       Event::ProbeFailed { .. } => {},
+                       _ => panic!("Unexpected event"),
+               }
+
+               assert!(bg_processor.stop().is_ok());
+       }
 }
index 69006bcc889cdd20b4bb45b3cd610bab63d2934d..4ea0a5e3e7503ab3d8a56a5c6d5b6a5b4eeed600 100644 (file)
@@ -5520,6 +5520,12 @@ where
                events.into_inner()
        }
 
+       #[cfg(feature = "_test_utils")]
+       pub fn push_pending_event(&self, event: events::Event) {
+               let mut events = self.pending_events.lock().unwrap();
+               events.push(event);
+       }
+
        #[cfg(test)]
        pub fn pop_pending_event(&self) -> Option<events::Event> {
                let mut events = self.pending_events.lock().unwrap();