X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=src%2Fmain.rs;h=f740cd1d604c32e1c379ebcc2a53106b99dc4533;hb=1b034d63a4cb4e0b249ae6240166513b6be76737;hp=d8b0f23b415305d6a4c07c7889067c782ba0c1ad;hpb=a6962463ad036f472c03a67975fd94ba4350f63b;p=ldk-sample diff --git a/src/main.rs b/src/main.rs index d8b0f23..f740cd1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,60 +1,232 @@ -use base64; -use serde_json; +mod bitcoind_client; +mod cli; +mod utils; -use lightning::chain::chaininterface::{ConfirmationTarget, FeeEstimator}; +use background_processor::BackgroundProcessor; +use bitcoin::{BlockHash, Txid}; +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::blockdata::transaction::Transaction; +use bitcoin::consensus::encode; +use bitcoin::hashes::hex::FromHex; +use bitcoin::network::constants::Network; +use bitcoin::secp256k1::Secp256k1; +use bitcoin::util::address::Address; +use bitcoin_bech32::WitnessProgram; +use crate::bitcoind_client::BitcoindClient; +use lightning::chain; +use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator}; +use lightning::chain::chainmonitor::ChainMonitor; +use lightning::chain::channelmonitor::ChannelMonitor; +use lightning::chain::Filter; +use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager}; +use lightning::chain::transaction::OutPoint; +use lightning::chain::Watch; +use lightning::ln::channelmanager; +use lightning::ln::channelmanager::{ChannelManagerReadArgs, PaymentHash, PaymentPreimage, + SimpleArcChannelManager}; +use lightning::ln::peer_handler::{MessageHandler, SimpleArcPeerManager}; +use lightning::util::config::UserConfig; +use lightning::util::events::{Event, EventsProvider}; use lightning::util::logger::{Logger, Record}; +use lightning::util::ser::{ReadableArgs, Writer}; +use lightning_block_sync::UnboundedCache; +use lightning_block_sync::SpvClient; use lightning_block_sync::http::HttpEndpoint; +use lightning_block_sync::init; +use lightning_block_sync::poll; +use lightning_block_sync::poll::{ChainTip, Poll}; use lightning_block_sync::rpc::RpcClient; +use lightning_net_tokio::SocketDescriptor; +use lightning_persister::FilesystemPersister; +use rand::{thread_rng, Rng}; +use lightning::routing::network_graph::NetGraphMsgHandler; +use std::cell::RefCell; +use std::collections::HashMap; +use std::fs; +use std::fs::File; +use std::io::Cursor; +use std::path::Path; +use std::str::FromStr; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::{Duration, SystemTime}; +use time::OffsetDateTime; +use tokio::runtime::Runtime; +use tokio::sync::mpsc; -use std::sync::Mutex; +const NETWORK: Network = Network::Regtest; -pub struct BitcoindFeeEstimator { - bitcoind_rpc_client: Mutex, +pub struct FilesystemLogger{} +impl Logger for FilesystemLogger { + fn log(&self, record: &Record) { + let raw_log = record.args.to_string(); + let log = format!("{} {:<5} [{}:{}] {}", OffsetDateTime::now_utc().format("%F %T"), + record.level.to_string(), record.module_path, record.line, raw_log); + fs::create_dir_all("logs").unwrap(); + fs::OpenOptions::new().create(true).append(true).open("./logs/logs.txt").unwrap() + .write_all(log.as_bytes()).unwrap(); + } } -impl BitcoindFeeEstimator { - fn new(host: String, port: u16, path: Option, rpc_user: String, rpc_password: String) -> - std::io::Result - { - let mut http_endpoint = HttpEndpoint::for_host(host).with_port(port); - if let Some(p) = path { - http_endpoint = http_endpoint.with_path(p); +fn read_channelmonitors_from_disk(path: String, keys_manager: Arc) -> + Result)>, std::io::Error> +{ + if !Path::new(&path).exists() { + return Ok(HashMap::new()) + } + let mut outpoint_to_channelmonitor = HashMap::new(); + for file_option in fs::read_dir(path).unwrap() { + let file = file_option.unwrap(); + let owned_file_name = file.file_name(); + let filename = owned_file_name.to_str(); + if !filename.is_some() || !filename.unwrap().is_ascii() || filename.unwrap().len() < 65 { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid ChannelMonitor file name")); + } + + let txid = Txid::from_hex(filename.unwrap().split_at(64).0); + if txid.is_err() { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid tx ID in filename")); } - let rpc_credentials = base64::encode(format!("{}:{}", rpc_user, rpc_password)); - let bitcoind_rpc_client = RpcClient::new(&rpc_credentials, http_endpoint)?; - Ok(Self { - bitcoind_rpc_client: Mutex::new(bitcoind_rpc_client) - }) + + let index = filename.unwrap().split_at(65).1.split('.').next().unwrap().parse(); + if index.is_err() { + return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid tx index in filename")); + } + + let contents = fs::read(&file.path())?; + + if let Ok((blockhash, channel_monitor)) = + <(BlockHash, ChannelMonitor)>::read(&mut Cursor::new(&contents), + &*keys_manager) + { + outpoint_to_channelmonitor.insert(OutPoint { txid: txid.unwrap(), index: index.unwrap() }, + (blockhash, channel_monitor)); + } else { + return Err(std::io::Error::new(std::io::ErrorKind::Other, + "Failed to deserialize ChannelMonitor")); + } } + Ok(outpoint_to_channelmonitor) +} + +type Invoice = String; + +enum HTLCDirection { + Inbound, + Outbound } -impl FeeEstimator for BitcoindFeeEstimator { - fn get_est_sat_per_1000_weight(&self, confirmation_target: ConfirmationTarget) -> u32 { - let mut rpc_client_guard = self.bitcoind_rpc_client.lock().unwrap(); - match confirmation_target { - ConfirmationTarget::Background => { - let conf_target = serde_json::json!(144); - let estimate_mode = serde_json::json!("ECONOMICAL"); - let resp = rpc_client_guard.call_method("estimatesmartfee", - &vec![conf_target, estimate_mode]).unwrap(); - resp["feerate"].as_u64().unwrap() as u32 - }, - ConfirmationTarget::Normal => { - let conf_target = serde_json::json!(18); - let estimate_mode = serde_json::json!("ECONOMICAL"); - let resp = rpc_client_guard.call_method("estimatesmartfee", - &vec![conf_target, estimate_mode]).unwrap(); - resp["feerate"].as_u64().unwrap() as u32 - }, - ConfirmationTarget::HighPriority => { - let conf_target = serde_json::json!(6); - let estimate_mode = serde_json::json!("CONSERVATIVE"); - let resp = rpc_client_guard.call_method("estimatesmartfee", - &vec![conf_target, estimate_mode]).unwrap(); - resp["feerate"].as_u64().unwrap() as u32 - }, +type PaymentInfoStorage = Arc, HTLCDirection)>>>; + +type ArcChainMonitor = ChainMonitor, Arc, +Arc, Arc, Arc>; + +pub(crate) type PeerManager = SimpleArcPeerManager; + +pub(crate) type ChannelManager = SimpleArcChannelManager; + + +fn handle_ldk_events(peer_manager: Arc, channel_manager: Arc, + chain_monitor: Arc, bitcoind_rpc_client: Arc, + keys_manager: Arc, mut pending_txs: HashMap, + htlcs: PaymentInfoStorage) -> HashMap +{ + peer_manager.process_events(); + let mut check_for_more_events = true; + while check_for_more_events { + let loop_channel_manager = channel_manager.clone(); + check_for_more_events = false; + let mut events = channel_manager.get_and_clear_pending_events(); + events.append(&mut chain_monitor.get_and_clear_pending_events()); + let mut rpc = bitcoind_rpc_client.bitcoind_rpc_client.lock().unwrap(); + for event in events { + match event { + Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, + output_script, .. } => { + let addr = WitnessProgram::from_scriptpubkey(&output_script[..], match NETWORK { + Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin, + Network::Testnet => bitcoin_bech32::constants::Network::Testnet, + Network::Regtest => bitcoin_bech32::constants::Network::Regtest, + } + ).expect("Lightning funding tx should always be to a SegWit output").to_address(); + let outputs = format!("{{\"{}\": {}}}", addr, channel_value_satoshis as f64 / 1_000_000_00.0).to_string(); + let tx_hex = rpc.call_method("createrawtransaction", &vec![serde_json::json!(outputs)]).unwrap(); + let raw_tx = format!("\"{}\"", tx_hex.as_str().unwrap()).to_string(); + let funded_tx = rpc.call_method("fundrawtransaction", &vec![serde_json::json!(raw_tx)]).unwrap(); + let change_output_position = funded_tx["changepos"].as_i64().unwrap(); + assert!(change_output_position == 0 || change_output_position == 1); + let funded_tx = format!("\"{}\"", funded_tx["hex"].as_str().unwrap()).to_string(); + let signed_tx = rpc.call_method("signrawtransactionwithwallet", + &vec![serde_json::json!(funded_tx)]).unwrap(); + assert_eq!(signed_tx["complete"].as_bool().unwrap(), true); + let final_tx: Transaction = encode::deserialize(&utils::hex_to_vec(&signed_tx["hex"].as_str().unwrap()).unwrap()).unwrap(); + let outpoint = OutPoint { + txid: final_tx.txid(), + index: if change_output_position == 0 { 1 } else { 0 } + }; + loop_channel_manager.funding_transaction_generated(&temporary_channel_id, outpoint); + pending_txs.insert(outpoint, final_tx); + check_for_more_events = true; + }, + Event::FundingBroadcastSafe { funding_txo, .. } => { + let funding_tx = pending_txs.remove(&funding_txo).unwrap(); + bitcoind_rpc_client.broadcast_transaction(&funding_tx); + }, + Event::PaymentReceived { payment_hash, payment_secret, amt: amt_msat } => { + let payment_info = htlcs.lock().unwrap(); + if let Some(htlc_info) = payment_info.get(&payment_hash) { + assert!(loop_channel_manager.claim_funds(htlc_info.1.unwrap().clone(), + &payment_secret, amt_msat)); + } else { + loop_channel_manager.fail_htlc_backwards(&payment_hash, &payment_secret); + } + check_for_more_events = true; + }, + Event::PaymentSent { payment_preimage } => { + let payment_info = htlcs.lock().unwrap(); + for (invoice, preimage_option, _) in payment_info.values() { + if let Some(preimage) = preimage_option { + if payment_preimage == *preimage { + println!("NEW EVENT: successfully sent payment from invoice {} with preimage {}", + invoice, utils::hex_str(&payment_preimage.0)); + } + } + } + }, + Event::PaymentFailed { payment_hash, rejected_by_dest } => { + let payment_info = htlcs.lock().unwrap(); + let htlc_info = payment_info.get(&payment_hash).unwrap(); + print!("NEW EVENT: Failed to send payment to invoice {}:", htlc_info.0); + if rejected_by_dest { + println!("rejected by destination node"); + } else { + println!("route failed"); + } + }, + Event::PendingHTLCsForwardable { .. } => { + loop_channel_manager.process_pending_htlc_forwards(); + check_for_more_events = true; + }, + Event::SpendableOutputs { outputs } => { + let addr_args = vec![serde_json::json!("LDK output address")]; + let destination_address_str = rpc.call_method("getnewaddress", &addr_args).unwrap(); + let destination_address = Address::from_str(destination_address_str.as_str().unwrap()).unwrap(); + let output_descriptors = &outputs.iter().map(|a| a).collect::>(); + let tx_feerate = bitcoind_rpc_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal); + let spending_tx = keys_manager.spend_spendable_outputs(output_descriptors, + Vec::new(), + destination_address.script_pubkey(), + tx_feerate, &Secp256k1::new()).unwrap(); + bitcoind_rpc_client.broadcast_transaction(&spending_tx); + // XXX maybe need to rescan and blah? but contrary to what matt's saying, it + // looks like spend_spendable's got us covered + } + } } } + pending_txs } fn main() { @@ -62,7 +234,191 @@ fn main() { let bitcoind_port = 18443; let rpc_user = "polaruser".to_string(); let rpc_password = "polarpass".to_string(); - let fee_estimator = BitcoindFeeEstimator::new(bitcoind_host, bitcoind_port, None, rpc_user, rpc_password).unwrap(); - let normal_fee = fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::Normal); - println!("VMW: {}", normal_fee); + let bitcoind_client = Arc::new(BitcoindClient::new(bitcoind_host.clone(), bitcoind_port, None, + rpc_user.clone(), rpc_password.clone()).unwrap()); + + // ## Setup + // Step 1: Initialize the FeeEstimator + let fee_estimator = bitcoind_client.clone(); + + // Step 2: Initialize the Logger + let logger = Arc::new(FilesystemLogger{}); + + // Step 3: Initialize the BroadcasterInterface + let broadcaster = bitcoind_client.clone(); + + // Step 4: Initialize Persist + let persister = Arc::new(FilesystemPersister::new(".".to_string())); + + // Step 5: Initialize the ChainMonitor + let chain_monitor: Arc = Arc::new(ChainMonitor::new(None, broadcaster.clone(), + logger.clone(), fee_estimator.clone(), + persister.clone())); + + // Step 6: Initialize the KeysManager + let node_privkey = if let Ok(seed) = fs::read("./key_seed") { // the private key that corresponds + assert_eq!(seed.len(), 32); // to our lightning node's pubkey + let mut key = [0; 32]; + key.copy_from_slice(&seed); + key + } else { + let mut key = [0; 32]; + thread_rng().fill_bytes(&mut key); + let mut f = File::create("./key_seed").unwrap(); + f.write_all(&key).expect("Failed to write seed to disk"); + f.sync_all().expect("Failed to sync seed to disk"); + key + }; + let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap(); + let keys_manager = Arc::new(KeysManager::new(&node_privkey, cur.as_secs(), cur.subsec_nanos())); + + // Step 7: Read ChannelMonitor state from disk + let mut outpoint_to_channelmonitor = read_channelmonitors_from_disk("./monitors".to_string(), + keys_manager.clone()).unwrap(); + + // Step 9: Read ChannelManager state from disk + let user_config = UserConfig::default(); + let mut channel_manager: ChannelManager; + let mut channel_manager_last_blockhash: Option = None; + if let Ok(mut f) = fs::File::open("./manager") { + let (last_block_hash_option, channel_manager_from_disk) = { + let mut channel_monitor_mut_references = Vec::new(); + for (_, channel_monitor) in outpoint_to_channelmonitor.iter_mut() { + channel_monitor_mut_references.push(&mut channel_monitor.1); + } + let read_args = ChannelManagerReadArgs::new(keys_manager.clone(), fee_estimator.clone(), + chain_monitor.clone(), broadcaster.clone(), + logger.clone(), user_config, + channel_monitor_mut_references); + <(Option, ChannelManager)>::read(&mut f, read_args).unwrap() + }; + channel_manager = channel_manager_from_disk; + channel_manager_last_blockhash = last_block_hash_option; + } else { + let mut bitcoind_rpc_client = bitcoind_client.bitcoind_rpc_client.lock().unwrap(); + let current_chain_height: usize = bitcoind_rpc_client + .call_method("getblockchaininfo", &vec![]).unwrap()["blocks"].as_u64().unwrap() as usize; + channel_manager = channelmanager::ChannelManager::new(Network::Regtest, fee_estimator.clone(), + chain_monitor.clone(), broadcaster.clone(), + logger.clone(), keys_manager.clone(), + user_config, current_chain_height); + } + + // Step 10: Sync ChannelMonitors to chain tip if restarting + let mut chain_tip = None; + let mut chain_listener_channel_monitors = Vec::new(); + let mut cache = UnboundedCache::new(); + let rpc_credentials = base64::encode(format!("{}:{}", rpc_user, rpc_password)); + let mut block_source = RpcClient::new(&rpc_credentials, HttpEndpoint::for_host(bitcoind_host) + .with_port(bitcoind_port)).unwrap(); + let runtime = Runtime::new().expect("Unable to create a runtime"); + if outpoint_to_channelmonitor.len() > 0 { + for (outpoint, blockhash_and_monitor) in outpoint_to_channelmonitor.drain() { + let blockhash = blockhash_and_monitor.0; + let channel_monitor = blockhash_and_monitor.1; + chain_listener_channel_monitors.push((blockhash, (RefCell::new(channel_monitor), + broadcaster.clone(), fee_estimator.clone(), + logger.clone()), outpoint)); + } + + let mut chain_listeners = Vec::new(); + for monitor_listener_info in chain_listener_channel_monitors.iter_mut() { + chain_listeners.push((monitor_listener_info.0, + &mut monitor_listener_info.1 as &mut dyn chain::Listen)); + } + // Because `sync_listeners` is an async function and we want to run it synchronously, + // we run it in a tokio Runtime. + chain_tip = Some(runtime.block_on(init::sync_listeners(&mut block_source, Network::Regtest, + &mut cache, chain_listeners)).unwrap()); + } + + // Step 11: Give ChannelMonitors to ChainMonitor + if chain_listener_channel_monitors.len() > 0 { + for item in chain_listener_channel_monitors.drain(..) { + let channel_monitor = item.1.0.into_inner(); + let funding_outpoint = item.2; + chain_monitor.watch_channel(funding_outpoint, channel_monitor).unwrap(); + } + } + + // Step 12: Sync ChannelManager to chain tip if restarting + if let Some(channel_manager_blockhash) = channel_manager_last_blockhash { + let chain_listener = vec![ + (channel_manager_blockhash, &mut channel_manager as &mut dyn chain::Listen)]; + chain_tip = Some(runtime.block_on(init::sync_listeners(&mut block_source, Network::Regtest, + &mut cache, chain_listener)).unwrap()); + } + + // Step 13: Optional: Initialize the NetGraphMsgHandler + // XXX persist routing data + let genesis = genesis_block(Network::Regtest).header.block_hash(); + let router = Arc::new(NetGraphMsgHandler::new(genesis, None::>, logger.clone())); + + // Step 14: Initialize the PeerManager + let channel_manager = Arc::new(channel_manager); + let mut ephemeral_bytes = [0; 32]; + rand::thread_rng().fill_bytes(&mut ephemeral_bytes); + let lightning_msg_handler = MessageHandler { chan_handler: channel_manager.clone(), + route_handler: router.clone() }; + let peer_manager: Arc = Arc::new(PeerManager::new(lightning_msg_handler, + keys_manager.get_node_secret(), + &ephemeral_bytes, logger.clone())); + + // ## Running LDK + // Step 15: Initialize LDK Event Handling + let (event_ntfn_sender, mut event_ntfn_receiver) = mpsc::channel(2); + let peer_manager_event_listener = peer_manager.clone(); + let channel_manager_event_listener = channel_manager.clone(); + let chain_monitor_event_listener = chain_monitor.clone(); + let payment_info: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new())); + let payment_info_for_events = payment_info.clone(); + thread::spawn(move || async move { + let mut pending_txs = HashMap::new(); + loop { + event_ntfn_receiver.recv().await.unwrap(); + pending_txs = handle_ldk_events(peer_manager_event_listener.clone(), + channel_manager_event_listener.clone(), + chain_monitor_event_listener.clone(), + bitcoind_client.clone(), keys_manager.clone(), + pending_txs, payment_info_for_events.clone()); + } + }); + + // Step 16: Initialize Peer Connection Handling + let peer_manager_connection_handler = peer_manager.clone(); + let event_notifier = event_ntfn_sender.clone(); + thread::spawn(move || async move { + let listener = std::net::TcpListener::bind("0.0.0.0:9735").unwrap(); + loop { + let tcp_stream = listener.accept().unwrap().0; + lightning_net_tokio::setup_inbound(peer_manager_connection_handler.clone(), + event_notifier.clone(), tcp_stream).await; + } + }); + + // Step 17: Connect and Disconnect Blocks + let mut chain_poller = poll::ChainPoller::new(&mut block_source, Network::Regtest); + if chain_tip.is_none() { + match runtime.block_on(chain_poller.poll_chain_tip(None)).unwrap() { + ChainTip::Better(header) => chain_tip = Some(header), + _ => panic!("Unexpected chain tip") + } + } + let chain_listener = (chain_monitor.clone(), channel_manager.clone()); + let _spv_client = SpvClient::new(chain_tip.unwrap(), chain_poller, &mut cache, &chain_listener); + + // Step 17 & 18: Initialize ChannelManager persistence & Once Per Minute: ChannelManager's + // timer_chan_freshness_every_min() and PeerManager's timer_tick_occurred + let persist_channel_manager_callback = move |node: &ChannelManager| { + FilesystemPersister::persist_manager("./".to_string(), &*node) + }; + BackgroundProcessor::start(persist_channel_manager_callback, channel_manager.clone(), logger.clone()); + let peer_manager_processor = peer_manager.clone(); + thread::spawn(move || { + loop { + peer_manager_processor.timer_tick_occured(); + thread::sleep(Duration::new(60, 0)); + } + }); + cli::poll_for_user_input(peer_manager.clone(), channel_manager.clone(), event_ntfn_sender.clone()); }