mod convert;
mod disk;
mod hex_utils;
+mod sweep;
use crate::bitcoind_client::BitcoindClient;
use crate::disk::FilesystemLogger;
use bitcoin::blockdata::transaction::Transaction;
use bitcoin::consensus::encode;
use bitcoin::network::constants::Network;
-use bitcoin::secp256k1::Secp256k1;
use bitcoin::BlockHash;
use bitcoin_bech32::WitnessProgram;
-use lightning::chain;
-use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
-use lightning::chain::keysinterface::{EntropySource, InMemorySigner, KeysManager};
+use disk::{INBOUND_PAYMENTS_FNAME, OUTBOUND_PAYMENTS_FNAME};
use lightning::chain::{chainmonitor, ChannelMonitorUpdateStatus};
use lightning::chain::{Filter, Watch};
+use lightning::events::bump_transaction::{BumpTransactionEventHandler, Wallet};
use lightning::events::{Event, PaymentFailureReason, PaymentPurpose};
-use lightning::ln::channelmanager;
+use lightning::ln::channelmanager::{self, RecentPaymentDetails};
use lightning::ln::channelmanager::{
ChainParameters, ChannelManagerReadArgs, SimpleArcChannelManager,
};
+use lightning::ln::msgs::DecodeError;
use lightning::ln::peer_handler::{IgnoringMessageHandler, MessageHandler, SimpleArcPeerManager};
use lightning::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
-use lightning::onion_message::SimpleArcOnionMessenger;
+use lightning::onion_message::{DefaultMessageRouter, SimpleArcOnionMessenger};
use lightning::routing::gossip;
use lightning::routing::gossip::{NodeId, P2PGossipSync};
use lightning::routing::router::DefaultRouter;
+use lightning::routing::scoring::ProbabilisticScoringFeeParameters;
+use lightning::sign::{EntropySource, InMemorySigner, KeysManager, SpendableOutputDescriptor};
use lightning::util::config::UserConfig;
-use lightning::util::ser::ReadableArgs;
+use lightning::util::persist::KVStorePersister;
+use lightning::util::ser::{Readable, ReadableArgs, Writeable, Writer};
+use lightning::{chain, impl_writeable_tlv_based, impl_writeable_tlv_based_enum};
use lightning_background_processor::{process_events_async, GossipSync};
use lightning_block_sync::init;
use lightning_block_sync::poll;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
+pub(crate) const PENDING_SPENDABLE_OUTPUT_DIR: &'static str = "pending_spendable_outputs";
+
+#[derive(Copy, Clone)]
pub(crate) enum HTLCStatus {
Pending,
Succeeded,
Failed,
}
+impl_writeable_tlv_based_enum!(HTLCStatus,
+ (0, Pending) => {},
+ (1, Succeeded) => {},
+ (2, Failed) => {};
+);
+
pub(crate) struct MillisatAmount(Option<u64>);
impl fmt::Display for MillisatAmount {
}
}
+impl Readable for MillisatAmount {
+ fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
+ let amt: Option<u64> = Readable::read(r)?;
+ Ok(MillisatAmount(amt))
+ }
+}
+
+impl Writeable for MillisatAmount {
+ fn write<W: Writer>(&self, w: &mut W) -> Result<(), std::io::Error> {
+ self.0.write(w)
+ }
+}
+
pub(crate) struct PaymentInfo {
preimage: Option<PaymentPreimage>,
secret: Option<PaymentSecret>,
amt_msat: MillisatAmount,
}
-pub(crate) type PaymentInfoStorage = Arc<Mutex<HashMap<PaymentHash, PaymentInfo>>>;
+impl_writeable_tlv_based!(PaymentInfo, {
+ (0, preimage, required),
+ (2, secret, required),
+ (4, status, required),
+ (6, amt_msat, required),
+});
+
+pub(crate) struct PaymentInfoStorage {
+ payments: HashMap<PaymentHash, PaymentInfo>,
+}
+
+impl_writeable_tlv_based!(PaymentInfoStorage, {
+ (0, payments, required),
+});
type ChainMonitor = chainmonitor::ChainMonitor<
InMemorySigner,
type OnionMessenger = SimpleArcOnionMessenger<FilesystemLogger>;
+pub(crate) type BumpTxEventHandler = BumpTransactionEventHandler<
+ Arc<BitcoindClient>,
+ Arc<Wallet<Arc<BitcoindClient>, Arc<FilesystemLogger>>>,
+ Arc<KeysManager>,
+ Arc<FilesystemLogger>,
+>;
+
async fn handle_ldk_events(
channel_manager: &Arc<ChannelManager>, bitcoind_client: &BitcoindClient,
network_graph: &NetworkGraph, keys_manager: &KeysManager,
- inbound_payments: &PaymentInfoStorage, outbound_payments: &PaymentInfoStorage,
+ bump_tx_event_handler: &BumpTxEventHandler, inbound_payments: Arc<Mutex<PaymentInfoStorage>>,
+ outbound_payments: Arc<Mutex<PaymentInfoStorage>>, persister: &Arc<FilesystemPersister>,
network: Network, event: Event,
) {
match event {
via_user_channel_id: _,
claim_deadline: _,
onion_fields: _,
+ counterparty_skimmed_fee_msat: _,
} => {
println!(
"\nEVENT: received payment from payment hash {} of {} millisatoshis",
}
PaymentPurpose::SpontaneousPayment(preimage) => (Some(preimage), None),
};
- let mut payments = inbound_payments.lock().unwrap();
- match payments.entry(payment_hash) {
+ let mut inbound = inbound_payments.lock().unwrap();
+ match inbound.payments.entry(payment_hash) {
Entry::Occupied(mut e) => {
let payment = e.get_mut();
payment.status = HTLCStatus::Succeeded;
});
}
}
+ persister.persist(INBOUND_PAYMENTS_FNAME, &*inbound).unwrap();
}
Event::PaymentSent { payment_preimage, payment_hash, fee_paid_msat, .. } => {
- let mut payments = outbound_payments.lock().unwrap();
- for (hash, payment) in payments.iter_mut() {
+ let mut outbound = outbound_payments.lock().unwrap();
+ for (hash, payment) in outbound.payments.iter_mut() {
if *hash == payment_hash {
payment.preimage = Some(payment_preimage);
payment.status = HTLCStatus::Succeeded;
io::stdout().flush().unwrap();
}
}
+ persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound).unwrap();
}
- Event::OpenChannelRequest { .. } => {
- // Unreachable, we don't set manually_accept_inbound_channels
+ Event::OpenChannelRequest {
+ ref temporary_channel_id, ref counterparty_node_id, ..
+ } => {
+ let mut random_bytes = [0u8; 16];
+ random_bytes.copy_from_slice(&keys_manager.get_secure_random_bytes()[..16]);
+ let user_channel_id = u128::from_be_bytes(random_bytes);
+ let res = channel_manager.accept_inbound_channel(
+ temporary_channel_id,
+ counterparty_node_id,
+ user_channel_id,
+ );
+
+ if let Err(e) = res {
+ print!(
+ "\nEVENT: Failed to accept inbound channel ({}) from {}: {:?}",
+ hex_utils::hex_str(&temporary_channel_id[..]),
+ hex_utils::hex_str(&counterparty_node_id.serialize()),
+ e,
+ );
+ } else {
+ print!(
+ "\nEVENT: Accepted inbound channel ({}) from {}",
+ hex_utils::hex_str(&temporary_channel_id[..]),
+ hex_utils::hex_str(&counterparty_node_id.serialize()),
+ );
+ }
+ print!("> ");
+ io::stdout().flush().unwrap();
}
Event::PaymentPathSuccessful { .. } => {}
Event::PaymentPathFailed { .. } => {}
print!("> ");
io::stdout().flush().unwrap();
- let mut payments = outbound_payments.lock().unwrap();
- if payments.contains_key(&payment_hash) {
- let payment = payments.get_mut(&payment_hash).unwrap();
+ let mut outbound = outbound_payments.lock().unwrap();
+ if outbound.payments.contains_key(&payment_hash) {
+ let payment = outbound.payments.get_mut(&payment_hash).unwrap();
payment.status = HTLCStatus::Failed;
}
+ persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound).unwrap();
}
Event::PaymentForwarded {
prev_channel_id,
});
}
Event::SpendableOutputs { outputs } => {
- let destination_address = bitcoind_client.get_new_address().await;
- let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
- let tx_feerate =
- bitcoind_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
- let spending_tx = keys_manager
- .spend_spendable_outputs(
- output_descriptors,
- Vec::new(),
- destination_address.script_pubkey(),
- tx_feerate,
- &Secp256k1::new(),
- )
- .unwrap();
- bitcoind_client.broadcast_transaction(&spending_tx);
+ // SpendableOutputDescriptors, of which outputs is a vec of, are critical to keep track
+ // of! While a `StaticOutput` descriptor is just an output to a static, well-known key,
+ // other descriptors are not currently ever regenerated for you by LDK. Once we return
+ // from this method, the descriptor will be gone, and you may lose track of some funds.
+ //
+ // Here we simply persist them to disk, with a background task running which will try
+ // to spend them regularly (possibly duplicatively/RBF'ing them). These can just be
+ // treated as normal funds where possible - they are only spendable by us and there is
+ // no rush to claim them.
+ for output in outputs {
+ let key = hex_utils::hex_str(&keys_manager.get_secure_random_bytes());
+ // Note that if the type here changes our read code needs to change as well.
+ let output: SpendableOutputDescriptor = output;
+ persister
+ .persist(&format!("{}/{}", PENDING_SPENDABLE_OUTPUT_DIR, key), &output)
+ .unwrap();
+ }
}
Event::ChannelPending { channel_id, counterparty_node_id, .. } => {
println!(
// the funding transaction either confirms, or this event is generated.
}
Event::HTLCIntercepted { .. } => {}
+ Event::BumpTransaction(event) => bump_tx_event_handler.handle_event(&event),
}
}
thread_rng().fill_bytes(&mut key);
match File::create(keys_seed_path.clone()) {
Ok(mut f) => {
- f.write_all(&key).expect("Failed to write node keys seed to disk");
+ Write::write_all(&mut f, &key).expect("Failed to write node keys seed to disk");
f.sync_all().expect("Failed to sync node keys seed to disk");
}
Err(e) => {
let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
let keys_manager = Arc::new(KeysManager::new(&keys_seed, cur.as_secs(), cur.subsec_nanos()));
+ let bump_tx_event_handler = Arc::new(BumpTransactionEventHandler::new(
+ Arc::clone(&broadcaster),
+ Arc::new(Wallet::new(Arc::clone(&bitcoind_client), Arc::clone(&logger))),
+ Arc::clone(&keys_manager),
+ Arc::clone(&logger),
+ ));
+
// Step 7: Read ChannelMonitor state from disk
let mut channelmonitors =
persister.read_channelmonitors(keys_manager.clone(), keys_manager.clone()).unwrap();
)));
// Step 10: Create Router
+ let scoring_fee_params = ProbabilisticScoringFeeParameters::default();
let router = Arc::new(DefaultRouter::new(
network_graph.clone(),
logger.clone(),
keys_manager.get_secure_random_bytes(),
scorer.clone(),
+ scoring_fee_params,
));
// Step 11: Initialize the ChannelManager
let mut user_config = UserConfig::default();
user_config.channel_handshake_limits.force_announced_channel_preference = false;
+ user_config.channel_handshake_config.negotiate_anchors_zero_fee_htlc_tx = true;
+ user_config.manually_accept_inbound_channels = true;
let mut restarting_node = true;
let (channel_manager_blockhash, channel_manager) = {
if let Ok(mut f) = fs::File::open(format!("{}/manager", ldk_data_dir.clone())) {
keys_manager.clone(),
user_config,
chain_params,
+ cur.as_secs() as u32,
);
(polled_best_block_hash, fresh_channel_manager)
}
Arc::clone(&keys_manager),
Arc::clone(&keys_manager),
Arc::clone(&logger),
+ Arc::new(DefaultMessageRouter {}),
+ IgnoringMessageHandler {},
IgnoringMessageHandler {},
));
let mut ephemeral_bytes = [0; 32];
chan_handler: channel_manager.clone(),
route_handler: gossip_sync.clone(),
onion_message_handler: onion_messenger.clone(),
+ custom_message_handler: IgnoringMessageHandler {},
};
let peer_manager: Arc<PeerManager> = Arc::new(PeerManager::new(
lightning_msg_handler,
current_time.try_into().unwrap(),
&ephemeral_bytes,
logger.clone(),
- IgnoringMessageHandler {},
Arc::clone(&keys_manager),
));
}
});
- // TODO: persist payment info to disk
- let inbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
- let outbound_payments: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
+ let inbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!(
+ "{}/{}",
+ ldk_data_dir, INBOUND_PAYMENTS_FNAME
+ )))));
+ let outbound_payments = Arc::new(Mutex::new(disk::read_payment_info(Path::new(&format!(
+ "{}/{}",
+ ldk_data_dir, OUTBOUND_PAYMENTS_FNAME
+ )))));
+ let recent_payments_payment_hashes = channel_manager
+ .list_recent_payments()
+ .into_iter()
+ .filter_map(|p| match p {
+ RecentPaymentDetails::Pending { payment_hash, .. } => Some(payment_hash),
+ RecentPaymentDetails::Fulfilled { payment_hash } => payment_hash,
+ RecentPaymentDetails::Abandoned { payment_hash } => Some(payment_hash),
+ })
+ .collect::<Vec<PaymentHash>>();
+ for (payment_hash, payment_info) in outbound_payments
+ .lock()
+ .unwrap()
+ .payments
+ .iter_mut()
+ .filter(|(_, i)| matches!(i.status, HTLCStatus::Pending))
+ {
+ if !recent_payments_payment_hashes.contains(payment_hash) {
+ payment_info.status = HTLCStatus::Failed;
+ }
+ }
+ persister.persist(OUTBOUND_PAYMENTS_FNAME, &*outbound_payments.lock().unwrap()).unwrap();
// Step 18: Handle LDK Events
let channel_manager_event_listener = Arc::clone(&channel_manager);
let keys_manager_event_listener = Arc::clone(&keys_manager);
let inbound_payments_event_listener = Arc::clone(&inbound_payments);
let outbound_payments_event_listener = Arc::clone(&outbound_payments);
+ let persister_event_listener = Arc::clone(&persister);
let network = args.network;
let event_handler = move |event: Event| {
let channel_manager_event_listener = Arc::clone(&channel_manager_event_listener);
let bitcoind_client_event_listener = Arc::clone(&bitcoind_client_event_listener);
let network_graph_event_listener = Arc::clone(&network_graph_event_listener);
let keys_manager_event_listener = Arc::clone(&keys_manager_event_listener);
+ let bump_tx_event_handler = Arc::clone(&bump_tx_event_handler);
let inbound_payments_event_listener = Arc::clone(&inbound_payments_event_listener);
let outbound_payments_event_listener = Arc::clone(&outbound_payments_event_listener);
+ let persister_event_listener = Arc::clone(&persister_event_listener);
async move {
handle_ldk_events(
&channel_manager_event_listener,
&bitcoind_client_event_listener,
&network_graph_event_listener,
&keys_manager_event_listener,
- &inbound_payments_event_listener,
- &outbound_payments_event_listener,
+ &bump_tx_event_handler,
+ inbound_payments_event_listener,
+ outbound_payments_event_listener,
+ &persister_event_listener,
network,
event,
)
// Step 20: Background Processing
let (bp_exit, bp_exit_check) = tokio::sync::watch::channel(());
- let background_processor = tokio::spawn(process_events_async(
- persister,
+ let mut background_processor = tokio::spawn(process_events_async(
+ Arc::clone(&persister),
event_handler,
chain_monitor.clone(),
channel_manager.clone(),
let stop_connect = Arc::clone(&stop_listen_connect);
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(1));
+ interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
match disk::read_channel_peer_data(Path::new(&peer_data_path)) {
loop {
interval.tick().await;
// Don't bother trying to announce if we don't have any public channls, though our
- // peers should drop such an announcement anyway.
+ // peers should drop such an announcement anyway. Note that announcement may not
+ // propagate until we have a channel with 6+ confirmations.
if chan_man.list_channels().iter().any(|chan| chan.is_public) {
peer_man.broadcast_node_announcement(
[0; 3],
}
});
- // Start the CLI.
- cli::poll_for_user_input(
- Arc::clone(&peer_manager),
- Arc::clone(&channel_manager),
- Arc::clone(&keys_manager),
- Arc::clone(&network_graph),
- Arc::clone(&onion_messenger),
- inbound_payments,
- outbound_payments,
+ tokio::spawn(sweep::periodic_sweep(
ldk_data_dir.clone(),
- network,
+ Arc::clone(&keys_manager),
Arc::clone(&logger),
- )
- .await;
+ Arc::clone(&persister),
+ Arc::clone(&bitcoind_client),
+ Arc::clone(&channel_manager),
+ ));
+
+ // Start the CLI.
+ let cli_channel_manager = Arc::clone(&channel_manager);
+ let cli_persister = Arc::clone(&persister);
+ let cli_logger = Arc::clone(&logger);
+ let cli_peer_manager = Arc::clone(&peer_manager);
+ let cli_poll = tokio::task::spawn_blocking(move || {
+ cli::poll_for_user_input(
+ cli_peer_manager,
+ cli_channel_manager,
+ keys_manager,
+ network_graph,
+ onion_messenger,
+ inbound_payments,
+ outbound_payments,
+ ldk_data_dir,
+ network,
+ cli_logger,
+ cli_persister,
+ )
+ });
+
+ // Exit if either CLI polling exits or the background processor exits (which shouldn't happen
+ // unless we fail to write to the filesystem).
+ let mut bg_res = Ok(Ok(()));
+ tokio::select! {
+ _ = cli_poll => {},
+ bg_exit = &mut background_processor => {
+ bg_res = bg_exit;
+ },
+ }
// Disconnect our peers and stop accepting new connections. This ensures we don't continue
// updating our channel data after we've stopped the background processor.
stop_listen_connect.store(true, Ordering::Release);
peer_manager.disconnect_all_peers();
+ if let Err(e) = bg_res {
+ let persist_res = persister.persist("manager", &*channel_manager).unwrap();
+ use lightning::util::logger::Logger;
+ lightning::log_error!(
+ &*logger,
+ "Last-ditch ChannelManager persistence result: {:?}",
+ persist_res
+ );
+ panic!(
+ "ERR: background processing stopped with result {:?}, exiting.\n\
+ Last-ditch ChannelManager persistence result {:?}",
+ e, persist_res
+ );
+ }
+
// Stop the background processor.
- bp_exit.send(()).unwrap();
- background_processor.await.unwrap().unwrap();
+ if !bp_exit.is_closed() {
+ bp_exit.send(()).unwrap();
+ background_processor.await.unwrap().unwrap();
+ }
}
#[tokio::main]