From f0b928b06df43c8be43aef30019d4d80c0fdbea7 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 3 Apr 2023 20:19:49 +0000 Subject: [PATCH] Make BP's `test_payment_path_scoring` dual sync/async. This finally gives us a bit of test coverage of the async BP, which was embarrassingly missing until now. --- lightning-background-processor/Cargo.toml | 1 + lightning-background-processor/src/lib.rs | 231 +++++++++++++--------- 2 files changed, 138 insertions(+), 94 deletions(-) diff --git a/lightning-background-processor/Cargo.toml b/lightning-background-processor/Cargo.toml index af8bb25e5..e2acb2240 100644 --- a/lightning-background-processor/Cargo.toml +++ b/lightning-background-processor/Cargo.toml @@ -25,6 +25,7 @@ lightning = { version = "0.0.114", path = "../lightning", default-features = fal lightning-rapid-gossip-sync = { version = "0.0.114", path = "../lightning-rapid-gossip-sync", default-features = false } [dev-dependencies] +tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "time" ] } lightning = { version = "0.0.114", path = "../lightning", features = ["_test_utils"] } lightning-invoice = { version = "0.22.0", path = "../lightning-invoice" } lightning-persister = { version = "0.0.114", path = "../lightning-persister" } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index da5f2acf8..41aceedc8 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1419,12 +1419,100 @@ mod tests { assert_eq!(network_graph.read_only().channels().len(), 0); } + macro_rules! do_test_payment_path_scoring { + ($nodes: expr, $receive: expr) => { + // Ensure that we update the scorer when relevant events are processed. In this case, we ensure + // that we update the scorer upon a payment path succeeding (note that the channel must be + // public or else we won't score it). + // A background event handler for FundingGenerationReady events must be hooked up to a + // running background processor. + let scored_scid = 4242; + let secp_ctx = Secp256k1::new(); + let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap(); + let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); + + let path = vec![RouteHop { + pubkey: node_1_id, + node_features: NodeFeatures::empty(), + short_channel_id: scored_scid, + channel_features: ChannelFeatures::empty(), + fee_msat: 0, + cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, + }]; + + $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); + $nodes[0].node.push_pending_event(Event::PaymentPathFailed { + payment_id: None, + payment_hash: PaymentHash([42; 32]), + payment_failed_permanently: false, + failure: PathFailure::OnPath { network_update: None }, + path: path.clone(), + short_channel_id: Some(scored_scid), + }); + let event = $receive.expect("PaymentPathFailed not handled within deadline"); + match event { + Event::PaymentPathFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + + // Ensure we'll score payments that were explicitly failed back by the destination as + // ProbeSuccess. + $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::PaymentPathFailed { + payment_id: None, + payment_hash: PaymentHash([42; 32]), + payment_failed_permanently: true, + failure: PathFailure::OnPath { network_update: None }, + path: path.clone(), + short_channel_id: None, + }); + let event = $receive.expect("PaymentPathFailed not handled within deadline"); + match event { + Event::PaymentPathFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: None, + path: path.clone(), + }); + let event = $receive.expect("PaymentPathSuccessful not handled within deadline"); + match event { + Event::PaymentPathSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::ProbeSuccessful { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path: path.clone(), + }); + let event = $receive.expect("ProbeSuccessful not handled within deadline"); + match event { + Event::ProbeSuccessful { .. } => {}, + _ => panic!("Unexpected event"), + } + + $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); + $nodes[0].node.push_pending_event(Event::ProbeFailed { + payment_id: PaymentId([42; 32]), + payment_hash: PaymentHash([42; 32]), + path, + short_channel_id: Some(scored_scid), + }); + let event = $receive.expect("ProbeFailure not handled within deadline"); + match event { + Event::ProbeFailed { .. } => {}, + _ => panic!("Unexpected event"), + } + } + } + #[test] fn test_payment_path_scoring() { - // Ensure that we update the scorer when relevant events are processed. In this case, we ensure - // that we update the scorer upon a payment path succeeding (note that the channel must be - // public or else we won't score it). - // Set up a background event handler for FundingGenerationReady events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); let event_handler = move |event: Event| match event { Event::PaymentPathFailed { .. } => sender.send(event).unwrap(), @@ -1439,101 +1527,56 @@ mod tests { let persister = Arc::new(Persister::new(data_dir)); let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone())); - let scored_scid = 4242; - let secp_ctx = Secp256k1::new(); - let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap(); - let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey); - - let path = vec![RouteHop { - pubkey: node_1_id, - node_features: NodeFeatures::empty(), - short_channel_id: scored_scid, - channel_features: ChannelFeatures::empty(), - fee_msat: 0, - cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32, - }]; - - nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid }); - nodes[0].node.push_pending_event(Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([42; 32]), - payment_failed_permanently: false, - failure: PathFailure::OnPath { network_update: None }, - path: path.clone(), - short_channel_id: Some(scored_scid), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathFailed not handled within deadline"); - match event { - Event::PaymentPathFailed { .. } => {}, - _ => panic!("Unexpected event"), - } + do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE))); - // Ensure we'll score payments that were explicitly failed back by the destination as - // ProbeSuccess. - nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::PaymentPathFailed { - payment_id: None, - payment_hash: PaymentHash([42; 32]), - payment_failed_permanently: true, - failure: PathFailure::OnPath { network_update: None }, - path: path.clone(), - short_channel_id: None, - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathFailed not handled within deadline"); - match event { - Event::PaymentPathFailed { .. } => {}, - _ => panic!("Unexpected event"), + if !std::thread::panicking() { + bg_processor.stop().unwrap(); } + } - nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::PaymentPathSuccessful { - payment_id: PaymentId([42; 32]), - payment_hash: None, - path: path.clone(), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("PaymentPathSuccessful not handled within deadline"); - match event { - Event::PaymentPathSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } + #[tokio::test] + #[cfg(feature = "futures")] + async fn test_payment_path_scoring_async() { + let (sender, mut receiver) = tokio::sync::mpsc::channel(1); + let event_handler = move |event: Event| { + let sender_ref = sender.clone(); + async move { + match event { + Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() }, + Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() }, + Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() }, + Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() }, + _ => panic!("Unexpected event: {:?}", event), + } + } + }; - nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() }); - nodes[0].node.push_pending_event(Event::ProbeSuccessful { - payment_id: PaymentId([42; 32]), - payment_hash: PaymentHash([42; 32]), - path: path.clone(), - }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("ProbeSuccessful not handled within deadline"); - match event { - Event::ProbeSuccessful { .. } => {}, - _ => panic!("Unexpected event"), - } + let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string()); + let data_dir = nodes[0].persister.get_data_dir(); + let persister = Arc::new(Persister::new(data_dir)); - nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() }); - nodes[0].node.push_pending_event(Event::ProbeFailed { - payment_id: PaymentId([42; 32]), - payment_hash: PaymentHash([42; 32]), - path, - short_channel_id: Some(scored_scid), + let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); + + let bp_future = super::process_events_async( + persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), + nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), + Some(nodes[0].scorer.clone()), move |dur: Duration| { + let mut exit_receiver = exit_receiver.clone(); + Box::pin(async move { + tokio::select! { + _ = tokio::time::sleep(dur) => false, + _ = exit_receiver.changed() => true, + } + }) + }, false, + ); + // TODO: Drop _local and simply spawn after #2003 + let local_set = tokio::task::LocalSet::new(); + local_set.spawn_local(bp_future); + local_set.spawn_local(async move { + do_test_payment_path_scoring!(nodes, receiver.recv().await); + exit_sender.send(()).unwrap(); }); - let event = receiver - .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) - .expect("ProbeFailure not handled within deadline"); - match event { - Event::ProbeFailed { .. } => {}, - _ => panic!("Unexpected event"), - } - - if !std::thread::panicking() { - bg_processor.stop().unwrap(); - } + local_set.await; } } -- 2.39.5