persist payments info to disk
authorZoe Faltibà <zoefaltiba@gmail.com>
Wed, 10 May 2023 15:40:47 +0000 (17:40 +0200)
committerZoe Faltibà <zoefaltiba@gmail.com>
Tue, 25 Jul 2023 12:29:26 +0000 (14:29 +0200)
src/cli.rs
src/disk.rs
src/main.rs

index cba62559166d02a35b95e1356a720de947711eaa..f896721cdfef415d06600a34ac38695bfc4ee007 100644 (file)
@@ -1,4 +1,4 @@
-use crate::disk;
+use crate::disk::{self, INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME};
 use crate::hex_utils;
 use crate::{
        ChannelManager, HTLCStatus, MillisatAmount, NetworkGraph, OnionMessenger, PaymentInfo,
@@ -16,17 +16,18 @@ use lightning::onion_message::{CustomOnionMessageContents, Destination, OnionMes
 use lightning::routing::gossip::NodeId;
 use lightning::routing::router::{PaymentParameters, RouteParameters};
 use lightning::util::config::{ChannelHandshakeConfig, ChannelHandshakeLimits, UserConfig};
+use lightning::util::persist::KVStorePersister;
 use lightning::util::ser::{Writeable, Writer};
 use lightning_invoice::payment::pay_invoice;
 use lightning_invoice::{utils, Currency, Invoice};
+use lightning_persister::FilesystemPersister;
 use std::env;
 use std::io;
 use std::io::Write;
 use std::net::{SocketAddr, ToSocketAddrs};
-use std::ops::Deref;
 use std::path::Path;
 use std::str::FromStr;
-use std::sync::Arc;
+use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 pub(crate) struct LdkUserInfo {
@@ -61,9 +62,9 @@ impl Writeable for UserOnionMessageContents {
 pub(crate) async fn poll_for_user_input(
        peer_manager: Arc<PeerManager>, channel_manager: Arc<ChannelManager>,
        keys_manager: Arc<KeysManager>, network_graph: Arc<NetworkGraph>,
-       onion_messenger: Arc<OnionMessenger>, inbound_payments: PaymentInfoStorage,
-       outbound_payments: PaymentInfoStorage, ldk_data_dir: String, network: Network,
-       logger: Arc<disk::FilesystemLogger>,
+       onion_messenger: Arc<OnionMessenger>, inbound_payments: Arc<Mutex<PaymentInfoStorage>>,
+       outbound_payments: Arc<Mutex<PaymentInfoStorage>>, ldk_data_dir: String, network: Network,
+       logger: Arc<disk::FilesystemLogger>, persister: Arc<FilesystemPersister>,
 ) {
        println!(
                "LDK startup successful. Enter \"help\" to view available commands. Press Ctrl-D to quit."
@@ -157,7 +158,12 @@ pub(crate) async fn poll_for_user_input(
                                                }
                                        };
 
-                                       send_payment(&*channel_manager, &invoice, outbound_payments.clone());
+                                       send_payment(
+                                               &channel_manager,
+                                               &invoice,
+                                               &mut outbound_payments.lock().unwrap(),
+                                               persister.clone(),
+                                       );
                                }
                                "keysend" => {
                                        let dest_pubkey = match words.next() {
@@ -188,11 +194,12 @@ pub(crate) async fn poll_for_user_input(
                                                }
                                        };
                                        keysend(
-                                               &*channel_manager,
+                                               &channel_manager,
                                                dest_pubkey,
                                                amt_msat,
                                                &*keys_manager,
-                                               outbound_payments.clone(),
+                                               &mut outbound_payments.lock().unwrap(),
+                                               persister.clone(),
                                        );
                                }
                                "getinvoice" => {
@@ -220,15 +227,17 @@ pub(crate) async fn poll_for_user_input(
                                                continue;
                                        }
 
+                                       let mut inbound_payments = inbound_payments.lock().unwrap();
                                        get_invoice(
                                                amt_msat.unwrap(),
-                                               Arc::clone(&inbound_payments),
-                                               &*channel_manager,
+                                               &mut inbound_payments,
+                                               &channel_manager,
                                                Arc::clone(&keys_manager),
                                                network,
                                                expiry_secs.unwrap(),
                                                Arc::clone(&logger),
                                        );
+                                       persister.persist(INBOUND_PAYMENTS_FNAME, &*inbound_payments).unwrap();
                                }
                                "connectpeer" => {
                                        let peer_pubkey_and_ip_addr = words.next();
@@ -278,9 +287,10 @@ pub(crate) async fn poll_for_user_input(
                                        }
                                }
                                "listchannels" => list_channels(&channel_manager, &network_graph),
-                               "listpayments" => {
-                                       list_payments(inbound_payments.clone(), outbound_payments.clone())
-                               }
+                               "listpayments" => list_payments(
+                                       &inbound_payments.lock().unwrap(),
+                                       &outbound_payments.lock().unwrap(),
+                               ),
                                "closechannel" => {
                                        let channel_id_str = words.next();
                                        if channel_id_str.is_none() {
@@ -527,11 +537,9 @@ fn list_channels(channel_manager: &Arc<ChannelManager>, network_graph: &Arc<Netw
        println!("]");
 }
 
-fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: PaymentInfoStorage) {
-       let inbound = inbound_payments.lock().unwrap();
-       let outbound = outbound_payments.lock().unwrap();
+fn list_payments(inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage) {
        print!("[");
-       for (payment_hash, payment_info) in inbound.deref() {
+       for (payment_hash, payment_info) in &inbound_payments.payments {
                println!("");
                println!("\t{{");
                println!("\t\tamount_millisatoshis: {},", payment_info.amt_msat);
@@ -549,7 +557,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
                println!("\t}},");
        }
 
-       for (payment_hash, payment_info) in outbound.deref() {
+       for (payment_hash, payment_info) in &outbound_payments.payments {
                println!("");
                println!("\t{{");
                println!("\t\tamount_millisatoshis: {},", payment_info.amt_msat);
@@ -658,41 +666,40 @@ fn open_channel(
 }
 
 fn send_payment(
-       channel_manager: &ChannelManager, invoice: &Invoice, payment_storage: PaymentInfoStorage,
+       channel_manager: &ChannelManager, invoice: &Invoice,
+       outbound_payments: &mut PaymentInfoStorage, persister: Arc<FilesystemPersister>,
 ) {
-       let status =
-               match pay_invoice(invoice, Retry::Timeout(Duration::from_secs(10)), channel_manager) {
-                       Ok(_payment_id) => {
-                               let payee_pubkey = invoice.recover_payee_pub_key();
-                               let amt_msat = invoice.amount_milli_satoshis().unwrap();
-                               println!("EVENT: initiated sending {} msats to {}", amt_msat, payee_pubkey);
-                               print!("> ");
-                               HTLCStatus::Pending
-                       }
-                       Err(e) => {
-                               println!("ERROR: failed to send payment: {:?}", e);
-                               print!("> ");
-                               HTLCStatus::Failed
-                       }
-               };
-       let payment_hash = PaymentHash(invoice.payment_hash().clone().into_inner());
-       let payment_secret = Some(invoice.payment_secret().clone());
-
-       let mut payments = payment_storage.lock().unwrap();
-       payments.insert(
+       let payment_hash = PaymentHash((*invoice.payment_hash()).into_inner());
+       let payment_secret = Some(*invoice.payment_secret());
+       outbound_payments.payments.insert(
                payment_hash,
                PaymentInfo {
                        preimage: None,
                        secret: payment_secret,
-                       status,
+                       status: HTLCStatus::Pending,
                        amt_msat: MillisatAmount(invoice.amount_milli_satoshis()),
                },
        );
+       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments).unwrap();
+       match pay_invoice(invoice, Retry::Timeout(Duration::from_secs(10)), channel_manager) {
+               Ok(_payment_id) => {
+                       let payee_pubkey = invoice.recover_payee_pub_key();
+                       let amt_msat = invoice.amount_milli_satoshis().unwrap();
+                       println!("EVENT: initiated sending {} msats to {}", amt_msat, payee_pubkey);
+                       print!("> ");
+               }
+               Err(e) => {
+                       println!("ERROR: failed to send payment: {:?}", e);
+                       print!("> ");
+                       outbound_payments.payments.get_mut(&payment_hash).unwrap().status = HTLCStatus::Failed;
+                       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments).unwrap();
+               }
+       };
 }
 
 fn keysend<E: EntropySource>(
        channel_manager: &ChannelManager, payee_pubkey: PublicKey, amt_msat: u64, entropy_source: &E,
-       payment_storage: PaymentInfoStorage,
+       outbound_payments: &mut PaymentInfoStorage, persister: Arc<FilesystemPersister>,
 ) {
        let payment_preimage = PaymentPreimage(entropy_source.get_secure_random_bytes());
        let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0[..]).into_inner());
@@ -701,7 +708,17 @@ fn keysend<E: EntropySource>(
                payment_params: PaymentParameters::for_keysend(payee_pubkey, 40),
                final_value_msat: amt_msat,
        };
-       let status = match channel_manager.send_spontaneous_payment_with_retry(
+       outbound_payments.payments.insert(
+               payment_hash,
+               PaymentInfo {
+                       preimage: None,
+                       secret: None,
+                       status: HTLCStatus::Pending,
+                       amt_msat: MillisatAmount(Some(amt_msat)),
+               },
+       );
+       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments).unwrap();
+       match channel_manager.send_spontaneous_payment_with_retry(
                Some(payment_preimage),
                RecipientOnionFields::spontaneous_empty(),
                PaymentId(payment_hash.0),
@@ -711,33 +728,21 @@ fn keysend<E: EntropySource>(
                Ok(_payment_hash) => {
                        println!("EVENT: initiated sending {} msats to {}", amt_msat, payee_pubkey);
                        print!("> ");
-                       HTLCStatus::Pending
                }
                Err(e) => {
                        println!("ERROR: failed to send payment: {:?}", e);
                        print!("> ");
-                       HTLCStatus::Failed
+                       outbound_payments.payments.get_mut(&payment_hash).unwrap().status = HTLCStatus::Failed;
+                       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments).unwrap();
                }
        };
-
-       let mut payments = payment_storage.lock().unwrap();
-       payments.insert(
-               payment_hash,
-               PaymentInfo {
-                       preimage: None,
-                       secret: None,
-                       status,
-                       amt_msat: MillisatAmount(Some(amt_msat)),
-               },
-       );
 }
 
 fn get_invoice(
-       amt_msat: u64, payment_storage: PaymentInfoStorage, channel_manager: &ChannelManager,
+       amt_msat: u64, inbound_payments: &mut PaymentInfoStorage, channel_manager: &ChannelManager,
        keys_manager: Arc<KeysManager>, network: Network, expiry_secs: u32,
        logger: Arc<disk::FilesystemLogger>,
 ) {
-       let mut payments = payment_storage.lock().unwrap();
        let currency = match network {
                Network::Bitcoin => Currency::Bitcoin,
                Network::Testnet => Currency::BitcoinTestnet,
@@ -765,7 +770,7 @@ fn get_invoice(
        };
 
        let payment_hash = PaymentHash(invoice.payment_hash().clone().into_inner());
-       payments.insert(
+       inbound_payments.payments.insert(
                payment_hash,
                PaymentInfo {
                        preimage: None,
index 0c3085134bedcd8cf35ca57330a776ad196223a9..77cefd6d59a91bc25f26c855d9119235c62df78c 100644 (file)
@@ -1,10 +1,10 @@
-use crate::{cli, NetworkGraph};
+use crate::{cli, NetworkGraph, PaymentInfoStorage};
 use bitcoin::secp256k1::PublicKey;
 use bitcoin::Network;
 use chrono::Utc;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringParameters};
 use lightning::util::logger::{Logger, Record};
-use lightning::util::ser::{ReadableArgs, Writer};
+use lightning::util::ser::{Readable, ReadableArgs, Writer};
 use std::collections::HashMap;
 use std::fs;
 use std::fs::File;
@@ -13,6 +13,9 @@ use std::net::SocketAddr;
 use std::path::Path;
 use std::sync::Arc;
 
+pub(crate) const INBOUND_PAYMENTS_FNAME: &str = "inbound_payments";
+pub(crate) const OUTBOUND_PAYMENTS_FNAME: &str = "outbound_payments";
+
 pub(crate) struct FilesystemLogger {
        data_dir: String,
 }
@@ -83,6 +86,15 @@ pub(crate) fn read_network(
        NetworkGraph::new(network, logger)
 }
 
+pub(crate) fn read_payment_info(path: &Path) -> PaymentInfoStorage {
+       if let Ok(file) = File::open(path) {
+               if let Ok(info) = PaymentInfoStorage::read(&mut BufReader::new(file)) {
+                       return info;
+               }
+       }
+       PaymentInfoStorage { payments: HashMap::new() }
+}
+
 pub(crate) fn read_scorer(
        path: &Path, graph: Arc<NetworkGraph>, logger: Arc<FilesystemLogger>,
 ) -> ProbabilisticScorer<Arc<NetworkGraph>, Arc<FilesystemLogger>> {
index de7c16641d3eb0314fec0d6226c63d53adadab93..a526efc1866a1fbe1919d2440cc6240bbbe3679c 100644 (file)
@@ -13,17 +13,18 @@ use bitcoin::consensus::encode;
 use bitcoin::network::constants::Network;
 use bitcoin::BlockHash;
 use bitcoin_bech32::WitnessProgram;
-use lightning::chain;
+use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME};
 use lightning::chain::keysinterface::{
        EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor,
 };
 use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
 use lightning::chain::{Filter, Watch};
 use lightning::events::{Event, PaymentFailureReason, PaymentPurpose};
-use lightning::ln::channelmanager;
+use lightning::ln::channelmanager::{self, RecentPaymentDetails};
 use lightning::ln::channelmanager::{
        ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager,
 };
+use lightning::ln::msgs::DecodeError;
 use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager};
 use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
 use lightning::onion_message::SimpleArcOnionMessenger;
@@ -32,7 +33,8 @@ use lightning::routing::gossip::{NodeId, P2PGossipSync};
 use lightning::routing::router::DefaultRouter;
 use lightning::util::config::UserConfig;
 use lightning::util::persist::KVStorePersister;
-use lightning::util::ser::ReadableArgs;
+use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
+use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
 use lightning_background_processor::{process_events_async, GossipSync};
 use lightning_block_sync::init;
 use lightning_block_sync::poll;
@@ -56,12 +58,19 @@ use std::time::{Duration, SystemTime};
 
 pub(crate) const PENDING_SPENDABLE_OUTPUT_DIR: &'static str = "pending_spendable_outputs";
 
+#[derive(Copy, Clone)]
 pub(crate) enum HTLCStatus {
        Pending,
        Succeeded,
        Failed,
 }
 
+impl_writeable_tlv_based_enum!(HTLCStatus,
+       (0, Pending) => {},
+       (1, Succeeded) => {},
+       (2, Failed) => {};
+);
+
 pub(crate) struct MillisatAmount(Option<u64>);
 
 impl fmt::Display for MillisatAmount {
@@ -73,6 +82,19 @@ impl fmt::Display for MillisatAmount {
        }
 }
 
+impl Readable for MillisatAmount {
+       fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
+               let amt: Option<u64> = Readable::read(r)?;
+               Ok(MillisatAmount(amt))
+       }
+}
+
+impl Writeable for MillisatAmount {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), std::io::Error> {
+               self.0.write(w)
+       }
+}
+
 pub(crate) struct PaymentInfo {
        preimage: Option<PaymentPreimage>,
        secret: Option<PaymentSecret>,
@@ -80,7 +102,20 @@ pub(crate) struct PaymentInfo {
        amt_msat: MillisatAmount,
 }
 
-pub(crate) type PaymentInfoStorage = Arc<Mutex<HashMap<PaymentHash, PaymentInfo>>>;
+impl_writeable_tlv_based!(PaymentInfo, {
+       (0, preimage, required),
+       (2, secret, required),
+       (4, status, required),
+       (6, amt_msat, required),
+});
+
+pub(crate) struct PaymentInfoStorage {
+       payments: HashMap<PaymentHash, PaymentInfo>,
+}
+
+impl_writeable_tlv_based!(PaymentInfoStorage, {
+       (0, payments, required),
+});
 
 type ChainMonitor = chainmonitor::ChainMonitor<
        InMemorySigner,
@@ -110,8 +145,9 @@ type OnionMessenger = SimpleArcOnionMessenger<FilesystemLogger>;
 async fn handle_ldk_events(
        channel_manager: &Arc<ChannelManager>, bitcoind_client: &BitcoindClient,
        network_graph: &NetworkGraph, keys_manager: &KeysManager,
-       inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage,
-       persister: &Arc<FilesystemPersister>, network: Network, event: Event,
+       inbound_payments: Arc<Mutex<PaymentInfoStorage>>,
+       outbound_payments: Arc<Mutex<PaymentInfoStorage>>, persister: &Arc<FilesystemPersister>,
+       network: Network, event: Event,
 ) {
        match event {
                Event::FundingGenerationReady {
@@ -199,8 +235,8 @@ async fn handle_ldk_events(
                                }
                                PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None),
                        };
-                       let mut payments = inbound_payments.lock().unwrap();
-                       match payments.entry(payment_hash) {
+                       let mut inbound = inbound_payments.lock().unwrap();
+                       match inbound.payments.entry(payment_hash) {
                                Entry::Occupied(mut e) => {
                                        let payment = e.get_mut();
                                        payment.status = HTLCStatus::Succeeded;
@@ -216,10 +252,11 @@ async fn handle_ldk_events(
                                        });
                                }
                        }
+                       persister.persist(INBOUND_PAYMENTS_FNAME, &*inbound).unwrap();
                }
                Event::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => {
-                       let mut payments = outbound_payments.lock().unwrap();
-                       for (hash, payment) in payments.iter_mut() {
+                       let mut outbound = outbound_payments.lock().unwrap();
+                       for (hash, payment) in outbound.payments.iter_mut() {
                                if *hash == payment_hash {
                                        payment.preimage = Some(payment_preimage);
                                        payment.status = HTLCStatus::Succeeded;
@@ -239,6 +276,7 @@ async fn handle_ldk_events(
                                        io::stdout().flush().unwrap();
                                }
                        }
+                       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound).unwrap();
                }
                Event::OpenChannelRequest { .. } => {
                        // Unreachable, we don't set manually_accept_inbound_channels
@@ -256,11 +294,12 @@ async fn handle_ldk_events(
                        print!("> ");
                        io::stdout().flush().unwrap();
 
-                       let mut payments = outbound_payments.lock().unwrap();
-                       if payments.contains_key(&payment_hash) {
-                               let payment = payments.get_mut(&payment_hash).unwrap();
+                       let mut outbound = outbound_payments.lock().unwrap();
+                       if outbound.payments.contains_key(&payment_hash) {
+                               let payment = outbound.payments.get_mut(&payment_hash).unwrap();
                                payment.status = HTLCStatus::Failed;
                        }
+                       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound).unwrap();
                }
                Event::PaymentForwarded {
                        prev_channel_id,
@@ -479,7 +518,7 @@ async fn start_ldk() {
                thread_rng().fill_bytes(&mut key);
                match File::create(keys_seed_path.clone()) {
                        Ok(mut f) => {
-                               f.write_all(&key).expect("Failed to write node keys seed to disk");
+                               Write::write_all(&mut f, &key).expect("Failed to write node keys seed to disk");
                                f.sync_all().expect("Failed to sync node keys seed to disk");
                        }
                        Err(e) => {
@@ -689,9 +728,35 @@ async fn start_ldk() {
                }
        });
 
-       // TODO: persist payment info to disk
-       let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
-       let outbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
+       let inbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!(
+               "{}/{}",
+               ldk_data_dir, INBOUND_PAYMENTS_FNAME
+       )))));
+       let outbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!(
+               "{}/{}",
+               ldk_data_dir, OUTBOUND_PAYMENTS_FNAME
+       )))));
+       let recent_payments_payment_hashes = channel_manager
+               .list_recent_payments()
+               .into_iter()
+               .filter_map(|p| match p {
+                       RecentPaymentDetails::Pending { payment_hash, .. } => Some(payment_hash),
+                       RecentPaymentDetails::Fulfilled { payment_hash } => payment_hash,
+                       RecentPaymentDetails::Abandoned { payment_hash } => Some(payment_hash),
+               })
+               .collect::<Vec<PaymentHash>>();
+       for (payment_hash, payment_info) in outbound_payments
+               .lock()
+               .unwrap()
+               .payments
+               .iter_mut()
+               .filter(|(_, i)| matches!(i.status, HTLCStatus::Pending))
+       {
+               if !recent_payments_payment_hashes.contains(payment_hash) {
+                       payment_info.status = HTLCStatus::Failed;
+               }
+       }
+       persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments.lock().unwrap()).unwrap();
 
        // Step 18: Handle LDK Events
        let channel_manager_event_listener = Arc::clone(&channel_manager);
@@ -716,8 +781,8 @@ async fn start_ldk() {
                                &bitcoind_client_event_listener,
                                &network_graph_event_listener,
                                &keys_manager_event_listener,
-                               &inbound_payments_event_listener,
-                               &outbound_payments_event_listener,
+                               inbound_payments_event_listener,
+                               outbound_payments_event_listener,
                                &persister_event_listener,
                                network,
                                event,
@@ -837,6 +902,7 @@ async fn start_ldk() {
                ldk_data_dir,
                network,
                Arc::clone(&logger),
+               Arc::clone(&persister),
        )
        .await;