use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
use lightning::routing::network_graph::NetGraphMsgHandler;
use lightning::util::config::UserConfig;
-use lightning::util::events::{Event, EventsProvider};
+use lightning::util::events::Event;
use lightning::util::ser::ReadableArgs;
use lightning_background_processor::BackgroundProcessor;
use lightning_block_sync::init;
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
-use tokio::sync::mpsc;
-use tokio::sync::mpsc::Receiver;
pub(crate) enum HTLCStatus {
Pending,
SimpleArcChannelManager<ChainMonitor, BitcoindClient, BitcoindClient, FilesystemLogger>;
async fn handle_ldk_events(
- channel_manager: Arc<ChannelManager>, chain_monitor: Arc<ChainMonitor>,
- bitcoind_client: Arc<BitcoindClient>, keys_manager: Arc<KeysManager>,
- inbound_payments: PaymentInfoStorage, outbound_payments: PaymentInfoStorage, network: Network,
- mut event_receiver: Receiver<()>,
+ channel_manager: Arc<ChannelManager>, bitcoind_client: Arc<BitcoindClient>,
+ keys_manager: Arc<KeysManager>, inbound_payments: PaymentInfoStorage,
+ outbound_payments: PaymentInfoStorage, network: Network, event: Event,
) {
- loop {
- let received = event_receiver.recv();
- if received.await.is_none() {
- println!("LDK Event channel closed!");
- return;
+ match event {
+ Event::FundingGenerationReady {
+ temporary_channel_id,
+ channel_value_satoshis,
+ output_script,
+ ..
+ } => {
+ // Construct the raw transaction with one output, that is paid the amount of the
+ // channel.
+ let addr = WitnessProgram::from_scriptpubkey(
+ &output_script[..],
+ match network {
+ Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin,
+ Network::Testnet => bitcoin_bech32::constants::Network::Testnet,
+ Network::Regtest => bitcoin_bech32::constants::Network::Regtest,
+ Network::Signet => panic!("Signet unsupported"),
+ },
+ )
+ .expect("Lightning funding tx should always be to a SegWit output")
+ .to_address();
+ let mut outputs = vec![HashMap::with_capacity(1)];
+ outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0);
+ let raw_tx = bitcoind_client.create_raw_transaction(outputs).await;
+
+ // Have your wallet put the inputs into the transaction such that the output is
+ // satisfied.
+ let funded_tx = bitcoind_client.fund_raw_transaction(raw_tx).await;
+ let change_output_position = funded_tx.changepos;
+ assert!(change_output_position == 0 || change_output_position == 1);
+
+ // Sign the final funding transaction and broadcast it.
+ let signed_tx = bitcoind_client.sign_raw_transaction_with_wallet(funded_tx.hex).await;
+ assert_eq!(signed_tx.complete, true);
+ let final_tx: Transaction =
+ encode::deserialize(&hex_utils::to_vec(&signed_tx.hex).unwrap()).unwrap();
+ // Give the funding transaction back to LDK for opening the channel.
+ channel_manager.funding_transaction_generated(&temporary_channel_id, final_tx).unwrap();
}
- let loop_channel_manager = channel_manager.clone();
- let mut events = channel_manager.get_and_clear_pending_events();
- events.append(&mut chain_monitor.get_and_clear_pending_events());
- for event in events {
- match event {
- Event::FundingGenerationReady {
- temporary_channel_id,
- channel_value_satoshis,
- output_script,
- ..
- } => {
- // Construct the raw transaction with one output, that is paid the amount of the
- // channel.
- let addr = WitnessProgram::from_scriptpubkey(
- &output_script[..],
- match network {
- Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin,
- Network::Testnet => bitcoin_bech32::constants::Network::Testnet,
- Network::Regtest => bitcoin_bech32::constants::Network::Regtest,
- Network::Signet => panic!("Signet unsupported"),
- },
- )
- .expect("Lightning funding tx should always be to a SegWit output")
- .to_address();
- let mut outputs = vec![HashMap::with_capacity(1)];
- outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0);
- let raw_tx = bitcoind_client.create_raw_transaction(outputs).await;
-
- // Have your wallet put the inputs into the transaction such that the output is
- // satisfied.
- let funded_tx = bitcoind_client.fund_raw_transaction(raw_tx).await;
- let change_output_position = funded_tx.changepos;
- assert!(change_output_position == 0 || change_output_position == 1);
-
- // Sign the final funding transaction and broadcast it.
- let signed_tx =
- bitcoind_client.sign_raw_transaction_with_wallet(funded_tx.hex).await;
- assert_eq!(signed_tx.complete, true);
- let final_tx: Transaction =
- encode::deserialize(&hex_utils::to_vec(&signed_tx.hex).unwrap()).unwrap();
- // Give the funding transaction back to LDK for opening the channel.
- loop_channel_manager
- .funding_transaction_generated(&temporary_channel_id, final_tx)
- .unwrap();
- }
- Event::PaymentReceived {
- payment_hash,
- payment_preimage,
- payment_secret,
- amt,
- ..
- } => {
- let mut payments = inbound_payments.lock().unwrap();
- let status = match loop_channel_manager.claim_funds(payment_preimage.unwrap()) {
- true => {
- println!(
- "\nEVENT: received payment from payment hash {} of {} millisatoshis",
- hex_utils::hex_str(&payment_hash.0),
- amt
- );
- print!("> ");
- io::stdout().flush().unwrap();
- HTLCStatus::Succeeded
- }
- _ => HTLCStatus::Failed,
- };
- match payments.entry(payment_hash) {
- Entry::Occupied(mut e) => {
- let payment = e.get_mut();
- payment.status = status;
- payment.preimage = Some(payment_preimage.unwrap());
- payment.secret = Some(payment_secret);
- }
- Entry::Vacant(e) => {
- e.insert(PaymentInfo {
- preimage: Some(payment_preimage.unwrap()),
- secret: Some(payment_secret),
- status,
- amt_msat: MillisatAmount(Some(amt)),
- });
- }
- }
- }
- Event::PaymentSent { payment_preimage } => {
- let hashed = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
- let mut payments = outbound_payments.lock().unwrap();
- for (payment_hash, payment) in payments.iter_mut() {
- if *payment_hash == hashed {
- payment.preimage = Some(payment_preimage);
- payment.status = HTLCStatus::Succeeded;
- println!(
- "\nEVENT: successfully sent payment of {} millisatoshis from \
- payment hash {:?} with preimage {:?}",
- payment.amt_msat,
- hex_utils::hex_str(&payment_hash.0),
- hex_utils::hex_str(&payment_preimage.0)
- );
- print!("> ");
- io::stdout().flush().unwrap();
- }
- }
- }
- Event::PaymentFailed { payment_hash, rejected_by_dest } => {
- print!(
- "\nEVENT: Failed to send payment to payment hash {:?}: ",
- hex_utils::hex_str(&payment_hash.0)
+ Event::PaymentReceived { payment_hash, payment_preimage, payment_secret, amt, .. } => {
+ let mut payments = inbound_payments.lock().unwrap();
+ let status = match channel_manager.claim_funds(payment_preimage.unwrap()) {
+ true => {
+ println!(
+ "\nEVENT: received payment from payment hash {} of {} millisatoshis",
+ hex_utils::hex_str(&payment_hash.0),
+ amt
);
- if rejected_by_dest {
- println!("re-attempting the payment will not succeed");
- } else {
- println!("payment may be retried");
- }
print!("> ");
io::stdout().flush().unwrap();
-
- let mut payments = outbound_payments.lock().unwrap();
- if payments.contains_key(&payment_hash) {
- let payment = payments.get_mut(&payment_hash).unwrap();
- payment.status = HTLCStatus::Failed;
- }
+ HTLCStatus::Succeeded
+ }
+ _ => HTLCStatus::Failed,
+ };
+ match payments.entry(payment_hash) {
+ Entry::Occupied(mut e) => {
+ let payment = e.get_mut();
+ payment.status = status;
+ payment.preimage = Some(payment_preimage.unwrap());
+ payment.secret = Some(payment_secret);
}
- Event::PendingHTLCsForwardable { time_forwardable } => {
- let forwarding_channel_manager = loop_channel_manager.clone();
- tokio::spawn(async move {
- let min = time_forwardable.as_millis() as u64;
- let millis_to_sleep = thread_rng().gen_range(min, min * 5) as u64;
- tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
- forwarding_channel_manager.process_pending_htlc_forwards();
+ Entry::Vacant(e) => {
+ e.insert(PaymentInfo {
+ preimage: Some(payment_preimage.unwrap()),
+ secret: Some(payment_secret),
+ status,
+ amt_msat: MillisatAmount(Some(amt)),
});
}
- Event::SpendableOutputs { outputs } => {
- let destination_address = bitcoind_client.get_new_address().await;
- let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
- let tx_feerate =
- bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
- let spending_tx = keys_manager
- .spend_spendable_outputs(
- output_descriptors,
- Vec::new(),
- destination_address.script_pubkey(),
- tx_feerate,
- &Secp256k1::new(),
- )
- .unwrap();
- bitcoind_client.broadcast_transaction(&spending_tx);
+ }
+ }
+ Event::PaymentSent { payment_preimage } => {
+ let hashed = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner());
+ let mut payments = outbound_payments.lock().unwrap();
+ for (payment_hash, payment) in payments.iter_mut() {
+ if *payment_hash == hashed {
+ payment.preimage = Some(payment_preimage);
+ payment.status = HTLCStatus::Succeeded;
+ println!(
+ "\nEVENT: successfully sent payment of {} millisatoshis from \
+ payment hash {:?} with preimage {:?}",
+ payment.amt_msat,
+ hex_utils::hex_str(&payment_hash.0),
+ hex_utils::hex_str(&payment_preimage.0)
+ );
+ print!("> ");
+ io::stdout().flush().unwrap();
}
}
}
- tokio::time::sleep(Duration::from_secs(1)).await;
+ Event::PaymentFailed { payment_hash, rejected_by_dest } => {
+ print!(
+ "\nEVENT: Failed to send payment to payment hash {:?}: ",
+ hex_utils::hex_str(&payment_hash.0)
+ );
+ if rejected_by_dest {
+ println!("re-attempting the payment will not succeed");
+ } else {
+ println!("payment may be retried");
+ }
+ print!("> ");
+ io::stdout().flush().unwrap();
+
+ let mut payments = outbound_payments.lock().unwrap();
+ if payments.contains_key(&payment_hash) {
+ let payment = payments.get_mut(&payment_hash).unwrap();
+ payment.status = HTLCStatus::Failed;
+ }
+ }
+ Event::PendingHTLCsForwardable { time_forwardable } => {
+ let forwarding_channel_manager = channel_manager.clone();
+ tokio::spawn(async move {
+ let min = time_forwardable.as_millis() as u64;
+ let millis_to_sleep = thread_rng().gen_range(min, min * 5) as u64;
+ tokio::time::sleep(Duration::from_millis(millis_to_sleep)).await;
+ forwarding_channel_manager.process_pending_htlc_forwards();
+ });
+ }
+ Event::SpendableOutputs { outputs } => {
+ let destination_address = bitcoind_client.get_new_address().await;
+ let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
+ let tx_feerate =
+ bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
+ let spending_tx = keys_manager
+ .spend_spendable_outputs(
+ output_descriptors,
+ Vec::new(),
+ destination_address.script_pubkey(),
+ tx_feerate,
+ &Secp256k1::new(),
+ )
+ .unwrap();
+ bitcoind_client.broadcast_transaction(&spending_tx);
+ }
}
}
// ## Running LDK
// Step 13: Initialize networking
- // We poll for events in handle_ldk_events(..) rather than waiting for them over the
- // mpsc::channel, so we can leave the event receiver as unused.
- let (event_ntfn_sender, event_ntfn_receiver) = mpsc::channel(2);
let peer_manager_connection_handler = peer_manager.clone();
- let event_notifier = event_ntfn_sender.clone();
let listening_port = args.ldk_peer_listening_port;
tokio::spawn(async move {
let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port))
.expect("Failed to bind to listen port - is something else already listening on it?");
loop {
let peer_mgr = peer_manager_connection_handler.clone();
- let notifier = event_notifier.clone();
let tcp_stream = listener.accept().await.unwrap().0;
tokio::spawn(async move {
lightning_net_tokio::setup_inbound(
peer_mgr.clone(),
- notifier.clone(),
tcp_stream.into_std().unwrap(),
)
.await;
}
});
- // Step 15: Initialize LDK Event Handling
+ // Step 15: Event Handling
let channel_manager_event_listener = channel_manager.clone();
- let chain_monitor_event_listener = chain_monitor.clone();
let keys_manager_listener = keys_manager.clone();
// TODO: persist payment info to disk
let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
let outbound_pmts_for_events = outbound_payments.clone();
let network = args.network;
let bitcoind_rpc = bitcoind_client.clone();
- tokio::spawn(async move {
- handle_ldk_events(
- channel_manager_event_listener,
- chain_monitor_event_listener,
- bitcoind_rpc,
- keys_manager_listener,
- inbound_pmts_for_events,
- outbound_pmts_for_events,
+ let handle = tokio::runtime::Handle::current();
+ let event_handler = move |event| {
+ handle.block_on(handle_ldk_events(
+ channel_manager_event_listener.clone(),
+ bitcoind_rpc.clone(),
+ keys_manager_listener.clone(),
+ inbound_pmts_for_events.clone(),
+ outbound_pmts_for_events.clone(),
network,
- event_ntfn_receiver,
- )
- .await;
- });
-
- // Step 16 & 17: Persist ChannelManager & Background Processing
+ event,
+ ))
+ };
+ // Step 16: ChannelManager Persisting
let data_dir = ldk_data_dir.clone();
let persist_channel_manager_callback =
move |node: &ChannelManager| FilesystemPersister::persist_manager(data_dir.clone(), &*node);
+ // Step 17: Background Processing
BackgroundProcessor::start(
persist_channel_manager_callback,
+ event_handler,
+ chain_monitor.clone(),
channel_manager.clone(),
peer_manager.clone(),
logger.clone(),
for (pubkey, peer_addr) in info.drain() {
for chan_info in channel_manager.list_channels() {
if pubkey == chan_info.remote_network_id {
- let _ = cli::connect_peer_if_necessary(
- pubkey,
- peer_addr,
- peer_manager.clone(),
- event_ntfn_sender.clone(),
- )
- .await;
+ let _ =
+ cli::connect_peer_if_necessary(pubkey, peer_addr, peer_manager.clone())
+ .await;
}
}
}
router.clone(),
inbound_payments,
outbound_payments,
- event_ntfn_sender,
ldk_data_dir.clone(),
logger.clone(),
network,