From 5f6bfd3fb2f6f91d7af323cacf807a26c41895eb Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 24 Apr 2023 03:21:42 +0000 Subject: [PATCH] Switch to using the async background processor This drops a full thread from the sample node and better integrates LDK into the tokio runtime. While we're at it we also push the `Event` being processed by move into the event handler, which doesn't matter much here but makes further modification in the event handling somewhat simpler. --- Cargo.toml | 2 +- src/main.rs | 95 ++++++++++++++++++++++++++++++++--------------------- 2 files changed, 59 insertions(+), 38 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9338251..aa842bf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ lightning-block-sync = { version = "0.0.115", features = [ "rpc-client" ] } lightning-invoice = { version = "0.23" } lightning-net-tokio = { version = "0.0.115" } lightning-persister = { version = "0.0.115" } -lightning-background-processor = { version = "0.0.115" } +lightning-background-processor = { version = "0.0.115", features = [ "futures" ] } lightning-rapid-gossip-sync = { version = "0.0.115" } base64 = "0.13.0" diff --git a/src/main.rs b/src/main.rs index 6780d38..cad7425 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ use lightning::routing::router::DefaultRouter; use lightning::util::config::UserConfig; use lightning::events::{Event, PaymentPurpose}; use lightning::util::ser::ReadableArgs; -use lightning_background_processor::{BackgroundProcessor, GossipSync}; +use lightning_background_processor::{process_events_async, GossipSync}; use lightning_block_sync::init; use lightning_block_sync::poll; use lightning_block_sync::SpvClient; @@ -107,7 +107,7 @@ async fn handle_ldk_events( channel_manager: &Arc, bitcoind_client: &BitcoindClient, network_graph: &NetworkGraph, keys_manager: &KeysManager, inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage, - network: Network, event: &Event, + network: Network, event: Event, ) { match event { Event::FundingGenerationReady { @@ -131,7 +131,7 @@ async fn handle_ldk_events( .expect("Lightning funding tx should always be to a SegWit output") .to_address(); let mut outputs = vec![HashMap::with_capacity(1)]; - outputs[0].insert(addr, *channel_value_satoshis as f64 / 100_000_000.0); + outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0); let raw_tx = bitcoind_client.create_raw_transaction(outputs).await; // Have your wallet put the inputs into the transaction such that the output is @@ -147,7 +147,7 @@ async fn handle_ldk_events( if channel_manager .funding_transaction_generated( &temporary_channel_id, - counterparty_node_id, + &counterparty_node_id, final_tx, ) .is_err() @@ -176,8 +176,8 @@ async fn handle_ldk_events( print!("> "); io::stdout().flush().unwrap(); let payment_preimage = match purpose { - PaymentPurpose::InvoicePayment { payment_preimage, .. } => *payment_preimage, - PaymentPurpose::SpontaneousPayment(preimage) => Some(*preimage), + PaymentPurpose::InvoicePayment { payment_preimage, .. } => payment_preimage, + PaymentPurpose::SpontaneousPayment(preimage) => Some(preimage), }; channel_manager.claim_funds(payment_preimage.unwrap()); } @@ -191,12 +191,12 @@ async fn handle_ldk_events( io::stdout().flush().unwrap(); let (payment_preimage, payment_secret) = match purpose { PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => { - (*payment_preimage, Some(*payment_secret)) + (payment_preimage, Some(payment_secret)) } - PaymentPurpose::SpontaneousPayment(preimage) => (Some(*preimage), None), + PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), }; let mut payments = inbound_payments.lock().unwrap(); - match payments.entry(*payment_hash) { + match payments.entry(payment_hash) { Entry::Occupied(mut e) => { let payment = e.get_mut(); payment.status = HTLCStatus::Succeeded; @@ -208,7 +208,7 @@ async fn handle_ldk_events( preimage: payment_preimage, secret: payment_secret, status: HTLCStatus::Succeeded, - amt_msat: MillisatAmount(Some(*amount_msat)), + amt_msat: MillisatAmount(Some(amount_msat)), }); } } @@ -216,8 +216,8 @@ async fn handle_ldk_events( Event::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => { let mut payments = outbound_payments.lock().unwrap(); for (hash, payment) in payments.iter_mut() { - if *hash == *payment_hash { - payment.preimage = Some(*payment_preimage); + if *hash == payment_hash { + payment.preimage = Some(payment_preimage); payment.status = HTLCStatus::Succeeded; println!( "\nEVENT: successfully sent payment of {} millisatoshis{} from \ @@ -291,11 +291,11 @@ async fn handle_ldk_events( .unwrap_or_default() }; let from_prev_str = - format!(" from {}{}", node_str(prev_channel_id), channel_str(prev_channel_id)); + format!(" from {}{}", node_str(&prev_channel_id), channel_str(&prev_channel_id)); let to_next_str = - format!(" to {}{}", node_str(next_channel_id), channel_str(next_channel_id)); + format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id)); - let from_onchain_str = if *claim_from_onchain_tx { + let from_onchain_str = if claim_from_onchain_tx { "from onchain downstream claim" } else { "from HTLC fulfill message" @@ -363,7 +363,7 @@ async fn handle_ldk_events( Event::ChannelClosed { channel_id, reason, user_channel_id: _ } => { println!( "\nEVENT: Channel {} closed due to: {:?}", - hex_utils::hex_str(channel_id), + hex_utils::hex_str(&channel_id), reason ); print!("> "); @@ -673,36 +673,46 @@ async fn start_ldk() { } }); - // Step 18: Handle LDK Events - let channel_manager_event_listener = channel_manager.clone(); - let keys_manager_listener = keys_manager.clone(); // TODO: persist payment info to disk let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); let outbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); - let inbound_pmts_for_events = inbound_payments.clone(); - let outbound_pmts_for_events = outbound_payments.clone(); + + // Step 18: Handle LDK Events + let channel_manager_event_listener = Arc::clone(&channel_manager); + let bitcoind_client_event_listener = Arc::clone(&bitcoind_client); + let network_graph_event_listener = Arc::clone(&network_graph); + let keys_manager_event_listener = Arc::clone(&keys_manager); + let inbound_payments_event_listener = Arc::clone(&inbound_payments); + let outbound_payments_event_listener = Arc::clone(&outbound_payments); let network = args.network; - let bitcoind_rpc = bitcoind_client.clone(); - let network_graph_events = network_graph.clone(); - let handle = tokio::runtime::Handle::current(); let event_handler = move |event: Event| { - handle.block_on(handle_ldk_events( - &channel_manager_event_listener, - &bitcoind_rpc, - &network_graph_events, - &keys_manager_listener, - &inbound_pmts_for_events, - &outbound_pmts_for_events, - network, - &event, - )); + let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener); + let bitcoind_client_event_listener = Arc::clone(&bitcoind_client_event_listener); + let network_graph_event_listener = Arc::clone(&network_graph_event_listener); + let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener); + let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener); + let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener); + async move { + handle_ldk_events( + &channel_manager_event_listener, + &bitcoind_client_event_listener, + &network_graph_event_listener, + &keys_manager_event_listener, + &inbound_payments_event_listener, + &outbound_payments_event_listener, + network, + event, + ) + .await; + } }; // Step 19: Persist ChannelManager and NetworkGraph let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); // Step 20: Background Processing - let background_processor = BackgroundProcessor::start( + let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(()); + let background_processor = tokio::spawn(process_events_async( persister, event_handler, chain_monitor.clone(), @@ -711,7 +721,17 @@ async fn start_ldk() { peer_manager.clone(), logger.clone(), Some(scorer.clone()), - ); + move |t| { + let mut bp_exit_fut_check = bp_exit_check.clone(); + Box::pin(async move { + tokio::select! { + _ = tokio::time::sleep(t) => false, + _ = bp_exit_fut_check.changed() => true, + } + }) + }, + false, + )); // Regularly reconnect to channel peers. let connect_cm = Arc::clone(&channel_manager); @@ -792,7 +812,8 @@ async fn start_ldk() { peer_manager.disconnect_all_peers(); // Stop the background processor. - background_processor.stop().unwrap(); + bp_exit.send(()).unwrap(); + background_processor.await.unwrap().unwrap(); } #[tokio::main] -- 2.39.5