X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain.rs;h=6480805d92759eed4d90407ada06a3f6eb7f202f;hb=fea709d5a9fba7553c8bbc25236a6640ef52dee5;hp=d43bb9b26a4d500e7de303911cc33bb2c61c3858;hpb=f10f5c7c1d206f84c84bfbebf3add1be5e12b9cd;p=ldk-sample diff --git a/src/main.rs b/src/main.rs index d43bb9b..6480805 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,40 +4,44 @@ mod cli; mod convert; mod disk; mod hex_utils; +mod sweep; use crate::bitcoind_client::BitcoindClient; use crate::disk::FilesystemLogger; use bitcoin::blockdata::transaction::Transaction; use bitcoin::consensus::encode; use bitcoin::network::constants::Network; -use bitcoin::secp256k1::Secp256k1; use bitcoin::BlockHash; use bitcoin_bech32::WitnessProgram; -use lightning::chain; -use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; -use lightning::chain::keysinterface::{EntropySource, InMemorySigner, KeysManager}; +use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME}; use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus}; use lightning::chain::{Filter, Watch}; -use lightning::ln::channelmanager; +use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet}; +use lightning::events::{Event, PaymentFailureReason, PaymentPurpose}; +use lightning::ln::channelmanager::{self, RecentPaymentDetails}; use lightning::ln::channelmanager::{ - ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager, + ChainParameters, ChannelManagerReadArgs, PaymentId, SimpleArcChannelManager, }; +use lightning::ln::msgs::DecodeError; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager}; -use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; -use lightning::onion_message::SimpleArcOnionMessenger; +use lightning::ln::{ChannelId, PaymentHash, PaymentPreimage, PaymentSecret}; +use lightning::onion_message::{DefaultMessageRouter, SimpleArcOnionMessenger}; use lightning::routing::gossip; use lightning::routing::gossip::{NodeId, P2PGossipSync}; use lightning::routing::router::DefaultRouter; +use lightning::routing::scoring::ProbabilisticScoringFeeParameters; +use lightning::sign::{EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor}; use lightning::util::config::UserConfig; -use lightning::util::events::{Event, PaymentPurpose}; -use lightning::util::ser::ReadableArgs; -use lightning_background_processor::{BackgroundProcessor, GossipSync}; +use lightning::util::persist::{self, KVStore, MonitorUpdatingPersister}; +use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; +use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum}; +use lightning_background_processor::{process_events_async, GossipSync}; use lightning_block_sync::init; use lightning_block_sync::poll; use lightning_block_sync::SpvClient; use lightning_block_sync::UnboundedCache; use lightning_net_tokio::SocketDescriptor; -use lightning_persister::FilesystemPersister; +use lightning_persister::fs_store::FilesystemStore; use rand::{thread_rng, Rng}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -49,15 +53,24 @@ use std::io; use std::io::Write; use std::path::Path; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, SystemTime}; +pub(crate) const PENDING_SPENDABLE_OUTPUT_DIR: &'static str = "pending_spendable_outputs"; + +#[derive(Copy, Clone)] pub(crate) enum HTLCStatus { Pending, Succeeded, Failed, } +impl_writeable_tlv_based_enum!(HTLCStatus, + (0, Pending) => {}, + (1, Succeeded) => {}, + (2, Failed) => {}; +); + pub(crate) struct MillisatAmount(Option); impl fmt::Display for MillisatAmount { @@ -69,6 +82,19 @@ impl fmt::Display for MillisatAmount { } } +impl Readable for MillisatAmount { + fn read(r: &mut R) -> Result { + let amt: Option = Readable::read(r)?; + Ok(MillisatAmount(amt)) + } +} + +impl Writeable for MillisatAmount { + fn write(&self, w: &mut W) -> Result<(), std::io::Error> { + self.0.write(w) + } +} + pub(crate) struct PaymentInfo { preimage: Option, secret: Option, @@ -76,7 +102,28 @@ pub(crate) struct PaymentInfo { amt_msat: MillisatAmount, } -pub(crate) type PaymentInfoStorage = Arc>>; +impl_writeable_tlv_based!(PaymentInfo, { + (0, preimage, required), + (2, secret, required), + (4, status, required), + (6, amt_msat, required), +}); + +pub(crate) struct InboundPaymentInfoStorage { + payments: HashMap, +} + +impl_writeable_tlv_based!(InboundPaymentInfoStorage, { + (0, payments, required), +}); + +pub(crate) struct OutboundPaymentInfoStorage { + payments: HashMap, +} + +impl_writeable_tlv_based!(OutboundPaymentInfoStorage, { + (0, payments, required), +}); type ChainMonitor = chainmonitor::ChainMonitor< InMemorySigner, @@ -84,7 +131,25 @@ type ChainMonitor = chainmonitor::ChainMonitor< Arc, Arc, Arc, - Arc, + Arc< + MonitorUpdatingPersister< + Arc, + Arc, + Arc, + Arc, + >, + >, +>; + +pub(crate) type GossipVerifier = lightning_block_sync::gossip::GossipVerifier< + lightning_block_sync::gossip::TokioSpawner, + Arc, + Arc, + SocketDescriptor, + Arc, + Arc, + IgnoringMessageHandler, + Arc, >; pub(crate) type PeerManager = SimpleArcPeerManager< @@ -92,7 +157,7 @@ pub(crate) type PeerManager = SimpleArcPeerManager< ChainMonitor, BitcoindClient, BitcoindClient, - BitcoindClient, + GossipVerifier, FilesystemLogger, >; @@ -101,13 +166,23 @@ pub(crate) type ChannelManager = pub(crate) type NetworkGraph = gossip::NetworkGraph>; -type OnionMessenger = SimpleArcOnionMessenger; +type OnionMessenger = + SimpleArcOnionMessenger; + +pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler< + Arc, + Arc, Arc>>, + Arc, + Arc, +>; async fn handle_ldk_events( channel_manager: &Arc, bitcoind_client: &BitcoindClient, network_graph: &NetworkGraph, keys_manager: &KeysManager, - inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage, - network: Network, event: &Event, + bump_tx_event_handler: &BumpTxEventHandler, + inbound_payments: Arc>, + outbound_payments: Arc>, fs_store: &Arc, + network: Network, event: Event, ) { match event { Event::FundingGenerationReady { @@ -131,7 +206,7 @@ async fn handle_ldk_events( .expect("Lightning funding tx should always be to a SegWit output") .to_address(); let mut outputs = vec![HashMap::with_capacity(1)]; - outputs[0].insert(addr, *channel_value_satoshis as f64 / 100_000_000.0); + outputs[0].insert(addr, channel_value_satoshis as f64 / 100_000_000.0); let raw_tx = bitcoind_client.create_raw_transaction(outputs).await; // Have your wallet put the inputs into the transaction such that the output is @@ -147,7 +222,7 @@ async fn handle_ldk_events( if channel_manager .funding_transaction_generated( &temporary_channel_id, - counterparty_node_id, + &counterparty_node_id, final_tx, ) .is_err() @@ -165,36 +240,44 @@ async fn handle_ldk_events( receiver_node_id: _, via_channel_id: _, via_user_channel_id: _, + claim_deadline: _, + onion_fields: _, + counterparty_skimmed_fee_msat: _, } => { println!( "\nEVENT: received payment from payment hash {} of {} millisatoshis", - hex_utils::hex_str(&payment_hash.0), - amount_msat, + payment_hash, amount_msat, ); print!("> "); io::stdout().flush().unwrap(); let payment_preimage = match purpose { - PaymentPurpose::InvoicePayment { payment_preimage, .. } => *payment_preimage, - PaymentPurpose::SpontaneousPayment(preimage) => Some(*preimage), + PaymentPurpose::InvoicePayment { payment_preimage, .. } => payment_preimage, + PaymentPurpose::SpontaneousPayment(preimage) => Some(preimage), }; channel_manager.claim_funds(payment_preimage.unwrap()); } - Event::PaymentClaimed { payment_hash, purpose, amount_msat, receiver_node_id: _ } => { + Event::PaymentClaimed { + payment_hash, + purpose, + amount_msat, + receiver_node_id: _, + htlcs: _, + sender_intended_total_msat: _, + } => { println!( "\nEVENT: claimed payment from payment hash {} of {} millisatoshis", - hex_utils::hex_str(&payment_hash.0), - amount_msat, + payment_hash, amount_msat, ); print!("> "); io::stdout().flush().unwrap(); let (payment_preimage, payment_secret) = match purpose { PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => { - (*payment_preimage, Some(*payment_secret)) + (payment_preimage, Some(payment_secret)) } - PaymentPurpose::SpontaneousPayment(preimage) => (Some(*preimage), None), + PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None), }; - let mut payments = inbound_payments.lock().unwrap(); - match payments.entry(*payment_hash) { + let mut inbound = inbound_payments.lock().unwrap(); + match inbound.payments.entry(payment_hash) { Entry::Occupied(mut e) => { let payment = e.get_mut(); payment.status = HTLCStatus::Succeeded; @@ -206,66 +289,111 @@ async fn handle_ldk_events( preimage: payment_preimage, secret: payment_secret, status: HTLCStatus::Succeeded, - amt_msat: MillisatAmount(Some(*amount_msat)), + amt_msat: MillisatAmount(Some(amount_msat)), }); } } + fs_store.write("", "", INBOUND_PAYMENTS_FNAME, &inbound.encode()).unwrap(); } - Event::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => { - let mut payments = outbound_payments.lock().unwrap(); - for (hash, payment) in payments.iter_mut() { - if *hash == *payment_hash { - payment.preimage = Some(*payment_preimage); + Event::PaymentSent { + payment_preimage, payment_hash, fee_paid_msat, payment_id, .. + } => { + let mut outbound = outbound_payments.lock().unwrap(); + for (id, payment) in outbound.payments.iter_mut() { + if *id == payment_id.unwrap() { + payment.preimage = Some(payment_preimage); payment.status = HTLCStatus::Succeeded; println!( "\nEVENT: successfully sent payment of {} millisatoshis{} from \ - payment hash {:?} with preimage {:?}", + payment hash {} with preimage {}", payment.amt_msat, if let Some(fee) = fee_paid_msat { format!(" (fee {} msat)", fee) } else { "".to_string() }, - hex_utils::hex_str(&payment_hash.0), - hex_utils::hex_str(&payment_preimage.0) + payment_hash, + payment_preimage ); print!("> "); io::stdout().flush().unwrap(); } } + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()).unwrap(); } - Event::OpenChannelRequest { .. } => { - // Unreachable, we don't set manually_accept_inbound_channels + Event::OpenChannelRequest { + ref temporary_channel_id, ref counterparty_node_id, .. + } => { + let mut random_bytes = [0u8; 16]; + random_bytes.copy_from_slice(&keys_manager.get_secure_random_bytes()[..16]); + let user_channel_id = u128::from_be_bytes(random_bytes); + let res = channel_manager.accept_inbound_channel( + temporary_channel_id, + counterparty_node_id, + user_channel_id, + ); + + if let Err(e) = res { + print!( + "\nEVENT: Failed to accept inbound channel ({}) from {}: {:?}", + temporary_channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), + e, + ); + } else { + print!( + "\nEVENT: Accepted inbound channel ({}) from {}", + temporary_channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), + ); + } + print!("> "); + io::stdout().flush().unwrap(); } Event::PaymentPathSuccessful { .. } => {} Event::PaymentPathFailed { .. } => {} Event::ProbeSuccessful { .. } => {} Event::ProbeFailed { .. } => {} - Event::PaymentFailed { payment_hash, .. } => { + Event::PaymentFailed { payment_hash, reason, payment_id, .. } => { print!( - "\nEVENT: Failed to send payment to payment hash {:?}: exhausted payment retry attempts", - hex_utils::hex_str(&payment_hash.0) + "\nEVENT: Failed to send payment to payment hash {}: {:?}", + payment_hash, + if let Some(r) = reason { r } else { PaymentFailureReason::RetriesExhausted } ); print!("> "); io::stdout().flush().unwrap(); - let mut payments = outbound_payments.lock().unwrap(); - if payments.contains_key(&payment_hash) { - let payment = payments.get_mut(&payment_hash).unwrap(); + let mut outbound = outbound_payments.lock().unwrap(); + if outbound.payments.contains_key(&payment_id) { + let payment = outbound.payments.get_mut(&payment_id).unwrap(); payment.status = HTLCStatus::Failed; } + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()).unwrap(); + } + Event::InvoiceRequestFailed { payment_id } => { + print!("\nEVENT: Failed to request invoice to send payment with id {}", payment_id); + print!("> "); + io::stdout().flush().unwrap(); + + let mut outbound = outbound_payments.lock().unwrap(); + if outbound.payments.contains_key(&payment_id) { + let payment = outbound.payments.get_mut(&payment_id).unwrap(); + payment.status = HTLCStatus::Failed; + } + fs_store.write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound.encode()).unwrap(); } Event::PaymentForwarded { prev_channel_id, next_channel_id, fee_earned_msat, claim_from_onchain_tx, + outbound_amount_forwarded_msat, } => { let read_only_network_graph = network_graph.read_only(); let nodes = read_only_network_graph.nodes(); let channels = channel_manager.list_channels(); - let node_str = |channel_id: &Option<[u8; 32]>| match channel_id { + let node_str = |channel_id: &Option| match channel_id { None => String::new(), Some(channel_id) => match channels.iter().find(|c| c.channel_id == *channel_id) { None => String::new(), @@ -282,30 +410,35 @@ async fn handle_ldk_events( } }, }; - let channel_str = |channel_id: &Option<[u8; 32]>| { + let channel_str = |channel_id: &Option| { channel_id - .map(|channel_id| format!(" with channel {}", hex_utils::hex_str(&channel_id))) + .map(|channel_id| format!(" with channel {}", channel_id)) .unwrap_or_default() }; let from_prev_str = - format!(" from {}{}", node_str(prev_channel_id), channel_str(prev_channel_id)); + format!(" from {}{}", node_str(&prev_channel_id), channel_str(&prev_channel_id)); let to_next_str = - format!(" to {}{}", node_str(next_channel_id), channel_str(next_channel_id)); + format!(" to {}{}", node_str(&next_channel_id), channel_str(&next_channel_id)); - let from_onchain_str = if *claim_from_onchain_tx { + let from_onchain_str = if claim_from_onchain_tx { "from onchain downstream claim" } else { "from HTLC fulfill message" }; + let amt_args = if let Some(v) = outbound_amount_forwarded_msat { + format!("{}", v) + } else { + "?".to_string() + }; if let Some(fee_earned) = fee_earned_msat { println!( - "\nEVENT: Forwarded payment{}{}, earning {} msat {}", - from_prev_str, to_next_str, fee_earned, from_onchain_str + "\nEVENT: Forwarded payment for {} msat{}{}, earning {} msat {}", + amt_args, from_prev_str, to_next_str, fee_earned, from_onchain_str ); } else { println!( - "\nEVENT: Forwarded payment{}{}, claiming onchain {}", - from_prev_str, to_next_str, from_onchain_str + "\nEVENT: Forwarded payment for {} msat{}{}, claiming onchain {}", + amt_args, from_prev_str, to_next_str, from_onchain_str ); } print!("> "); @@ -321,21 +454,31 @@ async fn handle_ldk_events( forwarding_channel_manager.process_pending_htlc_forwards(); }); } - Event::SpendableOutputs { outputs } => { - let destination_address = bitcoind_client.get_new_address().await; - let output_descriptors = &outputs.iter().map(|a| a).collect::>(); - let tx_feerate = - bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal); - let spending_tx = keys_manager - .spend_spendable_outputs( - output_descriptors, - Vec::new(), - destination_address.script_pubkey(), - tx_feerate, - &Secp256k1::new(), - ) - .unwrap(); - bitcoind_client.broadcast_transaction(&spending_tx); + Event::SpendableOutputs { outputs, channel_id: _ } => { + // SpendableOutputDescriptors, of which outputs is a vec of, are critical to keep track + // of! While a `StaticOutput` descriptor is just an output to a static, well-known key, + // other descriptors are not currently ever regenerated for you by LDK. Once we return + // from this method, the descriptor will be gone, and you may lose track of some funds. + // + // Here we simply persist them to disk, with a background task running which will try + // to spend them regularly (possibly duplicatively/RBF'ing them). These can just be + // treated as normal funds where possible - they are only spendable by us and there is + // no rush to claim them. + for output in outputs { + let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes()); + // Note that if the type here changes our read code needs to change as well. + let output: SpendableOutputDescriptor = output; + fs_store.write(PENDING_SPENDABLE_OUTPUT_DIR, "", &key, &output.encode()).unwrap(); + } + } + Event::ChannelPending { channel_id, counterparty_node_id, .. } => { + println!( + "\nEVENT: Channel {} with peer {} is pending awaiting funding lock-in!", + channel_id, + hex_utils::hex_str(&counterparty_node_id.serialize()), + ); + print!("> "); + io::stdout().flush().unwrap(); } Event::ChannelReady { ref channel_id, @@ -345,16 +488,23 @@ async fn handle_ldk_events( } => { println!( "\nEVENT: Channel {} with peer {} is ready to be used!", - hex_utils::hex_str(channel_id), + channel_id, hex_utils::hex_str(&counterparty_node_id.serialize()), ); print!("> "); io::stdout().flush().unwrap(); } - Event::ChannelClosed { channel_id, reason, user_channel_id: _ } => { + Event::ChannelClosed { + channel_id, + reason, + user_channel_id: _, + counterparty_node_id, + channel_capacity_sats: _, + } => { println!( - "\nEVENT: Channel {} closed due to: {:?}", - hex_utils::hex_str(channel_id), + "\nEVENT: Channel {} with counterparty {} closed due to: {:?}", + channel_id, + counterparty_node_id.map(|id| format!("{}", id)).unwrap_or("".to_owned()), reason ); print!("> "); @@ -365,6 +515,7 @@ async fn handle_ldk_events( // the funding transaction either confirms, or this event is generated. } Event::HTLCIntercepted { .. } => {} + Event::BumpTransaction(event) => bump_tx_event_handler.handle_event(&event), } } @@ -378,6 +529,10 @@ async fn start_ldk() { let ldk_data_dir = format!("{}/.ldk", args.ldk_storage_dir_path); fs::create_dir_all(ldk_data_dir.clone()).unwrap(); + // ## Setup + // Step 1: Initialize the Logger + let logger = Arc::new(FilesystemLogger::new(ldk_data_dir.clone())); + // Initialize our bitcoind client. let bitcoind_client = match BitcoindClient::new( args.bitcoind_rpc_host.clone(), @@ -385,6 +540,7 @@ async fn start_ldk() { args.bitcoind_rpc_username.clone(), args.bitcoind_rpc_password.clone(), tokio::runtime::Handle::current(), + Arc::clone(&logger), ) .await { @@ -411,34 +567,18 @@ async fn start_ldk() { return; } - // ## Setup - // Step 1: Initialize the FeeEstimator + // Step 2: Initialize the FeeEstimator // BitcoindClient implements the FeeEstimator trait, so it'll act as our fee estimator. let fee_estimator = bitcoind_client.clone(); - // Step 2: Initialize the Logger - let logger = Arc::new(FilesystemLogger::new(ldk_data_dir.clone())); - // Step 3: Initialize the BroadcasterInterface // BitcoindClient implements the BroadcasterInterface trait, so it'll act as our transaction // broadcaster. let broadcaster = bitcoind_client.clone(); - // Step 4: Initialize Persist - let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); - - // Step 5: Initialize the ChainMonitor - let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( - None, - broadcaster.clone(), - logger.clone(), - fee_estimator.clone(), - persister.clone(), - )); - - // Step 6: Initialize the KeysManager + // Step 4: Initialize the KeysManager // The key seed that we use to derive the node privkey (that corresponds to the node pubkey) and // other secret key material. @@ -453,7 +593,7 @@ async fn start_ldk() { thread_rng().fill_bytes(&mut key); match File::create(keys_seed_path.clone()) { Ok(mut f) => { - f.write_all(&key).expect("Failed to write node keys seed to disk"); + Write::write_all(&mut f, &key).expect("Failed to write node keys seed to disk"); f.sync_all().expect("Failed to sync node keys seed to disk"); } Err(e) => { @@ -466,9 +606,42 @@ async fn start_ldk() { let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); let keys_manager = Arc::new(KeysManager::new(&keys_seed, cur.as_secs(), cur.subsec_nanos())); + let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new( + Arc::clone(&broadcaster), + Arc::new(Wallet::new(Arc::clone(&bitcoind_client), Arc::clone(&logger))), + Arc::clone(&keys_manager), + Arc::clone(&logger), + )); + + // Step 5: Initialize Persistence + let fs_store = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); + let persister = Arc::new(MonitorUpdatingPersister::new( + Arc::clone(&fs_store), + Arc::clone(&logger), + 1000, + Arc::clone(&keys_manager), + Arc::clone(&keys_manager), + )); + // Alternatively, you can use the `FilesystemStore` as a `Persist` directly, at the cost of + // larger `ChannelMonitor` update writes (but no deletion or cleanup): + //let persister = Arc::clone(&fs_store); + + // Step 6: Initialize the ChainMonitor + let chain_monitor: Arc = Arc::new(chainmonitor::ChainMonitor::new( + None, + Arc::clone(&broadcaster), + Arc::clone(&logger), + Arc::clone(&fee_estimator), + Arc::clone(&persister), + )); + // Step 7: Read ChannelMonitor state from disk - let mut channelmonitors = - persister.read_channelmonitors(keys_manager.clone(), keys_manager.clone()).unwrap(); + let mut channelmonitors = persister + .read_all_channel_monitors_with_updates(&bitcoind_client, &bitcoind_client) + .unwrap(); + // If you are using the `FilesystemStore` as a `Persist` directly, use + // `lightning::util::persist::read_channel_monitors` like this: + //read_channel_monitors(Arc::clone(&persister), Arc::clone(&keys_manager), Arc::clone(&keys_manager)).unwrap(); // Step 8: Poll for the best chain tip, which may be used by the channel manager & spv client let polled_chain_tip = init::validate_best_block_header(bitcoind_client.as_ref()) @@ -481,23 +654,27 @@ async fn start_ldk() { Arc::new(disk::read_network(Path::new(&network_graph_path), args.network, logger.clone())); let scorer_path = format!("{}/scorer", ldk_data_dir.clone()); - let scorer = Arc::new(Mutex::new(disk::read_scorer( + let scorer = Arc::new(RwLock::new(disk::read_scorer( Path::new(&scorer_path), Arc::clone(&network_graph), Arc::clone(&logger), ))); // Step 10: Create Router + let scoring_fee_params = ProbabilisticScoringFeeParameters::default(); let router = Arc::new(DefaultRouter::new( network_graph.clone(), logger.clone(), keys_manager.get_secure_random_bytes(), scorer.clone(), + scoring_fee_params, )); // Step 11: Initialize the ChannelManager let mut user_config = UserConfig::default(); user_config.channel_handshake_limits.force_announced_channel_preference = false; + user_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true; + user_config.manually_accept_inbound_channels = true; let mut restarting_node = true; let (channel_manager_blockhash, channel_manager) = { if let Ok(mut f) = fs::File::open(format!("{}/manager", ldk_data_dir.clone())) { @@ -537,6 +714,7 @@ async fn start_ldk() { keys_manager.clone(), user_config, chain_params, + cur.as_secs() as u32, ); (polled_best_block_hash, fresh_channel_manager) } @@ -585,16 +763,13 @@ async fn start_ldk() { let funding_outpoint = item.2; assert_eq!( chain_monitor.watch_channel(funding_outpoint, channel_monitor), - ChannelMonitorUpdateStatus::Completed + Ok(ChannelMonitorUpdateStatus::Completed) ); } // Step 14: Optional: Initialize the P2PGossipSync - let gossip_sync = Arc::new(P2PGossipSync::new( - Arc::clone(&network_graph), - None::>, - logger.clone(), - )); + let gossip_sync = + Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, Arc::clone(&logger))); // Step 15: Initialize the PeerManager let channel_manager: Arc = Arc::new(channel_manager); @@ -602,6 +777,8 @@ async fn start_ldk() { Arc::clone(&keys_manager), Arc::clone(&keys_manager), Arc::clone(&logger), + Arc::new(DefaultMessageRouter {}), + Arc::clone(&channel_manager), IgnoringMessageHandler {}, )); let mut ephemeral_bytes = [0; 32]; @@ -611,16 +788,25 @@ async fn start_ldk() { chan_handler: channel_manager.clone(), route_handler: gossip_sync.clone(), onion_message_handler: onion_messenger.clone(), + custom_message_handler: IgnoringMessageHandler {}, }; let peer_manager: Arc = Arc::new(PeerManager::new( lightning_msg_handler, current_time.try_into().unwrap(), &ephemeral_bytes, logger.clone(), - IgnoringMessageHandler {}, Arc::clone(&keys_manager), )); + // Install a GossipVerifier in in the P2PGossipSync + let utxo_lookup = GossipVerifier::new( + Arc::clone(&bitcoind_client.bitcoind_rpc_client), + lightning_block_sync::gossip::TokioSpawner, + Arc::clone(&gossip_sync), + Arc::clone(&peer_manager), + ); + gossip_sync.add_utxo_lookup(Some(utxo_lookup)); + // ## Running LDK // Step 16: Initialize networking @@ -663,37 +849,79 @@ async fn start_ldk() { } }); + let inbound_payments = Arc::new(Mutex::new(disk::read_inbound_payment_info(Path::new( + &format!("{}/{}", ldk_data_dir, INBOUND_PAYMENTS_FNAME), + )))); + let outbound_payments = Arc::new(Mutex::new(disk::read_outbound_payment_info(Path::new( + &format!("{}/{}", ldk_data_dir, OUTBOUND_PAYMENTS_FNAME), + )))); + let recent_payments_payment_ids = channel_manager + .list_recent_payments() + .into_iter() + .filter_map(|p| match p { + RecentPaymentDetails::Pending { payment_id, .. } => Some(payment_id), + RecentPaymentDetails::Fulfilled { payment_id, .. } => Some(payment_id), + RecentPaymentDetails::Abandoned { payment_id, .. } => Some(payment_id), + RecentPaymentDetails::AwaitingInvoice { payment_id } => Some(payment_id), + }) + .collect::>(); + for (payment_id, payment_info) in outbound_payments + .lock() + .unwrap() + .payments + .iter_mut() + .filter(|(_, i)| matches!(i.status, HTLCStatus::Pending)) + { + if !recent_payments_payment_ids.contains(payment_id) { + payment_info.status = HTLCStatus::Failed; + } + } + fs_store + .write("", "", OUTBOUND_PAYMENTS_FNAME, &outbound_payments.lock().unwrap().encode()) + .unwrap(); + // Step 18: Handle LDK Events - let channel_manager_event_listener = channel_manager.clone(); - let keys_manager_listener = keys_manager.clone(); - // TODO: persist payment info to disk - let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); - let outbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); - let inbound_pmts_for_events = inbound_payments.clone(); - let outbound_pmts_for_events = outbound_payments.clone(); + let channel_manager_event_listener = Arc::clone(&channel_manager); + let bitcoind_client_event_listener = Arc::clone(&bitcoind_client); + let network_graph_event_listener = Arc::clone(&network_graph); + let keys_manager_event_listener = Arc::clone(&keys_manager); + let inbound_payments_event_listener = Arc::clone(&inbound_payments); + let outbound_payments_event_listener = Arc::clone(&outbound_payments); + let fs_store_event_listener = Arc::clone(&fs_store); let network = args.network; - let bitcoind_rpc = bitcoind_client.clone(); - let network_graph_events = network_graph.clone(); - let handle = tokio::runtime::Handle::current(); let event_handler = move |event: Event| { - handle.block_on(handle_ldk_events( - &channel_manager_event_listener, - &bitcoind_rpc, - &network_graph_events, - &keys_manager_listener, - &inbound_pmts_for_events, - &outbound_pmts_for_events, - network, - &event, - )); + let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener); + let bitcoind_client_event_listener = Arc::clone(&bitcoind_client_event_listener); + let network_graph_event_listener = Arc::clone(&network_graph_event_listener); + let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener); + let bump_tx_event_handler = Arc::clone(&bump_tx_event_handler); + let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener); + let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener); + let fs_store_event_listener = Arc::clone(&fs_store_event_listener); + async move { + handle_ldk_events( + &channel_manager_event_listener, + &bitcoind_client_event_listener, + &network_graph_event_listener, + &keys_manager_event_listener, + &bump_tx_event_handler, + inbound_payments_event_listener, + outbound_payments_event_listener, + &fs_store_event_listener, + network, + event, + ) + .await; + } }; // Step 19: Persist ChannelManager and NetworkGraph - let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); + let persister = Arc::new(FilesystemStore::new(ldk_data_dir.clone().into())); // Step 20: Background Processing - let background_processor = BackgroundProcessor::start( - persister, + let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(()); + let mut background_processor = tokio::spawn(process_events_async( + Arc::clone(&persister), event_handler, chain_monitor.clone(), channel_manager.clone(), @@ -701,15 +929,26 @@ async fn start_ldk() { peer_manager.clone(), logger.clone(), Some(scorer.clone()), - ); + move |t| { + let mut bp_exit_fut_check = bp_exit_check.clone(); + Box::pin(async move { + tokio::select! { + _ = tokio::time::sleep(t) => false, + _ = bp_exit_fut_check.changed() => true, + } + }) + }, + false, + )); // Regularly reconnect to channel peers. let connect_cm = Arc::clone(&channel_manager); let connect_pm = Arc::clone(&peer_manager); - let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir.clone()); + let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir); let stop_connect = Arc::clone(&stop_listen_connect); tokio::spawn(async move { let mut interval = tokio::time::interval(Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; match disk::read_channel_peer_data(Path::new(&peer_data_path)) { @@ -742,47 +981,103 @@ async fn start_ldk() { }); // Regularly broadcast our node_announcement. This is only required (or possible) if we have - // some public channels, and is only useful if we have public listen address(es) to announce. - // In a production environment, this should occur only after the announcement of new channels - // to avoid churn in the global network graph. + // some public channels. let peer_man = Arc::clone(&peer_manager); + let chan_man = Arc::clone(&channel_manager); let network = args.network; - if !args.ldk_announced_listen_addr.is_empty() { - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(60)); - loop { - interval.tick().await; + tokio::spawn(async move { + // First wait a minute until we have some peers and maybe have opened a channel. + tokio::time::sleep(Duration::from_secs(60)).await; + // Then, update our announcement once an hour to keep it fresh but avoid unnecessary churn + // in the global gossip network. + let mut interval = tokio::time::interval(Duration::from_secs(3600)); + loop { + interval.tick().await; + // Don't bother trying to announce if we don't have any public channls, though our + // peers should drop such an announcement anyway. Note that announcement may not + // propagate until we have a channel with 6+ confirmations. + if chan_man.list_channels().iter().any(|chan| chan.is_public) { peer_man.broadcast_node_announcement( [0; 3], args.ldk_announced_node_name, args.ldk_announced_listen_addr.clone(), ); } - }); - } + } + }); - // Start the CLI. - cli::poll_for_user_input( - Arc::clone(&peer_manager), - Arc::clone(&channel_manager), - Arc::clone(&keys_manager), - Arc::clone(&network_graph), - Arc::clone(&onion_messenger), - inbound_payments, - outbound_payments, + tokio::spawn(sweep::periodic_sweep( ldk_data_dir.clone(), - network, + Arc::clone(&keys_manager), Arc::clone(&logger), - ) - .await; + Arc::clone(&persister), + Arc::clone(&bitcoind_client), + Arc::clone(&channel_manager), + )); + + // Start the CLI. + let cli_channel_manager = Arc::clone(&channel_manager); + let cli_persister = Arc::clone(&persister); + let cli_logger = Arc::clone(&logger); + let cli_peer_manager = Arc::clone(&peer_manager); + let cli_poll = tokio::task::spawn_blocking(move || { + cli::poll_for_user_input( + cli_peer_manager, + cli_channel_manager, + keys_manager, + network_graph, + onion_messenger, + inbound_payments, + outbound_payments, + ldk_data_dir, + network, + cli_logger, + cli_persister, + ) + }); + + // Exit if either CLI polling exits or the background processor exits (which shouldn't happen + // unless we fail to write to the filesystem). + let mut bg_res = Ok(Ok(())); + tokio::select! { + _ = cli_poll => {}, + bg_exit = &mut background_processor => { + bg_res = bg_exit; + }, + } // Disconnect our peers and stop accepting new connections. This ensures we don't continue // updating our channel data after we've stopped the background processor. stop_listen_connect.store(true, Ordering::Release); peer_manager.disconnect_all_peers(); + if let Err(e) = bg_res { + let persist_res = persister + .write( + persist::CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, + persist::CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, + persist::CHANNEL_MANAGER_PERSISTENCE_KEY, + &channel_manager.encode(), + ) + .unwrap(); + use lightning::util::logger::Logger; + lightning::log_error!( + &*logger, + "Last-ditch ChannelManager persistence result: {:?}", + persist_res + ); + panic!( + "ERR: background processing stopped with result {:?}, exiting.\n\ + Last-ditch ChannelManager persistence result {:?}", + e, persist_res + ); + } + // Stop the background processor. - background_processor.stop().unwrap(); + if !bp_exit.is_closed() { + bp_exit.send(()).unwrap(); + background_processor.await.unwrap().unwrap(); + } } #[tokio::main]