1 use crate::common::{ConfirmedTx, FilterQueue, SyncState};
2 use crate::error::{InternalError, TxSyncError};
4 use lightning::chain::WatchedOutput;
5 use lightning::chain::{Confirm, Filter};
6 use lightning::util::logger::Logger;
7 use lightning::{log_debug, log_error, log_trace};
9 use bitcoin::{BlockHash, Script, Txid};
11 #[cfg(not(feature = "async-interface"))]
12 use esplora_client::blocking::BlockingClient;
13 #[cfg(feature = "async-interface")]
14 use esplora_client::r#async::AsyncClient;
15 use esplora_client::Builder;
18 use std::collections::HashSet;
20 /// Synchronizes LDK with a given [`Esplora`] server.
22 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
23 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
26 /// Note that registration via [`Filter`] needs to happen before any calls to
27 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
29 /// This uses and exposes either a blocking or async client variant dependent on whether the
30 /// `esplora-blocking` or the `esplora-async` feature is enabled.
32 /// [`Esplora`]: https://github.com/Blockstream/electrs
33 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
34 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
35 /// [`Filter`]: lightning::chain::Filter
36 pub struct EsploraSyncClient<L: Deref>
// Sync bookkeeping. `sync` locks this before doing any work and keeps the guard while it
// runs, which is what serialises concurrent `sync` calls. `MutexType` is a `futures` mutex
// when the `async-interface` feature is enabled and a `std` mutex otherwise.
40 sync_state: MutexType<SyncState>,
// Items registered through the `Filter` methods (`register_tx` / `register_output`). `sync`
// passes the locked queue to `process_queues` together with the sync state.
41 queue: std::sync::Mutex<FilterQueue>,
// Esplora client used for every chain query; blocking or async depending on the
// `async-interface` feature (see `EsploraClientType`).
42 client: EsploraClientType,
// NOTE(review): `from_client` also initialises a `logger` field; its declaration is not
// visible in this excerpt.
46 impl<L: Deref> EsploraSyncClient<L>
50 /// Returns a new [`EsploraSyncClient`] object.
///
/// Builds an Esplora client for `server_url` and passes it, together with `logger`, to
/// `from_client`.
///
/// # Panics
///
/// With the `async-interface` feature enabled, panics if `Builder::build_async` returns an
/// error, because the result is unwrapped.
51 pub fn new(server_url: String, logger: L) -> Self {
52 let builder = Builder::new(&server_url);
// Exactly one of the two `client` bindings below is compiled, selected by the
// `async-interface` feature.
53 #[cfg(not(feature = "async-interface"))]
54 let client = builder.build_blocking();
55 #[cfg(feature = "async-interface")]
56 let client = builder.build_async().unwrap();
58 EsploraSyncClient::from_client(client, logger)
61 /// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
63 /// This is not exported to bindings users as the underlying client from BDK is not exported.
64 pub fn from_client(client: EsploraClientType, logger: L) -> Self {
// Start from a newly constructed sync state and registration queue; `client` and `logger`
// are stored as given.
65 let sync_state = MutexType::new(SyncState::new());
// The queue always uses a `std` mutex, independent of the `async-interface` feature.
66 let queue = std::sync::Mutex::new(FilterQueue::new());
67 Self { sync_state, queue, client, logger }
70 /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
71 /// method should be called regularly to keep LDK up-to-date with current chain data.
73 /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
74 /// newest on-chain activity related to the items previously registered via the [`Filter`]
77 /// [`Confirm`]: lightning::chain::Confirm
78 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
79 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
80 /// [`Filter`]: lightning::chain::Filter
///
/// # Errors
///
/// Returns a `TxSyncError` converted from the underlying error when fetching chain data
/// fails. On the failure paths visible in the body, `pending_sync` is set before returning
/// so that a later call does not skip the sync.
82 pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
86 // Holding this lock for the whole call ensures that only one sync runs at a time.
87 #[cfg(not(feature = "async-interface"))]
88 let mut sync_state = self.sync_state.lock().unwrap();
89 #[cfg(feature = "async-interface")]
90 let mut sync_state = self.sync_state.lock().await;
92 log_trace!(self.logger, "Starting transaction sync.");
// The start time is only captured when the `time` feature is enabled; it feeds the
// "Finished transaction sync ... in {}ms" message at the end.
93 #[cfg(feature = "time")]
94 let start_time = std::time::Instant::now();
// Running totals of processed transactions (see the `+=` updates below).
95 let mut num_confirmed = 0;
96 let mut num_unconfirmed = 0;
// The tip this pass syncs towards. It is mutable because it is replaced whenever a
// re-check below observes a different tip.
98 let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
// Hand the queued `Filter` registrations to the sync state. The result is used as a flag
// below (presumably "something new was registered" -- confirm against `process_queues`).
101 let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
102 let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
104 // We loop until any registered transactions have been processed at least once, or the
105 // tip hasn't been updated during the last iteration.
// NOTE(review): the body of this `if`, and the loop keywords the comment above refers to,
// are not visible in this excerpt.
106 if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
110 // Update the known tip to the newest one.
112 // First check for any unconfirmed transactions and act on them immediately.
113 match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
114 Ok(unconfirmed_txs) => {
115 // Double-check the tip hash. If it changed, a reorg happened since
116 // we started syncing and we need to restart last-minute.
117 match maybe_await!(self.client.get_tip_hash()) {
118 Ok(check_tip_hash) => {
119 if check_tip_hash != tip_hash {
// Adopt the newly observed tip and flag the sync as still pending.
120 tip_hash = check_tip_hash;
122 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
123 sync_state.pending_sync = true;
// Tip unchanged: count the unconfirmed transactions and apply them.
126 num_unconfirmed += unconfirmed_txs.len();
127 sync_state.sync_unconfirmed_transactions(
133 // (Semi-)permanent failure, retry later.
134 log_error!(self.logger,
135 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
139 sync_state.pending_sync = true;
140 return Err(TxSyncError::from(err));
145 // (Semi-)permanent failure, retry later.
146 log_error!(self.logger,
147 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
151 sync_state.pending_sync = true;
152 return Err(TxSyncError::from(err));
// Notify the confirmables about the current tip. NOTE(review): the condition guarding this
// call, if any, is not visible in this excerpt.
156 match maybe_await!(self.sync_best_block_updated(
162 Err(InternalError::Inconsistency) => {
163 // Immediately restart syncing when we encounter any inconsistencies.
166 "Encountered inconsistency during transaction sync, restarting."
168 sync_state.pending_sync = true;
172 // (Semi-)permanent failure, retry later.
173 log_error!(self.logger,
174 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
178 sync_state.pending_sync = true;
179 return Err(TxSyncError::from(err));
// Fetch confirmation data for everything currently watched in the sync state.
184 match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
185 Ok(confirmed_txs) => {
186 // Double-check the tip hash. If it changed, a reorg happened since
187 // we started syncing and we need to restart last-minute.
188 match maybe_await!(self.client.get_tip_hash()) {
189 Ok(check_tip_hash) => {
190 if check_tip_hash != tip_hash {
// Same handling as after the unconfirmed check: adopt the new tip and flag the
// sync as still pending.
191 tip_hash = check_tip_hash;
193 log_debug!(self.logger,
194 "Encountered inconsistency during transaction sync, restarting.");
195 sync_state.pending_sync = true;
// Tip unchanged: count the confirmed transactions and feed them to the confirmables.
198 num_confirmed += confirmed_txs.len();
200 .sync_confirmed_transactions(&confirmables, confirmed_txs);
203 // (Semi-)permanent failure, retry later.
204 log_error!(self.logger,
205 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
209 sync_state.pending_sync = true;
210 return Err(TxSyncError::from(err));
214 Err(InternalError::Inconsistency) => {
215 // Immediately restart syncing when we encounter any inconsistencies.
218 "Encountered inconsistency during transaction sync, restarting."
220 sync_state.pending_sync = true;
224 // (Semi-)permanent failure, retry later.
225 log_error!(self.logger,
226 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
230 sync_state.pending_sync = true;
231 return Err(TxSyncError::from(err));
// A pass completed without error or inconsistency: remember the tip we synced to and clear
// the pending flag.
234 sync_state.last_sync_hash = Some(tip_hash);
235 sync_state.pending_sync = false;
// Final summary log; the variant with the elapsed time is compiled only with the `time`
// feature.
238 #[cfg(feature = "time")]
241 "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
243 start_time.elapsed().as_millis(),
247 #[cfg(not(feature = "time"))]
250 "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
// Informs every confirmable about the block `tip_hash` via `best_block_updated` and then
// prunes output spends from `sync_state` for that height.
//
// Returns `Err(InternalError::Inconsistency)` on the path shown at the end of the body;
// errors from the two client requests are propagated with `?`.
259 fn sync_best_block_updated<C: Deref>(
260 &self, confirmables: &Vec<C>, sync_state: &mut SyncState, tip_hash: &BlockHash,
261 ) -> Result<(), InternalError>
265 // Inform the interface of the new block.
266 let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
267 let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
// Only act when the server reports the block as part of the best chain and knows its height.
268 if tip_status.in_best_chain {
269 if let Some(tip_height) = tip_status.height {
270 for c in confirmables {
271 c.best_block_updated(&tip_header, tip_height);
274 // Prune any sufficiently confirmed output spends
275 sync_state.prune_output_spends(tip_height);
// NOTE(review): the lines separating the success path from this error return are not
// visible in this excerpt; presumably this is reached when the tip is no longer in the best
// chain or has no height -- confirm.
278 return Err(InternalError::Inconsistency);
// Collects a `ConfirmedTx` for every watched transaction that is confirmed and for every
// confirmed transaction spending a watched output, then sorts the result by block height
// and in-block position.
//
// Returns `Err(InternalError::Inconsistency)` when a spending transaction that was already
// collected as confirmed is reported as unconfirmed; client errors are propagated with `?`.
284 fn get_confirmed_transactions(
285 &self, sync_state: &SyncState,
286 ) -> Result<Vec<ConfirmedTx>, InternalError> {
287 // First, check the confirmation status of registered transactions as well as the
288 // status of dependent transactions of registered outputs.
290 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
292 for txid in &sync_state.watched_transactions {
// Duplicate check by txid. NOTE(review): this is a linear scan of `confirmed_txs` for every
// watched transaction.
293 if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
// No expected block hash or known height is available here, so both are passed as `None`.
296 if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
297 confirmed_txs.push(confirmed_tx);
301 for (_, output) in &sync_state.watched_outputs {
302 if let Some(output_status) = maybe_await!(self
304 .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
// Only outputs for which the server reports both a spending txid and its status are
// considered further.
306 if let Some(spending_txid) = output_status.txid {
307 if let Some(spending_tx_status) = output_status.status {
308 if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
309 if spending_tx_status.confirmed {
310 // Skip inserting duplicate ConfirmedTx entry
313 log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
314 return Err(InternalError::Inconsistency);
// The block hash and height from the output status are forwarded so that
// `get_confirmed_tx` can cross-check the block and skip the extra height lookup.
318 if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
320 spending_tx_status.block_hash,
321 spending_tx_status.block_height,
323 confirmed_txs.push(confirmed_tx);
330 // Sort all confirmed transactions first by block height, then by in-block
331 // position, and finally feed them to the interface in order.
332 confirmed_txs.sort_unstable_by(|tx1, tx2| {
333 tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
341 &self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
342 ) -> Result<Option<ConfirmedTx>, InternalError> {
// Builds the `ConfirmedTx` for `txid` from the server's Merkle block and transaction data.
//
// * `expected_block_hash`: when `Some`, the Merkle block's header hash must equal it,
//   otherwise `InternalError::Inconsistency` is returned.
// * `known_block_height`: when `Some`, it is used as the confirmation height and the extra
//   block-status request is skipped.
//
// Returns `InternalError::Failed` when the server's answer does not match `txid`.
// NOTE(review): the function's header line and its fall-through return values are not
// visible in this excerpt.
343 if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
344 let block_header = merkle_block.header;
345 let block_hash = block_header.block_hash();
346 if let Some(expected_block_hash) = expected_block_hash {
347 if expected_block_hash != block_hash {
350 "Inconsistency: Tx {} expected in block {}, but is confirmed in {}",
355 return Err(InternalError::Inconsistency);
// Extract the matched txids and their in-block indexes from the partial Merkle tree.
// NOTE(review): the return value of `extract_matches` is discarded; only the collected
// `matches` / `indexes` are validated below.
359 let mut matches = Vec::new();
360 let mut indexes = Vec::new();
361 let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
// Exactly one match is required, and it has to be the requested transaction.
362 if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
363 log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
364 return Err(InternalError::Failed);
367 // unwrap() safety: indexes.len() == 1 is checked above
368 let pos = *indexes.first().unwrap() as usize;
369 if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
// Guard against the server returning a different transaction than the one requested.
370 if tx.txid() != txid {
371 log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
372 return Err(InternalError::Failed);
375 if let Some(block_height) = known_block_height {
376 // We can take a shortcut here if a previous call already gave us the height.
377 return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
// Otherwise look up the height of the containing block.
380 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
381 if let Some(block_height) = block_status.height {
382 return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
384 // If any previously-confirmed block suddenly is no longer confirmed, we found
385 // an inconsistency and should start over.
388 "Inconsistency: Tx {} was unconfirmed during syncing.",
391 return Err(InternalError::Inconsistency);
// Returns the txids that the confirmables still consider relevant but whose confirming
// block is no longer reported as part of the best chain.
//
// Errors from the block-status request are propagated with `?`. The body also contains a
// `panic!` for an untracked funding confirmation (see the note near the end).
399 fn get_unconfirmed_transactions<C: Deref>(
400 &self, confirmables: &Vec<C>,
401 ) -> Result<Vec<Txid>, InternalError>
405 // Query the interface for relevant txids and check whether the relevant blocks are still
406 // in the best chain, mark them unconfirmed otherwise
// Collecting into a `HashSet` removes identical entries reported by several confirmables,
// so each one is checked once.
407 let relevant_txids = confirmables
409 .flat_map(|c| c.get_relevant_txids())
410 .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
412 let mut unconfirmed_txs = Vec::new();
// The confirmation height in each tuple is not used; only the block hash is checked.
414 for (txid, _conf_height, block_hash_opt) in relevant_txids {
415 if let Some(block_hash) = block_hash_opt {
416 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
417 if block_status.in_best_chain {
418 // Skip if the block in question is still confirmed.
422 unconfirmed_txs.push(txid);
// NOTE(review): the branch structure around the next two lines is not visible in this
// excerpt; they appear to handle entries without a block hash -- confirm.
424 log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
425 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
431 /// Returns a reference to the underlying esplora client.
433 /// This is not exported to bindings users as the underlying client from BDK is not exported.
434 pub fn client(&self) -> &EsploraClientType {
// `EsploraClientType` is `AsyncClient` with the `async-interface` feature and
// `BlockingClient` without it, so the concrete return type depends on the build.
// The mutex type guarding the sync state: the `futures` mutex when `async-interface` is
// enabled (`sync` then acquires it with `.lock().await`), the `std` mutex otherwise.
439 #[cfg(feature = "async-interface")]
440 type MutexType<I> = futures::lock::Mutex<I>;
441 #[cfg(not(feature = "async-interface"))]
442 type MutexType<I> = std::sync::Mutex<I>;
444 // The underlying client type: `AsyncClient` with `async-interface`, `BlockingClient` without.
445 #[cfg(feature = "async-interface")]
446 type EsploraClientType = AsyncClient;
447 #[cfg(not(feature = "async-interface"))]
448 type EsploraClientType = BlockingClient;
// `Filter` implementation. Both methods only record the registration in the queue; the
// queue is handed to `process_queues` by `sync`.
450 impl<L: Deref> Filter for EsploraSyncClient<L>
454 fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
// Only the txid is stored; the script pubkey is unused. `lock().unwrap()` panics if the
// queue mutex is poisoned.
455 let mut locked_queue = self.queue.lock().unwrap();
456 locked_queue.transactions.insert(*txid);
459 fn register_output(&self, output: WatchedOutput) {
// Outputs are keyed by their outpoint, converted with `into_bitcoin_outpoint`.
460 let mut locked_queue = self.queue.lock().unwrap();
461 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);