From 479964a45b965cbdbd4a0ca8e43cbd307ddc989b Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Wed, 18 May 2022 14:21:44 -0500 Subject: [PATCH] Update to LDK 0.0.107 --- Cargo.toml | 15 +++-- src/bitcoind_client.rs | 90 ++++++++++++------------- src/cli.rs | 97 ++++++++++++++++++++++----- src/disk.rs | 43 ++++-------- src/hex_utils.rs | 2 +- src/main.rs | 146 ++++++++++++++++++++--------------------- 6 files changed, 215 insertions(+), 178 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c673253..7725822 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,15 +8,16 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -lightning = { version = "0.0.106", features = ["max_level_trace"] } -lightning-block-sync = { version = "0.0.106", features = [ "rpc-client" ] } -lightning-invoice = { version = "0.14" } -lightning-net-tokio = { version = "0.0.106" } -lightning-persister = { version = "0.0.106" } -lightning-background-processor = { version = "0.0.106" } +lightning = { version = "0.0.107", features = ["max_level_trace"] } +lightning-block-sync = { version = "0.0.107", features = [ "rpc-client" ] } +lightning-invoice = { version = "0.15" } +lightning-net-tokio = { version = "0.0.107" } +lightning-persister = { version = "0.0.107" } +lightning-background-processor = { version = "0.0.107" } +lightning-rapid-gossip-sync = { version = "0.0.107" } base64 = "0.13.0" -bitcoin = "0.27" +bitcoin = "0.28.1" bitcoin-bech32 = "0.12" bech32 = "0.8" hex = "0.3" diff --git a/src/bitcoind_client.rs b/src/bitcoind_client.rs index 15ee8cf..70928a1 100644 --- a/src/bitcoind_client.rs +++ b/src/bitcoind_client.rs @@ -15,10 +15,9 @@ use std::str::FromStr; use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use std::time::Duration; -use tokio::sync::Mutex; pub struct BitcoindClient { - bitcoind_rpc_client: Arc>, + bitcoind_rpc_client: Arc, host: String, port: u16, rpc_user: String, @@ -36,28 +35,17 @@ pub enum Target { impl BlockSource for &BitcoindClient { fn get_header<'a>( - &'a mut self, header_hash: &'a BlockHash, height_hint: Option, + &'a self, header_hash: &'a BlockHash, height_hint: Option, ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { - Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; - rpc.get_header(header_hash, height_hint).await - }) + Box::pin(async move { self.bitcoind_rpc_client.get_header(header_hash, height_hint).await }) } - fn get_block<'a>( - &'a mut self, header_hash: &'a BlockHash, - ) -> AsyncBlockSourceResult<'a, Block> { - Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; - rpc.get_block(header_hash).await - }) + fn get_block<'a>(&'a self, header_hash: &'a BlockHash) -> AsyncBlockSourceResult<'a, Block> { + Box::pin(async move { self.bitcoind_rpc_client.get_block(header_hash).await }) } - fn get_best_block<'a>(&'a mut self) -> AsyncBlockSourceResult<(BlockHash, Option)> { - Box::pin(async move { - let mut rpc = self.bitcoind_rpc_client.lock().await; - rpc.get_best_block().await - }) + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option)> { + Box::pin(async move { self.bitcoind_rpc_client.get_best_block().await }) } } @@ -72,7 +60,7 @@ impl BitcoindClient { let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); let rpc_credentials = base64::encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); - let mut bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; + let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; let _dummy = bitcoind_rpc_client .call_method::("getblockchaininfo", &vec![]) .await @@ -85,7 +73,7 @@ impl BitcoindClient { fees.insert(Target::Normal, AtomicU32::new(2000)); fees.insert(Target::HighPriority, AtomicU32::new(5000)); let client = Self { - bitcoind_rpc_client: Arc::new(Mutex::new(bitcoind_rpc_client)), + bitcoind_rpc_client: Arc::new(bitcoind_rpc_client), host, port, rpc_user, @@ -102,16 +90,15 @@ impl BitcoindClient { } fn poll_for_fee_estimates( - fees: Arc>, rpc_client: Arc>, + fees: Arc>, rpc_client: Arc, handle: tokio::runtime::Handle, ) { handle.spawn(async move { loop { let background_estimate = { - let mut rpc = rpc_client.lock().await; let background_conf_target = serde_json::json!(144); let background_estimate_mode = serde_json::json!("ECONOMICAL"); - let resp = rpc + let resp = rpc_client .call_method::( "estimatesmartfee", &vec![background_conf_target, background_estimate_mode], @@ -125,10 +112,9 @@ impl BitcoindClient { }; let normal_estimate = { - let mut rpc = rpc_client.lock().await; let normal_conf_target = serde_json::json!(18); let normal_estimate_mode = serde_json::json!("ECONOMICAL"); - let resp = rpc + let resp = rpc_client .call_method::( "estimatesmartfee", &vec![normal_conf_target, normal_estimate_mode], @@ -142,10 +128,9 @@ impl BitcoindClient { }; let high_prio_estimate = { - let mut rpc = rpc_client.lock().await; let high_prio_conf_target = serde_json::json!(6); let high_prio_estimate_mode = serde_json::json!("CONSERVATIVE"); - let resp = rpc + let resp = rpc_client .call_method::( "estimatesmartfee", &vec![high_prio_conf_target, high_prio_estimate_mode], @@ -179,17 +164,17 @@ impl BitcoindClient { } pub async fn create_raw_transaction(&self, outputs: Vec>) -> RawTx { - let mut rpc = self.bitcoind_rpc_client.lock().await; - let outputs_json = serde_json::json!(outputs); - rpc.call_method::("createrawtransaction", &vec![serde_json::json!([]), outputs_json]) + self.bitcoind_rpc_client + .call_method::( + "createrawtransaction", + &vec![serde_json::json!([]), outputs_json], + ) .await .unwrap() } pub async fn fund_raw_transaction(&self, raw_tx: RawTx) -> FundedTx { - let mut rpc = self.bitcoind_rpc_client.lock().await; - let raw_tx_json = serde_json::json!(raw_tx.0); let options = serde_json::json!({ // LDK gives us feerates in satoshis per KW but Bitcoin Core here expects fees @@ -203,34 +188,43 @@ impl BitcoindClient { // change address or to a new channel output negotiated with the same node. "replaceable": false, }); - rpc.call_method("fundrawtransaction", &[raw_tx_json, options]).await.unwrap() + self.bitcoind_rpc_client + .call_method("fundrawtransaction", &[raw_tx_json, options]) + .await + .unwrap() } pub async fn send_raw_transaction(&self, raw_tx: RawTx) { - let mut rpc = self.bitcoind_rpc_client.lock().await; - let raw_tx_json = serde_json::json!(raw_tx.0); - rpc.call_method::("sendrawtransaction", &[raw_tx_json]).await.unwrap(); + self.bitcoind_rpc_client + .call_method::("sendrawtransaction", &[raw_tx_json]) + .await + .unwrap(); } pub async fn sign_raw_transaction_with_wallet(&self, tx_hex: String) -> SignedTx { - let mut rpc = self.bitcoind_rpc_client.lock().await; - let tx_hex_json = serde_json::json!(tx_hex); - rpc.call_method("signrawtransactionwithwallet", &vec![tx_hex_json]).await.unwrap() + self.bitcoind_rpc_client + .call_method("signrawtransactionwithwallet", &vec![tx_hex_json]) + .await + .unwrap() } pub async fn get_new_address(&self) -> Address { - let mut rpc = self.bitcoind_rpc_client.lock().await; - let addr_args = vec![serde_json::json!("LDK output address")]; - let addr = rpc.call_method::("getnewaddress", &addr_args).await.unwrap(); + let addr = self + .bitcoind_rpc_client + .call_method::("getnewaddress", &addr_args) + .await + .unwrap(); Address::from_str(addr.0.as_str()).unwrap() } pub async fn get_blockchain_info(&self) -> BlockchainInfo { - let mut rpc = self.bitcoind_rpc_client.lock().await; - rpc.call_method::("getblockchaininfo", &vec![]).await.unwrap() + self.bitcoind_rpc_client + .call_method::("getblockchaininfo", &vec![]) + .await + .unwrap() } } @@ -255,10 +249,12 @@ impl BroadcasterInterface for BitcoindClient { let bitcoind_rpc_client = self.bitcoind_rpc_client.clone(); let tx_serialized = serde_json::json!(encode::serialize_hex(tx)); self.handle.spawn(async move { - let mut rpc = bitcoind_rpc_client.lock().await; // This may error due to RL calling `broadcast_transaction` with the same transaction // multiple times, but the error is safe to ignore. - match rpc.call_method::("sendrawtransaction", &vec![tx_serialized]).await { + match bitcoind_rpc_client + .call_method::("sendrawtransaction", &vec![tx_serialized]) + .await + { Ok(_) => {} Err(e) => { let err_str = e.get_ref().unwrap().to_string(); diff --git a/src/cli.rs b/src/cli.rs index 6904180..b392f0f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,17 +1,17 @@ use crate::disk; use crate::hex_utils; use crate::{ - ChannelManager, HTLCStatus, InvoicePayer, MillisatAmount, PaymentInfo, PaymentInfoStorage, - PeerManager, + ChannelManager, HTLCStatus, InvoicePayer, MillisatAmount, NetworkGraph, PaymentInfo, + PaymentInfoStorage, PeerManager, }; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::Hash; use bitcoin::network::constants::Network; -use bitcoin::secp256k1::key::PublicKey; +use bitcoin::secp256k1::PublicKey; use lightning::chain::keysinterface::{KeysInterface, KeysManager, Recipient}; use lightning::ln::msgs::NetAddress; use lightning::ln::{PaymentHash, PaymentPreimage}; -use lightning::routing::network_graph::{NetworkGraph, NodeId}; +use lightning::routing::gossip::NodeId; use lightning::util::config::{ChannelConfig, ChannelHandshakeLimits, UserConfig}; use lightning::util::events::EventHandler; use lightning_invoice::payment::PaymentError; @@ -281,12 +281,26 @@ pub(crate) async fn poll_for_user_input( println!("ERROR: getinvoice provided payment amount was not a number"); continue; } + + let expiry_secs_str = words.next(); + if expiry_secs_str.is_none() { + println!("ERROR: getinvoice requires an expiry in seconds"); + continue; + } + + let expiry_secs: Result = expiry_secs_str.unwrap().parse(); + if expiry_secs.is_err() { + println!("ERROR: getinvoice provided expiry was not a number"); + continue; + } + get_invoice( amt_msat.unwrap(), inbound_payments.clone(), channel_manager.clone(), keys_manager.clone(), network, + expiry_secs.unwrap(), ); } "connectpeer" => { @@ -317,7 +331,7 @@ pub(crate) async fn poll_for_user_input( "closechannel" => { let channel_id_str = words.next(); if channel_id_str.is_none() { - println!("ERROR: closechannel requires a channel ID: `closechannel `"); + println!("ERROR: closechannel requires a channel ID: `closechannel `"); continue; } let channel_id_vec = hex_utils::to_vec(channel_id_str.unwrap()); @@ -327,12 +341,33 @@ pub(crate) async fn poll_for_user_input( } let mut channel_id = [0; 32]; channel_id.copy_from_slice(&channel_id_vec.unwrap()); - close_channel(channel_id, channel_manager.clone()); + + let peer_pubkey_str = words.next(); + if peer_pubkey_str.is_none() { + println!("ERROR: closechannel requires a peer pubkey: `closechannel `"); + continue; + } + let peer_pubkey_vec = match hex_utils::to_vec(peer_pubkey_str.unwrap()) { + Some(peer_pubkey_vec) => peer_pubkey_vec, + None => { + println!("ERROR: couldn't parse peer_pubkey"); + continue; + } + }; + let peer_pubkey = match PublicKey::from_slice(&peer_pubkey_vec) { + Ok(peer_pubkey) => peer_pubkey, + Err(_) => { + println!("ERROR: couldn't parse peer_pubkey"); + continue; + } + }; + + close_channel(channel_id, peer_pubkey, channel_manager.clone()); } "forceclosechannel" => { let channel_id_str = words.next(); if channel_id_str.is_none() { - println!("ERROR: forceclosechannel requires a channel ID: `forceclosechannel `"); + println!("ERROR: forceclosechannel requires a channel ID: `forceclosechannel `"); continue; } let channel_id_vec = hex_utils::to_vec(channel_id_str.unwrap()); @@ -342,7 +377,28 @@ pub(crate) async fn poll_for_user_input( } let mut channel_id = [0; 32]; channel_id.copy_from_slice(&channel_id_vec.unwrap()); - force_close_channel(channel_id, channel_manager.clone()); + + let peer_pubkey_str = words.next(); + if peer_pubkey_str.is_none() { + println!("ERROR: forceclosechannel requires a peer pubkey: `forceclosechannel `"); + continue; + } + let peer_pubkey_vec = match hex_utils::to_vec(peer_pubkey_str.unwrap()) { + Some(peer_pubkey_vec) => peer_pubkey_vec, + None => { + println!("ERROR: couldn't parse peer_pubkey"); + continue; + } + }; + let peer_pubkey = match PublicKey::from_slice(&peer_pubkey_vec) { + Ok(peer_pubkey) => peer_pubkey, + Err(_) => { + println!("ERROR: couldn't parse peer_pubkey"); + continue; + } + }; + + force_close_channel(channel_id, peer_pubkey, channel_manager.clone()); } "nodeinfo" => node_info(&channel_manager, &peer_manager), "listpeers" => list_peers(peer_manager.clone()), @@ -369,13 +425,13 @@ pub(crate) async fn poll_for_user_input( fn help() { println!("openchannel pubkey@host:port "); println!("sendpayment "); - println!("keysend "); - println!("getinvoice "); + println!("keysend "); + println!("getinvoice "); println!("connectpeer pubkey@host:port"); println!("listchannels"); println!("listpayments"); - println!("closechannel "); - println!("forceclosechannel "); + println!("closechannel "); + println!("forceclosechannel "); println!("nodeinfo"); println!("listpeers"); println!("signmessage "); @@ -441,7 +497,7 @@ fn list_channels(channel_manager: &Arc, network_graph: &Arc( fn get_invoice( amt_msat: u64, payment_storage: PaymentInfoStorage, channel_manager: Arc, - keys_manager: Arc, network: Network, + keys_manager: Arc, network: Network, expiry_secs: u32, ) { let mut payments = payment_storage.lock().unwrap(); let currency = match network { @@ -668,6 +724,7 @@ fn get_invoice( currency, Some(amt_msat), "ldk-tutorial-node".to_string(), + expiry_secs, ) { Ok(inv) => { println!("SUCCESS: generated invoice: {}", inv); @@ -691,15 +748,19 @@ fn get_invoice( ); } -fn close_channel(channel_id: [u8; 32], channel_manager: Arc) { - match channel_manager.close_channel(&channel_id) { +fn close_channel( + channel_id: [u8; 32], counterparty_node_id: PublicKey, channel_manager: Arc, +) { + match channel_manager.close_channel(&channel_id, &counterparty_node_id) { Ok(()) => println!("EVENT: initiating channel close"), Err(e) => println!("ERROR: failed to close channel: {:?}", e), } } -fn force_close_channel(channel_id: [u8; 32], channel_manager: Arc) { - match channel_manager.force_close_channel(&channel_id) { +fn force_close_channel( + channel_id: [u8; 32], counterparty_node_id: PublicKey, channel_manager: Arc, +) { + match channel_manager.force_close_channel(&channel_id, &counterparty_node_id) { Ok(()) => println!("EVENT: initiating channel force-close"), Err(e) => println!("ERROR: failed to force-close channel: {:?}", e), } diff --git a/src/disk.rs b/src/disk.rs index 8a54592..fe61418 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -1,15 +1,14 @@ -use crate::cli; -use bitcoin::secp256k1::key::PublicKey; +use crate::{cli, NetworkGraph}; +use bitcoin::secp256k1::PublicKey; use bitcoin::BlockHash; use chrono::Utc; -use lightning::routing::network_graph::NetworkGraph; use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters}; use lightning::util::logger::{Logger, Record}; -use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer}; +use lightning::util::ser::{ReadableArgs, Writer}; use std::collections::HashMap; use std::fs; use std::fs::File; -use std::io::{BufRead, BufReader, BufWriter}; +use std::io::{BufRead, BufReader}; use std::net::SocketAddr; use std::path::Path; use std::sync::Arc; @@ -73,40 +72,26 @@ pub(crate) fn read_channel_peer_data( Ok(peer_data) } -pub(crate) fn read_network(path: &Path, genesis_hash: BlockHash) -> NetworkGraph { +pub(crate) fn read_network( + path: &Path, genesis_hash: BlockHash, logger: Arc, +) -> NetworkGraph { if let Ok(file) = File::open(path) { - if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file)) { + if let Ok(graph) = NetworkGraph::read(&mut BufReader::new(file), logger.clone()) { return graph; } } - NetworkGraph::new(genesis_hash) -} - -pub(crate) fn persist_scorer( - path: &Path, scorer: &ProbabilisticScorer>, -) -> std::io::Result<()> { - let mut tmp_path = path.to_path_buf().into_os_string(); - tmp_path.push(".tmp"); - let file = fs::OpenOptions::new().write(true).create(true).open(&tmp_path)?; - let write_res = scorer.write(&mut BufWriter::new(file)); - if let Err(e) = write_res.and_then(|_| fs::rename(&tmp_path, path)) { - let _ = fs::remove_file(&tmp_path); - Err(e) - } else { - Ok(()) - } + NetworkGraph::new(genesis_hash, logger) } pub(crate) fn read_scorer( - path: &Path, graph: Arc, -) -> ProbabilisticScorer> { + path: &Path, graph: Arc, logger: Arc, +) -> ProbabilisticScorer, Arc> { let params = ProbabilisticScoringParameters::default(); if let Ok(file) = File::open(path) { - if let Ok(scorer) = - ProbabilisticScorer::read(&mut BufReader::new(file), (params, Arc::clone(&graph))) - { + let args = (params, Arc::clone(&graph), Arc::clone(&logger)); + if let Ok(scorer) = ProbabilisticScorer::read(&mut BufReader::new(file), args) { return scorer; } } - ProbabilisticScorer::new(params, graph) + ProbabilisticScorer::new(params, graph, logger) } diff --git a/src/hex_utils.rs b/src/hex_utils.rs index 0eee5af..6077a60 100644 --- a/src/hex_utils.rs +++ b/src/hex_utils.rs @@ -1,4 +1,4 @@ -use bitcoin::secp256k1::key::PublicKey; +use bitcoin::secp256k1::PublicKey; pub fn to_vec(hex: &str) -> Option> { let mut out = Vec::with_capacity(hex.len() / 2); diff --git a/src/main.rs b/src/main.rs index 49fdcb8..1c1d794 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,12 +24,14 @@ use lightning::ln::channelmanager::{ }; use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager}; use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret}; -use lightning::routing::network_graph::{NetGraphMsgHandler, NetworkGraph}; +use lightning::log_bytes; +use lightning::routing::gossip; +use lightning::routing::gossip::P2PGossipSync; use lightning::routing::scoring::ProbabilisticScorer; use lightning::util::config::UserConfig; use lightning::util::events::{Event, PaymentPurpose}; use lightning::util::ser::ReadableArgs; -use lightning_background_processor::{BackgroundProcessor, Persister}; +use lightning_background_processor::BackgroundProcessor; use lightning_block_sync::init; use lightning_block_sync::poll; use lightning_block_sync::SpvClient; @@ -38,6 +40,7 @@ use lightning_invoice::payment; use lightning_invoice::utils::DefaultRouter; use lightning_net_tokio::SocketDescriptor; use lightning_persister::FilesystemPersister; +use lightning_rapid_gossip_sync::RapidGossipSync; use rand::{thread_rng, Rng}; use std::collections::hash_map::Entry; use std::collections::HashMap; @@ -102,42 +105,17 @@ pub(crate) type ChannelManager = pub(crate) type InvoicePayer = payment::InvoicePayer< Arc, Router, - Arc>>>, + Arc, Arc>>>, Arc, E, >; type Router = DefaultRouter, Arc>; -struct DataPersister { - data_dir: String, -} - -impl - Persister< - InMemorySigner, - Arc, - Arc, - Arc, - Arc, - Arc, - > for DataPersister -{ - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error> { - FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager) - } - - fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> { - if FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph).is_err() - { - // Persistence errors here are non-fatal as we can just fetch the routing graph - // again later, but they may indicate a disk error which could be fatal elsewhere. - eprintln!("Warning: Failed to persist network graph, check your disk and permissions"); - } +type GossipSync = + lightning_background_processor::GossipSync>, G, A, L>; - Ok(()) - } -} +pub(crate) type NetworkGraph = gossip::NetworkGraph>; async fn handle_ldk_events( channel_manager: Arc, bitcoind_client: Arc, @@ -147,6 +125,7 @@ async fn handle_ldk_events( match event { Event::FundingGenerationReady { temporary_channel_id, + counterparty_node_id, channel_value_satoshis, output_script, .. @@ -179,7 +158,11 @@ async fn handle_ldk_events( encode::deserialize(&hex_utils::to_vec(&signed_tx.hex).unwrap()).unwrap(); // Give the funding transaction back to LDK for opening the channel. if channel_manager - .funding_transaction_generated(&temporary_channel_id, final_tx) + .funding_transaction_generated( + &temporary_channel_id, + counterparty_node_id, + final_tx, + ) .is_err() { println!( @@ -188,31 +171,39 @@ async fn handle_ldk_events( io::stdout().flush().unwrap(); } } - Event::PaymentReceived { payment_hash, purpose, amt, .. } => { - let mut payments = inbound_payments.lock().unwrap(); + Event::PaymentReceived { payment_hash, purpose, amount_msat } => { + println!( + "\nEVENT: received payment from payment hash {} of {} millisatoshis", + hex_utils::hex_str(&payment_hash.0), + amount_msat, + ); + print!("> "); + io::stdout().flush().unwrap(); + let payment_preimage = match purpose { + PaymentPurpose::InvoicePayment { payment_preimage, .. } => *payment_preimage, + PaymentPurpose::SpontaneousPayment(preimage) => Some(*preimage), + }; + channel_manager.claim_funds(payment_preimage.unwrap()); + } + Event::PaymentClaimed { payment_hash, purpose, amount_msat } => { + println!( + "\nEVENT: claimed payment from payment hash {} of {} millisatoshis", + hex_utils::hex_str(&payment_hash.0), + amount_msat, + ); + print!("> "); + io::stdout().flush().unwrap(); let (payment_preimage, payment_secret) = match purpose { PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => { (*payment_preimage, Some(*payment_secret)) } PaymentPurpose::SpontaneousPayment(preimage) => (Some(*preimage), None), }; - let status = match channel_manager.claim_funds(payment_preimage.unwrap()) { - true => { - println!( - "\nEVENT: received payment from payment hash {} of {} millisatoshis", - hex_utils::hex_str(&payment_hash.0), - amt - ); - print!("> "); - io::stdout().flush().unwrap(); - HTLCStatus::Succeeded - } - _ => HTLCStatus::Failed, - }; + let mut payments = inbound_payments.lock().unwrap(); match payments.entry(*payment_hash) { Entry::Occupied(mut e) => { let payment = e.get_mut(); - payment.status = status; + payment.status = HTLCStatus::Succeeded; payment.preimage = payment_preimage; payment.secret = payment_secret; } @@ -220,8 +211,8 @@ async fn handle_ldk_events( e.insert(PaymentInfo { preimage: payment_preimage, secret: payment_secret, - status, - amt_msat: MillisatAmount(Some(*amt)), + status: HTLCStatus::Succeeded, + amt_msat: MillisatAmount(Some(*amount_msat)), }); } } @@ -268,7 +259,18 @@ async fn handle_ldk_events( payment.status = HTLCStatus::Failed; } } - Event::PaymentForwarded { fee_earned_msat, claim_from_onchain_tx } => { + Event::PaymentForwarded { + prev_channel_id, + next_channel_id, + fee_earned_msat, + claim_from_onchain_tx, + } => { + let from_channel_str = prev_channel_id + .map(|channel_id| format!(" from channel {}", log_bytes!(channel_id))) + .unwrap_or_default(); + let to_channel_str = next_channel_id + .map(|channel_id| format!(" to channel {}", log_bytes!(channel_id))) + .unwrap_or_default(); let from_onchain_str = if *claim_from_onchain_tx { "from onchain downstream claim" } else { @@ -276,11 +278,14 @@ async fn handle_ldk_events( }; if let Some(fee_earned) = fee_earned_msat { println!( - "\nEVENT: Forwarded payment, earning {} msat {}", - fee_earned, from_onchain_str + "\nEVENT: Forwarded payment{}{}, earning {} msat {}", + from_channel_str, to_channel_str, fee_earned, from_onchain_str ); } else { - println!("\nEVENT: Forwarded payment, claiming onchain {}", from_onchain_str); + println!( + "\nEVENT: Forwarded payment{}{}, claiming onchain {}", + from_channel_str, to_channel_str, from_onchain_str + ); } print!("> "); io::stdout().flush().unwrap(); @@ -512,11 +517,12 @@ async fn start_ldk() { chain_monitor.watch_channel(funding_outpoint, channel_monitor).unwrap(); } - // Step 11: Optional: Initialize the NetGraphMsgHandler + // Step 11: Optional: Initialize the P2PGossipSync let genesis = genesis_block(args.network).header.block_hash(); let network_graph_path = format!("{}/network_graph", ldk_data_dir.clone()); - let network_graph = Arc::new(disk::read_network(Path::new(&network_graph_path), genesis)); - let network_gossip = Arc::new(NetGraphMsgHandler::new( + let network_graph = + Arc::new(disk::read_network(Path::new(&network_graph_path), genesis, logger.clone())); + let gossip_sync = Arc::new(P2PGossipSync::new( Arc::clone(&network_graph), None::>, logger.clone(), @@ -528,7 +534,7 @@ async fn start_ldk() { rand::thread_rng().fill_bytes(&mut ephemeral_bytes); let lightning_msg_handler = MessageHandler { chan_handler: channel_manager.clone(), - route_handler: network_gossip.clone(), + route_handler: gossip_sync.clone(), }; let peer_manager: Arc = Arc::new(PeerManager::new( lightning_msg_handler, @@ -610,25 +616,12 @@ async fn start_ldk() { }; // Step 16: Initialize routing ProbabilisticScorer - let scorer_path = format!("{}/prob_scorer", ldk_data_dir.clone()); + let scorer_path = format!("{}/scorer", ldk_data_dir.clone()); let scorer = Arc::new(Mutex::new(disk::read_scorer( Path::new(&scorer_path), Arc::clone(&network_graph), + Arc::clone(&logger), ))); - let scorer_persist = Arc::clone(&scorer); - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(600)); - loop { - interval.tick().await; - if disk::persist_scorer(Path::new(&scorer_path), &scorer_persist.lock().unwrap()) - .is_err() - { - // Persistence errors here are non-fatal as channels will be re-scored as payments - // fail, but they may indicate a disk error which could be fatal elsewhere. - eprintln!("Warning: Failed to persist scorer, check your disk and permissions"); - } - } - }); // Step 17: Create InvoicePayer let router = DefaultRouter::new( @@ -642,11 +635,11 @@ async fn start_ldk() { scorer.clone(), logger.clone(), event_handler, - payment::RetryAttempts(5), + payment::Retry::Attempts(5), )); // Step 18: Persist ChannelManager and NetworkGraph - let persister = DataPersister { data_dir: ldk_data_dir.clone() }; + let persister = Arc::new(FilesystemPersister::new(ldk_data_dir.clone())); // Step 19: Background Processing let background_processor = BackgroundProcessor::start( @@ -654,9 +647,10 @@ async fn start_ldk() { invoice_payer.clone(), chain_monitor.clone(), channel_manager.clone(), - Some(network_gossip.clone()), + GossipSync::P2P(gossip_sync.clone()), peer_manager.clone(), logger.clone(), + Some(scorer.clone()), ); // Regularly reconnect to channel peers. -- 2.30.2