f740cd1d604c32e1c379ebcc2a53106b99dc4533
[ldk-sample] / src / main.rs
1 mod bitcoind_client;
2 mod cli;
3 mod utils;
4
5 use background_processor::BackgroundProcessor;
6 use bitcoin::{BlockHash, Txid};
7 use bitcoin::blockdata::constants::genesis_block;
8 use bitcoin::blockdata::transaction::Transaction;
9 use bitcoin::consensus::encode;
10 use bitcoin::hashes::hex::FromHex;
11 use bitcoin::network::constants::Network;
12 use bitcoin::secp256k1::Secp256k1;
13 use bitcoin::util::address::Address;
14 use bitcoin_bech32::WitnessProgram;
15 use crate::bitcoind_client::BitcoindClient;
16 use lightning::chain;
17 use lightning::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
18 use lightning::chain::chainmonitor::ChainMonitor;
19 use lightning::chain::channelmonitor::ChannelMonitor;
20 use lightning::chain::Filter;
21 use lightning::chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager};
22 use lightning::chain::transaction::OutPoint;
23 use lightning::chain::Watch;
24 use lightning::ln::channelmanager;
25 use lightning::ln::channelmanager::{ChannelManagerReadArgs, PaymentHash, PaymentPreimage,
26                                     SimpleArcChannelManager};
27 use lightning::ln::peer_handler::{MessageHandler, SimpleArcPeerManager};
28 use lightning::util::config::UserConfig;
29 use lightning::util::events::{Event, EventsProvider};
30 use lightning::util::logger::{Logger, Record};
31 use lightning::util::ser::{ReadableArgs, Writer};
32 use lightning_block_sync::UnboundedCache;
33 use lightning_block_sync::SpvClient;
34 use lightning_block_sync::http::HttpEndpoint;
35 use lightning_block_sync::init;
36 use lightning_block_sync::poll;
37 use lightning_block_sync::poll::{ChainTip, Poll};
38 use lightning_block_sync::rpc::RpcClient;
39 use lightning_net_tokio::SocketDescriptor;
40 use lightning_persister::FilesystemPersister;
41 use rand::{thread_rng, Rng};
42 use lightning::routing::network_graph::NetGraphMsgHandler;
43 use std::cell::RefCell;
44 use std::collections::HashMap;
45 use std::fs;
46 use std::fs::File;
47 use std::io::Cursor;
48 use std::path::Path;
49 use std::str::FromStr;
50 use std::sync::{Arc, Mutex};
51 use std::thread;
52 use std::time::{Duration, SystemTime};
53 use time::OffsetDateTime;
54 use tokio::runtime::Runtime;
55 use tokio::sync::mpsc;
56
57 const NETWORK: Network = Network::Regtest;
58
59 pub struct FilesystemLogger{}
60 impl Logger for FilesystemLogger {
61           fn log(&self, record: &Record) {
62                     let raw_log = record.args.to_string();
63                           let log = format!("{} {:<5} [{}:{}] {}", OffsetDateTime::now_utc().format("%F %T"),
64                   record.level.to_string(), record.module_path, record.line, raw_log);
65         fs::create_dir_all("logs").unwrap();
66         fs::OpenOptions::new().create(true).append(true).open("./logs/logs.txt").unwrap()
67             .write_all(log.as_bytes()).unwrap();
68           }
69 }
70
71 fn read_channelmonitors_from_disk(path: String, keys_manager: Arc<KeysManager>) ->
72     Result<HashMap<OutPoint, (BlockHash, ChannelMonitor<InMemorySigner>)>, std::io::Error>
73 {
74     if !Path::new(&path).exists() {
75         return Ok(HashMap::new())
76     }
77     let mut outpoint_to_channelmonitor = HashMap::new();
78     for file_option in fs::read_dir(path).unwrap() {
79         let file = file_option.unwrap();
80         let owned_file_name = file.file_name();
81         let filename = owned_file_name.to_str();
82         if !filename.is_some() || !filename.unwrap().is_ascii() || filename.unwrap().len() < 65 {
83             return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid ChannelMonitor file name"));
84         }
85
86         let txid = Txid::from_hex(filename.unwrap().split_at(64).0);
87         if txid.is_err() {
88             return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid tx ID in filename"));
89         }
90
91         let index = filename.unwrap().split_at(65).1.split('.').next().unwrap().parse();
92         if index.is_err() {
93             return Err(std::io::Error::new(std::io::ErrorKind::Other, "Invalid tx index in filename"));
94         }
95
96         let contents = fs::read(&file.path())?;
97
98         if let Ok((blockhash, channel_monitor)) =
99             <(BlockHash, ChannelMonitor<InMemorySigner>)>::read(&mut Cursor::new(&contents),
100                                                         &*keys_manager)
101         {
102                 outpoint_to_channelmonitor.insert(OutPoint { txid: txid.unwrap(), index: index.unwrap() },
103                                                   (blockhash, channel_monitor));
104             } else {
105                 return Err(std::io::Error::new(std::io::ErrorKind::Other,
106                                            "Failed to deserialize ChannelMonitor"));
107             }
108     }
109     Ok(outpoint_to_channelmonitor)
110 }
111
112 type Invoice = String;
113
114 enum HTLCDirection {
115     Inbound,
116     Outbound
117 }
118
119 type PaymentInfoStorage = Arc<Mutex<HashMap<PaymentHash, (Invoice, Option<PaymentPreimage>, HTLCDirection)>>>;
120
121 type ArcChainMonitor = ChainMonitor<InMemorySigner, Arc<dyn Filter>, Arc<BitcoindClient>,
122 Arc<BitcoindClient>, Arc<FilesystemLogger>, Arc<FilesystemPersister>>;
123
124 pub(crate) type PeerManager = SimpleArcPeerManager<SocketDescriptor, ArcChainMonitor, BitcoindClient,
125 BitcoindClient, dyn chain::Access, FilesystemLogger>;
126
127 pub(crate) type ChannelManager = SimpleArcChannelManager<ArcChainMonitor, BitcoindClient, BitcoindClient,
128 FilesystemLogger>;
129
130
131 fn handle_ldk_events(peer_manager: Arc<PeerManager>, channel_manager: Arc<ChannelManager>,
132                      chain_monitor: Arc<ArcChainMonitor>, bitcoind_rpc_client: Arc<BitcoindClient>,
133                      keys_manager: Arc<KeysManager>, mut pending_txs: HashMap<OutPoint, Transaction>,
134                      htlcs: PaymentInfoStorage) -> HashMap<OutPoint, Transaction>
135 {
136     peer_manager.process_events();
137     let mut check_for_more_events = true;
138     while check_for_more_events {
139         let loop_channel_manager = channel_manager.clone();
140         check_for_more_events = false;
141         let mut events = channel_manager.get_and_clear_pending_events();
142                     events.append(&mut chain_monitor.get_and_clear_pending_events());
143         let mut rpc = bitcoind_rpc_client.bitcoind_rpc_client.lock().unwrap();
144         for event in events {
145                               match event {
146                                         Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis,
147                                                 output_script, .. } => {
148                                                   let addr = WitnessProgram::from_scriptpubkey(&output_script[..], match NETWORK {
149                                                                   Network::Bitcoin => bitcoin_bech32::constants::Network::Bitcoin,
150                                                                   Network::Testnet => bitcoin_bech32::constants::Network::Testnet,
151                                                                   Network::Regtest => bitcoin_bech32::constants::Network::Regtest,
152                                                         }
153                                                   ).expect("Lightning funding tx should always be to a SegWit output").to_address();
154                                                   let outputs = format!("{{\"{}\": {}}}", addr, channel_value_satoshis as f64 / 1_000_000_00.0).to_string();
155                     let tx_hex = rpc.call_method("createrawtransaction", &vec![serde_json::json!(outputs)]).unwrap();
156                     let raw_tx = format!("\"{}\"", tx_hex.as_str().unwrap()).to_string();
157                     let funded_tx = rpc.call_method("fundrawtransaction", &vec![serde_json::json!(raw_tx)]).unwrap();
158                     let change_output_position = funded_tx["changepos"].as_i64().unwrap();
159                                                               assert!(change_output_position == 0 || change_output_position == 1);
160                                                               let funded_tx = format!("\"{}\"", funded_tx["hex"].as_str().unwrap()).to_string();
161                     let signed_tx = rpc.call_method("signrawtransactionwithwallet",
162                                                     &vec![serde_json::json!(funded_tx)]).unwrap();
163                                                                     assert_eq!(signed_tx["complete"].as_bool().unwrap(), true);
164                     let final_tx: Transaction = encode::deserialize(&utils::hex_to_vec(&signed_tx["hex"].as_str().unwrap()).unwrap()).unwrap();
165                                                                     let outpoint = OutPoint {
166                         txid: final_tx.txid(),
167                         index: if change_output_position == 0 { 1 } else { 0 }
168                     };
169                     loop_channel_manager.funding_transaction_generated(&temporary_channel_id, outpoint);
170                     pending_txs.insert(outpoint, final_tx);
171                     check_for_more_events = true;
172                                         },
173                                         Event::FundingBroadcastSafe { funding_txo, .. } => {
174                     let funding_tx = pending_txs.remove(&funding_txo).unwrap();
175                     bitcoind_rpc_client.broadcast_transaction(&funding_tx);
176                                         },
177                                         Event::PaymentReceived { payment_hash, payment_secret, amt: amt_msat } => {
178                     let payment_info = htlcs.lock().unwrap();
179                     if let Some(htlc_info) = payment_info.get(&payment_hash) {
180                                                             assert!(loop_channel_manager.claim_funds(htlc_info.1.unwrap().clone(),
181                                                                  &payment_secret, amt_msat));
182                     } else {
183                         loop_channel_manager.fail_htlc_backwards(&payment_hash, &payment_secret);
184                     }
185                     check_for_more_events = true;
186                                         },
187                                         Event::PaymentSent { payment_preimage } => {
188                     let payment_info = htlcs.lock().unwrap();
189                     for (invoice, preimage_option, _) in payment_info.values() {
190                         if let Some(preimage) = preimage_option {
191                             if payment_preimage == *preimage {
192                                 println!("NEW EVENT: successfully sent payment from invoice {} with preimage {}",
193                                          invoice, utils::hex_str(&payment_preimage.0));
194                             }
195                         }
196                     }
197                                         },
198                                         Event::PaymentFailed { payment_hash, rejected_by_dest } => {
199                     let payment_info = htlcs.lock().unwrap();
200                     let htlc_info = payment_info.get(&payment_hash).unwrap();
201                     print!("NEW EVENT: Failed to send payment to invoice {}:", htlc_info.0);
202                     if rejected_by_dest {
203                         println!("rejected by destination node");
204                     } else {
205                         println!("route failed");
206                     }
207                                         },
208                                         Event::PendingHTLCsForwardable { .. } => {
209                     loop_channel_manager.process_pending_htlc_forwards();
210                     check_for_more_events = true;
211                                         },
212                 Event::SpendableOutputs { outputs } => {
213                     let addr_args = vec![serde_json::json!("LDK output address")];
214                     let destination_address_str = rpc.call_method("getnewaddress", &addr_args).unwrap();
215                     let destination_address = Address::from_str(destination_address_str.as_str().unwrap()).unwrap();
216                     let output_descriptors = &outputs.iter().map(|a| a).collect::<Vec<_>>();
217                     let tx_feerate = bitcoind_rpc_client.get_est_sat_per_1000_weight(ConfirmationTarget::Normal);
218                     let spending_tx = keys_manager.spend_spendable_outputs(output_descriptors,
219                                                                            Vec::new(),
220                                                                            destination_address.script_pubkey(),
221                                                                            tx_feerate, &Secp256k1::new()).unwrap();
222                     bitcoind_rpc_client.broadcast_transaction(&spending_tx);
223                     // XXX maybe need to rescan and blah? but contrary to what matt's saying, it
224                     // looks like spend_spendable's got us covered
225                 }
226             }
227         }
228     }
229     pending_txs
230 }
231
232 fn main() {
233     let bitcoind_host = "127.0.0.1".to_string();
234     let bitcoind_port = 18443;
235     let rpc_user = "polaruser".to_string();
236     let rpc_password = "polarpass".to_string();
237     let bitcoind_client = Arc::new(BitcoindClient::new(bitcoind_host.clone(), bitcoind_port, None,
238                                                        rpc_user.clone(), rpc_password.clone()).unwrap());
239
240     // ## Setup
241     // Step 1: Initialize the FeeEstimator
242     let fee_estimator = bitcoind_client.clone();
243
244     // Step 2: Initialize the Logger
245     let logger = Arc::new(FilesystemLogger{});
246
247     // Step 3: Initialize the BroadcasterInterface
248     let broadcaster = bitcoind_client.clone();
249
250     // Step 4: Initialize Persist
251     let persister = Arc::new(FilesystemPersister::new(".".to_string()));
252
253     // Step 5: Initialize the ChainMonitor
254     let chain_monitor: Arc<ArcChainMonitor> = Arc::new(ChainMonitor::new(None, broadcaster.clone(),
255                                                            logger.clone(), fee_estimator.clone(),
256                                                            persister.clone()));
257
258     // Step 6: Initialize the KeysManager
259           let node_privkey = if let Ok(seed) = fs::read("./key_seed") { // the private key that corresponds
260                     assert_eq!(seed.len(), 32);                               // to our lightning node's pubkey
261                     let mut key = [0; 32];
262                     key.copy_from_slice(&seed);
263                     key
264           } else {
265                     let mut key = [0; 32];
266                     thread_rng().fill_bytes(&mut key);
267                     let mut f = File::create("./key_seed").unwrap();
268                     f.write_all(&key).expect("Failed to write seed to disk");
269                     f.sync_all().expect("Failed to sync seed to disk");
270                     key
271           };
272           let cur = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
273     let keys_manager = Arc::new(KeysManager::new(&node_privkey, cur.as_secs(), cur.subsec_nanos()));
274
275     // Step 7: Read ChannelMonitor state from disk
276     let mut outpoint_to_channelmonitor = read_channelmonitors_from_disk("./monitors".to_string(),
277                                                                     keys_manager.clone()).unwrap();
278
279     // Step 9: Read ChannelManager state from disk
280     let user_config = UserConfig::default();
281     let mut channel_manager: ChannelManager;
282     let mut channel_manager_last_blockhash: Option<BlockHash> = None;
283     if let Ok(mut f) = fs::File::open("./manager") {
284         let (last_block_hash_option, channel_manager_from_disk) = {
285             let mut channel_monitor_mut_references = Vec::new();
286             for (_, channel_monitor) in outpoint_to_channelmonitor.iter_mut() {
287                 channel_monitor_mut_references.push(&mut channel_monitor.1);
288             }
289             let read_args = ChannelManagerReadArgs::new(keys_manager.clone(), fee_estimator.clone(),
290                                                         chain_monitor.clone(), broadcaster.clone(),
291                                                         logger.clone(), user_config,
292                                                         channel_monitor_mut_references);
293             <(Option<BlockHash>, ChannelManager)>::read(&mut f, read_args).unwrap()
294         };
295         channel_manager = channel_manager_from_disk;
296         channel_manager_last_blockhash = last_block_hash_option;
297     } else {
298         let mut bitcoind_rpc_client = bitcoind_client.bitcoind_rpc_client.lock().unwrap();
299         let current_chain_height: usize = bitcoind_rpc_client
300             .call_method("getblockchaininfo", &vec![]).unwrap()["blocks"].as_u64().unwrap() as usize;
301         channel_manager = channelmanager::ChannelManager::new(Network::Regtest, fee_estimator.clone(),
302                                                        chain_monitor.clone(), broadcaster.clone(),
303                                                        logger.clone(), keys_manager.clone(),
304                                                        user_config, current_chain_height);
305     }
306
307     // Step 10: Sync ChannelMonitors to chain tip if restarting
308     let mut chain_tip = None;
309     let mut chain_listener_channel_monitors = Vec::new();
310     let mut cache = UnboundedCache::new();
311     let rpc_credentials = base64::encode(format!("{}:{}", rpc_user, rpc_password));
312     let mut block_source = RpcClient::new(&rpc_credentials, HttpEndpoint::for_host(bitcoind_host)
313                                           .with_port(bitcoind_port)).unwrap();
314                 let runtime = Runtime::new().expect("Unable to create a runtime");
315     if outpoint_to_channelmonitor.len() > 0 {
316         for (outpoint, blockhash_and_monitor) in outpoint_to_channelmonitor.drain() {
317             let blockhash = blockhash_and_monitor.0;
318             let channel_monitor = blockhash_and_monitor.1;
319             chain_listener_channel_monitors.push((blockhash, (RefCell::new(channel_monitor),
320                                                               broadcaster.clone(), fee_estimator.clone(),
321                                                               logger.clone()), outpoint));
322         }
323
324         let mut chain_listeners = Vec::new();
325         for monitor_listener_info in chain_listener_channel_monitors.iter_mut() {
326             chain_listeners.push((monitor_listener_info.0,
327                                   &mut monitor_listener_info.1 as &mut dyn chain::Listen));
328         }
329         // Because `sync_listeners` is an async function and we want to run it synchronously,
330         // we run it in a tokio Runtime.
331         chain_tip = Some(runtime.block_on(init::sync_listeners(&mut block_source, Network::Regtest,
332                                                                &mut cache, chain_listeners)).unwrap());
333     }
334
335     // Step 11: Give ChannelMonitors to ChainMonitor
336     if chain_listener_channel_monitors.len() > 0 {
337         for item in chain_listener_channel_monitors.drain(..) {
338             let channel_monitor = item.1.0.into_inner();
339             let funding_outpoint = item.2;
340             chain_monitor.watch_channel(funding_outpoint, channel_monitor).unwrap();
341         }
342     }
343
344     // Step 12: Sync ChannelManager to chain tip if restarting
345     if let Some(channel_manager_blockhash) = channel_manager_last_blockhash {
346         let chain_listener = vec![
347             (channel_manager_blockhash, &mut channel_manager as &mut dyn chain::Listen)];
348         chain_tip = Some(runtime.block_on(init::sync_listeners(&mut block_source, Network::Regtest,
349                                                                &mut cache, chain_listener)).unwrap());
350     }
351
352     // Step 13: Optional: Initialize the NetGraphMsgHandler
353     // XXX persist routing data
354     let genesis = genesis_block(Network::Regtest).header.block_hash();
355     let router = Arc::new(NetGraphMsgHandler::new(genesis, None::<Arc<dyn chain::Access>>, logger.clone()));
356
357     // Step 14: Initialize the PeerManager
358     let channel_manager = Arc::new(channel_manager);
359           let mut ephemeral_bytes = [0; 32];
360           rand::thread_rng().fill_bytes(&mut ephemeral_bytes);
361     let lightning_msg_handler = MessageHandler { chan_handler: channel_manager.clone(),
362                                                  route_handler: router.clone() };
363     let peer_manager: Arc<PeerManager> = Arc::new(PeerManager::new(lightning_msg_handler,
364                                                         keys_manager.get_node_secret(),
365                                                         &ephemeral_bytes, logger.clone()));
366
367     // ## Running LDK
368     // Step 15: Initialize LDK Event Handling
369     let (event_ntfn_sender, mut event_ntfn_receiver) = mpsc::channel(2);
370     let peer_manager_event_listener = peer_manager.clone();
371     let channel_manager_event_listener = channel_manager.clone();
372     let chain_monitor_event_listener = chain_monitor.clone();
373     let payment_info: PaymentInfoStorage = Arc::new(Mutex::new(HashMap::new()));
374     let payment_info_for_events = payment_info.clone();
375     thread::spawn(move || async move {
376         let mut pending_txs = HashMap::new();
377         loop {
378             event_ntfn_receiver.recv().await.unwrap();
379             pending_txs = handle_ldk_events(peer_manager_event_listener.clone(),
380                                             channel_manager_event_listener.clone(),
381                                             chain_monitor_event_listener.clone(),
382                                             bitcoind_client.clone(), keys_manager.clone(),
383                                             pending_txs, payment_info_for_events.clone());
384         }
385     });
386
387     // Step 16: Initialize Peer Connection Handling
388     let peer_manager_connection_handler = peer_manager.clone();
389     let event_notifier = event_ntfn_sender.clone();
390     thread::spawn(move || async move {
391               let listener = std::net::TcpListener::bind("0.0.0.0:9735").unwrap();
392         loop {
393             let tcp_stream = listener.accept().unwrap().0;
394             lightning_net_tokio::setup_inbound(peer_manager_connection_handler.clone(),
395                                                event_notifier.clone(), tcp_stream).await;
396         }
397     });
398
399     // Step 17: Connect and Disconnect Blocks
400     let mut chain_poller = poll::ChainPoller::new(&mut block_source, Network::Regtest);
401     if chain_tip.is_none() {
402         match runtime.block_on(chain_poller.poll_chain_tip(None)).unwrap() {
403             ChainTip::Better(header) => chain_tip = Some(header),
404             _ => panic!("Unexpected chain tip")
405         }
406     }
407     let chain_listener = (chain_monitor.clone(), channel_manager.clone());
408     let _spv_client = SpvClient::new(chain_tip.unwrap(), chain_poller, &mut cache, &chain_listener);
409
410     // Step 17 & 18: Initialize ChannelManager persistence & Once Per Minute: ChannelManager's
411     // timer_chan_freshness_every_min() and PeerManager's timer_tick_occurred
412     let persist_channel_manager_callback = move |node: &ChannelManager| {
413         FilesystemPersister::persist_manager("./".to_string(), &*node)
414     };
415     BackgroundProcessor::start(persist_channel_manager_callback, channel_manager.clone(), logger.clone());
416     let peer_manager_processor = peer_manager.clone();
417     thread::spawn(move || {
418         loop {
419             peer_manager_processor.timer_tick_occured();
420             thread::sleep(Duration::new(60, 0));
421         }
422     });
423     cli::poll_for_user_input(peer_manager.clone(), channel_manager.clone(), event_ntfn_sender.clone());
424 }