Improve `EsploraSyncClient` logging
[rust-lightning] / lightning-transaction-sync / src / esplora.rs
1 use crate::error::{TxSyncError, InternalError};
2 use crate::common::{SyncState, FilterQueue, ConfirmedTx};
3
4 use lightning::util::logger::Logger;
5 use lightning::{log_error, log_debug, log_trace};
6 use lightning::chain::WatchedOutput;
7 use lightning::chain::{Confirm, Filter};
8
9 use bitcoin::{BlockHash, Script, Txid};
10
11 use esplora_client::Builder;
12 #[cfg(feature = "async-interface")]
13 use esplora_client::r#async::AsyncClient;
14 #[cfg(not(feature = "async-interface"))]
15 use esplora_client::blocking::BlockingClient;
16
17 use std::time::Instant;
18 use std::collections::HashSet;
19 use core::ops::Deref;
20
21 /// Synchronizes LDK with a given [`Esplora`] server.
22 ///
23 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
24 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
25 /// reconfirmation.
26 ///
27 /// Note that registration via [`Filter`] needs to happen before any calls to
28 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
29 ///
30 /// This uses and exposes either a blocking or async client variant dependent on whether the
31 /// `esplora-blocking` or the `esplora-async` feature is enabled.
32 ///
33 /// [`Esplora`]: https://github.com/Blockstream/electrs
34 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
35 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
36 /// [`Filter`]: lightning::chain::Filter
37 pub struct EsploraSyncClient<L: Deref>
38 where
39         L::Target: Logger,
40 {
41         sync_state: MutexType<SyncState>,
42         queue: std::sync::Mutex<FilterQueue>,
43         client: EsploraClientType,
44         logger: L,
45 }
46
47 impl<L: Deref> EsploraSyncClient<L>
48 where
49         L::Target: Logger,
50 {
51         /// Returns a new [`EsploraSyncClient`] object.
52         pub fn new(server_url: String, logger: L) -> Self {
53                 let builder = Builder::new(&server_url);
54                 #[cfg(not(feature = "async-interface"))]
55                 let client = builder.build_blocking().unwrap();
56                 #[cfg(feature = "async-interface")]
57                 let client = builder.build_async().unwrap();
58
59                 EsploraSyncClient::from_client(client, logger)
60         }
61
62         /// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
63         pub fn from_client(client: EsploraClientType, logger: L) -> Self {
64                 let sync_state = MutexType::new(SyncState::new());
65                 let queue = std::sync::Mutex::new(FilterQueue::new());
66                 Self {
67                         sync_state,
68                         queue,
69                         client,
70                         logger,
71                 }
72         }
73
74         /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
75         /// method should be called regularly to keep LDK up-to-date with current chain data.
76         ///
77         /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
78         /// newest on-chain activity related to the items previously registered via the [`Filter`]
79         /// interface.
80         ///
81         /// [`Confirm`]: lightning::chain::Confirm
82         /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
83         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
84         /// [`Filter`]: lightning::chain::Filter
85         #[maybe_async]
86         pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
87                 // This lock makes sure we're syncing once at a time.
88                 #[cfg(not(feature = "async-interface"))]
89                 let mut sync_state = self.sync_state.lock().unwrap();
90                 #[cfg(feature = "async-interface")]
91                 let mut sync_state = self.sync_state.lock().await;
92
93                 log_trace!(self.logger, "Starting transaction sync.");
94                 let start_time = Instant::now();
95                 let mut num_confirmed = 0;
96                 let mut num_unconfirmed = 0;
97
98                 let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
99
100                 loop {
101                         let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
102                         let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
103
104                         // We loop until any registered transactions have been processed at least once, or the
105                         // tip hasn't been updated during the last iteration.
106                         if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
107                                 // Nothing to do.
108                                 break;
109                         } else {
110                                 // Update the known tip to the newest one.
111                                 if tip_is_new {
112                                         // First check for any unconfirmed transactions and act on it immediately.
113                                         match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
114                                                 Ok(unconfirmed_txs) => {
115                                                         // Double-check the tip hash. If it changed, a reorg happened since
116                                                         // we started syncing and we need to restart last-minute.
117                                                         let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
118                                                         if check_tip_hash != tip_hash {
119                                                                 tip_hash = check_tip_hash;
120
121                                                                 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
122                                                                 sync_state.pending_sync = true;
123                                                                 continue;
124                                                         }
125                                                         num_unconfirmed += unconfirmed_txs.len();
126                                                         self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs);
127                                                 },
128                                                 Err(err) => {
129                                                         // (Semi-)permanent failure, retry later.
130                                                         log_error!(self.logger,
131                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
132                                                                 num_confirmed,
133                                                                 num_unconfirmed
134                                                         );
135                                                         sync_state.pending_sync = true;
136                                                         return Err(TxSyncError::from(err));
137                                                 }
138                                         }
139
140                                         match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
141                                                 Ok(()) => {}
142                                                 Err(InternalError::Inconsistency) => {
143                                                         // Immediately restart syncing when we encounter any inconsistencies.
144                                                         log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
145                                                         sync_state.pending_sync = true;
146                                                         continue;
147                                                 }
148                                                 Err(err) => {
149                                                         // (Semi-)permanent failure, retry later.
150                                                         log_error!(self.logger,
151                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
152                                                                 num_confirmed,
153                                                                 num_unconfirmed
154                                                         );
155                                                         sync_state.pending_sync = true;
156                                                         return Err(TxSyncError::from(err));
157                                                 }
158                                         }
159                                 }
160
161                                 match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
162                                         Ok(confirmed_txs) => {
163                                                 // Double-check the tip hash. If it changed, a reorg happened since
164                                                 // we started syncing and we need to restart last-minute.
165                                                 let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
166                                                 if check_tip_hash != tip_hash {
167                                                         tip_hash = check_tip_hash;
168                                                         continue;
169                                                 }
170
171                                                 num_confirmed += confirmed_txs.len();
172                                                 self.sync_confirmed_transactions(
173                                                         &mut sync_state,
174                                                         &confirmables,
175                                                         confirmed_txs,
176                                                 );
177                                         }
178                                         Err(InternalError::Inconsistency) => {
179                                                 // Immediately restart syncing when we encounter any inconsistencies.
180                                                 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
181                                                 sync_state.pending_sync = true;
182                                                 continue;
183                                         }
184                                         Err(err) => {
185                                                 // (Semi-)permanent failure, retry later.
186                                                 log_error!(self.logger,
187                                                         "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
188                                                         num_confirmed,
189                                                         num_unconfirmed
190                                                 );
191                                                 sync_state.pending_sync = true;
192                                                 return Err(TxSyncError::from(err));
193                                         }
194                                 }
195                                 sync_state.last_sync_hash = Some(tip_hash);
196                                 sync_state.pending_sync = false;
197                         }
198                 }
199                 log_debug!(self.logger, "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
200                                 tip_hash, start_time.elapsed().as_millis(), num_confirmed, num_unconfirmed);
201                 Ok(())
202         }
203
204         #[maybe_async]
205         fn sync_best_block_updated(
206                 &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
207         ) -> Result<(), InternalError> {
208
209                 // Inform the interface of the new block.
210                 let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
211                 let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
212                 if tip_status.in_best_chain {
213                         if let Some(tip_height) = tip_status.height {
214                                 for c in confirmables {
215                                         c.best_block_updated(&tip_header, tip_height);
216                                 }
217                         }
218                 } else {
219                         return Err(InternalError::Inconsistency);
220                 }
221                 Ok(())
222         }
223
224         fn sync_confirmed_transactions(
225                 &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec<ConfirmedTx>,
226         ) {
227                 for ctx in confirmed_txs {
228                         for c in confirmables {
229                                 c.transactions_confirmed(
230                                         &ctx.block_header,
231                                         &[(ctx.pos, &ctx.tx)],
232                                         ctx.block_height,
233                                 );
234                         }
235
236                         sync_state.watched_transactions.remove(&ctx.tx.txid());
237
238                         for input in &ctx.tx.input {
239                                 sync_state.watched_outputs.remove(&input.previous_output);
240                         }
241                 }
242         }
243
244         #[maybe_async]
245         fn get_confirmed_transactions(
246                 &self, sync_state: &SyncState,
247         ) -> Result<Vec<ConfirmedTx>, InternalError> {
248
249                 // First, check the confirmation status of registered transactions as well as the
250                 // status of dependent transactions of registered outputs.
251
252                 let mut confirmed_txs = Vec::new();
253
254                 for txid in &sync_state.watched_transactions {
255                         if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
256                                 confirmed_txs.push(confirmed_tx);
257                         }
258                 }
259
260                 for (_, output) in &sync_state.watched_outputs {
261                         if let Some(output_status) = maybe_await!(self.client
262                                 .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
263                         {
264                                 if let Some(spending_txid) = output_status.txid {
265                                         if let Some(spending_tx_status) = output_status.status {
266                                                 if let Some(confirmed_tx) = maybe_await!(self
267                                                         .get_confirmed_tx(
268                                                                 &spending_txid,
269                                                                 spending_tx_status.block_hash,
270                                                                 spending_tx_status.block_height,
271                                                         ))?
272                                                 {
273                                                         confirmed_txs.push(confirmed_tx);
274                                                 }
275                                         }
276                                 }
277                         }
278                 }
279
280                 // Sort all confirmed transactions first by block height, then by in-block
281                 // position, and finally feed them to the interface in order.
282                 confirmed_txs.sort_unstable_by(|tx1, tx2| {
283                         tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
284                 });
285
286                 Ok(confirmed_txs)
287         }
288
289         #[maybe_async]
290         fn get_confirmed_tx(
291                 &self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
292         ) -> Result<Option<ConfirmedTx>, InternalError> {
293                 if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
294                         let block_header = merkle_block.header;
295                         let block_hash = block_header.block_hash();
296                         if let Some(expected_block_hash) = expected_block_hash {
297                                 if expected_block_hash != block_hash {
298                                         log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
299                                         return Err(InternalError::Inconsistency);
300                                 }
301                         }
302
303                         let mut matches = Vec::new();
304                         let mut indexes = Vec::new();
305                         let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
306                         if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
307                                 log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
308                                 return Err(InternalError::Failed);
309                         }
310
311                         // unwrap() safety: len() > 0 is checked above
312                         let pos = *indexes.first().unwrap() as usize;
313                         if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
314                                 if let Some(block_height) = known_block_height {
315                                         // We can take a shortcut here if a previous call already gave us the height.
316                                         return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
317                                 }
318
319                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
320                                 if let Some(block_height) = block_status.height {
321                                         return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
322                                 } else {
323                                         // If any previously-confirmed block suddenly is no longer confirmed, we found
324                                         // an inconsistency and should start over.
325                                         log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
326                                         return Err(InternalError::Inconsistency);
327                                 }
328                         }
329                 }
330                 Ok(None)
331         }
332
333         #[maybe_async]
334         fn get_unconfirmed_transactions(
335                 &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
336         ) -> Result<Vec<Txid>, InternalError> {
337                 // Query the interface for relevant txids and check whether the relevant blocks are still
338                 // in the best chain, mark them unconfirmed otherwise
339                 let relevant_txids = confirmables
340                         .iter()
341                         .flat_map(|c| c.get_relevant_txids())
342                         .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
343
344                 let mut unconfirmed_txs = Vec::new();
345
346                 for (txid, _conf_height, block_hash_opt) in relevant_txids {
347                         if let Some(block_hash) = block_hash_opt {
348                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
349                                 if block_status.in_best_chain {
350                                         // Skip if the block in question is still confirmed.
351                                         continue;
352                                 }
353
354                                 unconfirmed_txs.push(txid);
355                         } else {
356                                 log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
357                                 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
358                         }
359                 }
360                 Ok(unconfirmed_txs)
361         }
362
363         fn sync_unconfirmed_transactions(
364                 &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec<Txid>,
365         ) {
366                 for txid in unconfirmed_txs {
367                         for c in confirmables {
368                                 c.transaction_unconfirmed(&txid);
369                         }
370
371                         sync_state.watched_transactions.insert(txid);
372                 }
373         }
374
375         /// Returns a reference to the underlying esplora client.
376         pub fn client(&self) -> &EsploraClientType {
377                 &self.client
378         }
379 }
380
381 #[cfg(feature = "async-interface")]
382 type MutexType<I> = futures::lock::Mutex<I>;
383 #[cfg(not(feature = "async-interface"))]
384 type MutexType<I> = std::sync::Mutex<I>;
385
386 // The underlying client type.
387 #[cfg(feature = "async-interface")]
388 type EsploraClientType = AsyncClient;
389 #[cfg(not(feature = "async-interface"))]
390 type EsploraClientType = BlockingClient;
391
392
393 impl<L: Deref> Filter for EsploraSyncClient<L>
394 where
395         L::Target: Logger,
396 {
397         fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
398                 let mut locked_queue = self.queue.lock().unwrap();
399                 locked_queue.transactions.insert(*txid);
400         }
401
402         fn register_output(&self, output: WatchedOutput) {
403                 let mut locked_queue = self.queue.lock().unwrap();
404                 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
405         }
406 }