use crate::common::{SyncState, FilterQueue, ConfirmedTx};
use lightning::util::logger::Logger;
-use lightning::{log_error, log_info, log_debug, log_trace};
+use lightning::{log_error, log_debug, log_trace};
use lightning::chain::WatchedOutput;
use lightning::chain::{Confirm, Filter};
pub fn new(server_url: String, logger: L) -> Self {
let builder = Builder::new(&server_url);
#[cfg(not(feature = "async-interface"))]
- let client = builder.build_blocking().unwrap();
+ let client = builder.build_blocking();
#[cfg(feature = "async-interface")]
let client = builder.build_async().unwrap();
}
/// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
+ ///
+ /// This is not exported to bindings users as the underlying client from BDK is not exported.
pub fn from_client(client: EsploraClientType, logger: L) -> Self {
let sync_state = MutexType::new(SyncState::new());
let queue = std::sync::Mutex::new(FilterQueue::new());
#[cfg(feature = "async-interface")]
let mut sync_state = self.sync_state.lock().await;
- log_info!(self.logger, "Starting transaction sync.");
+ log_trace!(self.logger, "Starting transaction sync.");
+ #[cfg(feature = "time")]
+ let start_time = std::time::Instant::now();
+ let mut num_confirmed = 0;
+ let mut num_unconfirmed = 0;
let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
Ok(unconfirmed_txs) => {
// Double-check the tip hash. If it changed, a reorg happened since
// we started syncing and we need to restart last-minute.
- let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
- if check_tip_hash != tip_hash {
- tip_hash = check_tip_hash;
- continue;
+ match maybe_await!(self.client.get_tip_hash()) {
+ Ok(check_tip_hash) => {
+ if check_tip_hash != tip_hash {
+ tip_hash = check_tip_hash;
+
+ log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
+ sync_state.pending_sync = true;
+ continue;
+ }
+ num_unconfirmed += unconfirmed_txs.len();
+ sync_state.sync_unconfirmed_transactions(
+ &confirmables,
+ unconfirmed_txs
+ );
+ }
+ Err(err) => {
+ // (Semi-)permanent failure, retry later.
+ log_error!(self.logger,
+ "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
+ num_confirmed,
+ num_unconfirmed
+ );
+ sync_state.pending_sync = true;
+ return Err(TxSyncError::from(err));
+ }
}
-
- self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs);
},
Err(err) => {
// (Semi-)permanent failure, retry later.
- log_error!(self.logger, "Failed during transaction sync, aborting.");
+ log_error!(self.logger,
+ "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
+ num_confirmed,
+ num_unconfirmed
+ );
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
}
- match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
+ match maybe_await!(self.sync_best_block_updated(&confirmables, &mut sync_state, &tip_hash)) {
Ok(()) => {}
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
}
Err(err) => {
// (Semi-)permanent failure, retry later.
+ log_error!(self.logger,
+ "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
+ num_confirmed,
+ num_unconfirmed
+ );
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
Ok(confirmed_txs) => {
// Double-check the tip hash. If it changed, a reorg happened since
// we started syncing and we need to restart last-minute.
- let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
- if check_tip_hash != tip_hash {
- tip_hash = check_tip_hash;
- continue;
+ match maybe_await!(self.client.get_tip_hash()) {
+ Ok(check_tip_hash) => {
+ if check_tip_hash != tip_hash {
+ tip_hash = check_tip_hash;
+
+ log_debug!(self.logger,
+ "Encountered inconsistency during transaction sync, restarting.");
+ sync_state.pending_sync = true;
+ continue;
+ }
+ num_confirmed += confirmed_txs.len();
+ sync_state.sync_confirmed_transactions(
+ &confirmables,
+ confirmed_txs
+ );
+ }
+ Err(err) => {
+ // (Semi-)permanent failure, retry later.
+ log_error!(self.logger,
+ "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
+ num_confirmed,
+ num_unconfirmed
+ );
+ sync_state.pending_sync = true;
+ return Err(TxSyncError::from(err));
+ }
}
-
- self.sync_confirmed_transactions(
- &mut sync_state,
- &confirmables,
- confirmed_txs,
- );
}
Err(InternalError::Inconsistency) => {
// Immediately restart syncing when we encounter any inconsistencies.
}
Err(err) => {
// (Semi-)permanent failure, retry later.
- log_error!(self.logger, "Failed during transaction sync, aborting.");
+ log_error!(self.logger,
+ "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
+ num_confirmed,
+ num_unconfirmed
+ );
sync_state.pending_sync = true;
return Err(TxSyncError::from(err));
}
sync_state.pending_sync = false;
}
}
- log_info!(self.logger, "Finished transaction sync.");
+ #[cfg(feature = "time")]
+ log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
+ tip_hash, start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed);
+ #[cfg(not(feature = "time"))]
+ log_debug!(self.logger, "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
+ tip_hash, num_confirmed, num_unconfirmed);
Ok(())
}
#[maybe_async]
fn sync_best_block_updated(
- &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
+ &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, sync_state: &mut SyncState, tip_hash: &BlockHash,
) -> Result<(), InternalError> {
// Inform the interface of the new block.
for c in confirmables {
c.best_block_updated(&tip_header, tip_height);
}
+
+ // Prune any sufficiently confirmed output spends
+ sync_state.prune_output_spends(tip_height);
}
} else {
return Err(InternalError::Inconsistency);
Ok(())
}
- fn sync_confirmed_transactions(
- &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec<ConfirmedTx>,
- ) {
- for ctx in confirmed_txs {
- for c in confirmables {
- c.transactions_confirmed(
- &ctx.block_header,
- &[(ctx.pos, &ctx.tx)],
- ctx.block_height,
- );
- }
-
- sync_state.watched_transactions.remove(&ctx.tx.txid());
-
- for input in &ctx.tx.input {
- sync_state.watched_outputs.remove(&input.previous_output);
- }
- }
- }
-
#[maybe_async]
fn get_confirmed_transactions(
&self, sync_state: &SyncState,
// First, check the confirmation status of registered transactions as well as the
// status of dependent transactions of registered outputs.
- let mut confirmed_txs = Vec::new();
+ let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
for txid in &sync_state.watched_transactions {
- if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
+ if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
+ continue;
+ }
+ if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
confirmed_txs.push(confirmed_tx);
}
}
{
if let Some(spending_txid) = output_status.txid {
if let Some(spending_tx_status) = output_status.status {
+ if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
+ if spending_tx_status.confirmed {
+ // Skip inserting duplicate ConfirmedTx entry
+ continue;
+ } else {
+ log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
+ return Err(InternalError::Inconsistency);
+ }
+ }
+
if let Some(confirmed_tx) = maybe_await!(self
.get_confirmed_tx(
- &spending_txid,
+ spending_txid,
spending_tx_status.block_hash,
spending_tx_status.block_height,
))?
#[maybe_async]
fn get_confirmed_tx(
- &self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
+ &self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
) -> Result<Option<ConfirmedTx>, InternalError> {
if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
let block_header = merkle_block.header;
let mut matches = Vec::new();
let mut indexes = Vec::new();
let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
- if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
+ if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
return Err(InternalError::Failed);
}
- let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize;
+ // unwrap() safety: len() > 0 is checked above
+ let pos = *indexes.first().unwrap() as usize;
if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
+ if tx.txid() != txid {
+ log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
+ return Err(InternalError::Failed);
+ }
+
if let Some(block_height) = known_block_height {
// We can take a shortcut here if a previous call already gave us the height.
- return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
+ return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
}
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if let Some(block_height) = block_status.height {
- return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
+ return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
} else {
// If any previously-confirmed block suddenly is no longer confirmed, we found
// an inconsistency and should start over.
let relevant_txids = confirmables
.iter()
.flat_map(|c| c.get_relevant_txids())
- .collect::<HashSet<(Txid, Option<BlockHash>)>>();
+ .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
let mut unconfirmed_txs = Vec::new();
- for (txid, block_hash_opt) in relevant_txids {
+ for (txid, _conf_height, block_hash_opt) in relevant_txids {
if let Some(block_hash) = block_hash_opt {
let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
if block_status.in_best_chain {
Ok(unconfirmed_txs)
}
- fn sync_unconfirmed_transactions(
- &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec<Txid>,
- ) {
- for txid in unconfirmed_txs {
- for c in confirmables {
- c.transaction_unconfirmed(&txid);
- }
-
- sync_state.watched_transactions.insert(txid);
- }
- }
-
/// Returns a reference to the underlying esplora client.
+ ///
+ /// This is not exported to bindings users as the underlying client from BDK is not exported.
pub fn client(&self) -> &EsploraClientType {
&self.client
}