X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-transaction-sync%2Fsrc%2Fesplora.rs;h=538918ada953f74e8170864be1a20f7a6e3bf7ad;hb=1d2a27d11963d0d37606458169b9e59e5165f2a9;hp=807ef8075005c13ba12a29d99a393aea72328ac3;hpb=f71daed02d159e051e065802155d3ad77edbc124;p=rust-lightning diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs index 807ef807..538918ad 100644 --- a/lightning-transaction-sync/src/esplora.rs +++ b/lightning-transaction-sync/src/esplora.rs @@ -2,7 +2,7 @@ use crate::error::{TxSyncError, InternalError}; use crate::common::{SyncState, FilterQueue, ConfirmedTx}; use lightning::util::logger::Logger; -use lightning::{log_error, log_given_level, log_info, log_internal, log_debug, log_trace}; +use lightning::{log_error, log_debug, log_trace}; use lightning::chain::WatchedOutput; use lightning::chain::{Confirm, Filter}; @@ -89,7 +89,11 @@ where #[cfg(feature = "async-interface")] let mut sync_state = self.sync_state.lock().await; - log_info!(self.logger, "Starting transaction sync."); + log_trace!(self.logger, "Starting transaction sync."); + #[cfg(feature = "time")] + let start_time = std::time::Instant::now(); + let mut num_confirmed = 0; + let mut num_unconfirmed = 0; let mut tip_hash = maybe_await!(self.client.get_tip_hash())?; @@ -110,23 +114,46 @@ where Ok(unconfirmed_txs) => { // Double-check the tip hash. If it changed, a reorg happened since // we started syncing and we need to restart last-minute. - let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; - if check_tip_hash != tip_hash { - tip_hash = check_tip_hash; - continue; + match maybe_await!(self.client.get_tip_hash()) { + Ok(check_tip_hash) => { + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + num_unconfirmed += unconfirmed_txs.len(); + sync_state.sync_unconfirmed_transactions( + &confirmables, + unconfirmed_txs + ); + } + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, + "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.", + num_confirmed, + num_unconfirmed + ); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } } - - self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs); }, Err(err) => { // (Semi-)permanent failure, retry later. - log_error!(self.logger, "Failed during transaction sync, aborting."); + log_error!(self.logger, + "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.", + num_confirmed, + num_unconfirmed + ); sync_state.pending_sync = true; return Err(TxSyncError::from(err)); } } - match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) { + match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) { Ok(()) => {} Err(InternalError::Inconsistency) => { // Immediately restart syncing when we encounter any inconsistencies. @@ -136,6 +163,11 @@ where } Err(err) => { // (Semi-)permanent failure, retry later. + log_error!(self.logger, + "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.", + num_confirmed, + num_unconfirmed + ); sync_state.pending_sync = true; return Err(TxSyncError::from(err)); } @@ -146,17 +178,33 @@ where Ok(confirmed_txs) => { // Double-check the tip hash. If it changed, a reorg happened since // we started syncing and we need to restart last-minute. - let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; - if check_tip_hash != tip_hash { - tip_hash = check_tip_hash; - continue; + match maybe_await!(self.client.get_tip_hash()) { + Ok(check_tip_hash) => { + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + + log_debug!(self.logger, + "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + num_confirmed += confirmed_txs.len(); + sync_state.sync_confirmed_transactions( + &confirmables, + confirmed_txs + ); + } + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, + "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.", + num_confirmed, + num_unconfirmed + ); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } } - - self.sync_confirmed_transactions( - &mut sync_state, - &confirmables, - confirmed_txs, - ); } Err(InternalError::Inconsistency) => { // Immediately restart syncing when we encounter any inconsistencies. @@ -166,7 +214,11 @@ where } Err(err) => { // (Semi-)permanent failure, retry later. - log_error!(self.logger, "Failed during transaction sync, aborting."); + log_error!(self.logger, + "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.", + num_confirmed, + num_unconfirmed + ); sync_state.pending_sync = true; return Err(TxSyncError::from(err)); } @@ -175,13 +227,18 @@ where sync_state.pending_sync = false; } } - log_info!(self.logger, "Finished transaction sync."); + #[cfg(feature = "time")] + log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.", + tip_hash, start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed); + #[cfg(not(feature = "time"))] + log_debug!(self.logger, "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.", + tip_hash, num_confirmed, num_unconfirmed); Ok(()) } #[maybe_async] fn sync_best_block_updated( - &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash, + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash, ) -> Result<(), InternalError> { // Inform the interface of the new block. @@ -192,6 +249,9 @@ where for c in confirmables { c.best_block_updated(&tip_header, tip_height); } + + // Prune any sufficiently confirmed output spends + sync_state.prune_output_spends(tip_height); } } else { return Err(InternalError::Inconsistency); @@ -199,26 +259,6 @@ where Ok(()) } - fn sync_confirmed_transactions( - &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec, - ) { - for ctx in confirmed_txs { - for c in confirmables { - c.transactions_confirmed( - &ctx.block_header, - &[(ctx.pos, &ctx.tx)], - ctx.block_height, - ); - } - - sync_state.watched_transactions.remove(&ctx.tx.txid()); - - for input in &ctx.tx.input { - sync_state.watched_outputs.remove(&input.previous_output); - } - } - } - #[maybe_async] fn get_confirmed_transactions( &self, sync_state: &SyncState, @@ -227,10 +267,13 @@ where // First, check the confirmation status of registered transactions as well as the // status of dependent transactions of registered outputs. - let mut confirmed_txs = Vec::new(); + let mut confirmed_txs: Vec = Vec::new(); for txid in &sync_state.watched_transactions { - if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? { + if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) { + continue; + } + if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? { confirmed_txs.push(confirmed_tx); } } @@ -241,9 +284,19 @@ where { if let Some(spending_txid) = output_status.txid { if let Some(spending_tx_status) = output_status.status { + if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) { + if spending_tx_status.confirmed { + // Skip inserting duplicate ConfirmedTx entry + continue; + } else { + log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid); + return Err(InternalError::Inconsistency); + } + } + if let Some(confirmed_tx) = maybe_await!(self .get_confirmed_tx( - &spending_txid, + spending_txid, spending_tx_status.block_hash, spending_tx_status.block_height, ))? @@ -266,7 +319,7 @@ where #[maybe_async] fn get_confirmed_tx( - &self, txid: &Txid, expected_block_hash: Option, known_block_height: Option, + &self, txid: Txid, expected_block_hash: Option, known_block_height: Option, ) -> Result, InternalError> { if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? { let block_header = merkle_block.header; @@ -281,21 +334,27 @@ where let mut matches = Vec::new(); let mut indexes = Vec::new(); let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes); - if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid { + if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid { log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); return Err(InternalError::Failed); } - let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize; + // unwrap() safety: len() > 0 is checked above + let pos = *indexes.first().unwrap() as usize; if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? { + if tx.txid() != txid { + log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); + return Err(InternalError::Failed); + } + if let Some(block_height) = known_block_height { // We can take a shortcut here if a previous call already gave us the height. - return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height })); } let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; if let Some(block_height) = block_status.height { - return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height })); } else { // If any previously-confirmed block suddenly is no longer confirmed, we found // an inconsistency and should start over. @@ -316,11 +375,11 @@ where let relevant_txids = confirmables .iter() .flat_map(|c| c.get_relevant_txids()) - .collect::)>>(); + .collect::)>>(); let mut unconfirmed_txs = Vec::new(); - for (txid, block_hash_opt) in relevant_txids { + for (txid, _conf_height, block_hash_opt) in relevant_txids { if let Some(block_hash) = block_hash_opt { let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; if block_status.in_best_chain { @@ -337,18 +396,6 @@ where Ok(unconfirmed_txs) } - fn sync_unconfirmed_transactions( - &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec, - ) { - for txid in unconfirmed_txs { - for c in confirmables { - c.transaction_unconfirmed(&txid); - } - - sync_state.watched_transactions.insert(txid); - } - } - /// Returns a reference to the underlying esplora client. pub fn client(&self) -> &EsploraClientType { &self.client