Merge pull request #3107 from mhrheaume/mhr/closure_reason_abandoned
[rust-lightning] / lightning-transaction-sync / src / esplora.rs
1 use crate::common::{ConfirmedTx, FilterQueue, SyncState};
2 use crate::error::{InternalError, TxSyncError};
3
4 use lightning::chain::WatchedOutput;
5 use lightning::chain::{Confirm, Filter};
6 use lightning::util::logger::Logger;
7 use lightning::{log_debug, log_error, log_trace};
8
9 use bitcoin::{BlockHash, Script, Txid};
10
11 #[cfg(not(feature = "async-interface"))]
12 use esplora_client::blocking::BlockingClient;
13 #[cfg(feature = "async-interface")]
14 use esplora_client::r#async::AsyncClient;
15 use esplora_client::Builder;
16
17 use core::ops::Deref;
18 use std::collections::HashSet;
19
20 /// Synchronizes LDK with a given [`Esplora`] server.
21 ///
22 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
23 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
24 /// reconfirmation.
25 ///
26 /// Note that registration via [`Filter`] needs to happen before any calls to
27 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
28 ///
29 /// This uses and exposes either a blocking or async client variant dependent on whether the
30 /// `esplora-blocking` or the `esplora-async` feature is enabled.
31 ///
32 /// [`Esplora`]: https://github.com/Blockstream/electrs
33 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
34 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
35 /// [`Filter`]: lightning::chain::Filter
36 pub struct EsploraSyncClient<L: Deref>
37 where
38         L::Target: Logger,
39 {
40         sync_state: MutexType<SyncState>,
41         queue: std::sync::Mutex<FilterQueue>,
42         client: EsploraClientType,
43         logger: L,
44 }
45
46 impl<L: Deref> EsploraSyncClient<L>
47 where
48         L::Target: Logger,
49 {
50         /// Returns a new [`EsploraSyncClient`] object.
51         pub fn new(server_url: String, logger: L) -> Self {
52                 let builder = Builder::new(&server_url);
53                 #[cfg(not(feature = "async-interface"))]
54                 let client = builder.build_blocking();
55                 #[cfg(feature = "async-interface")]
56                 let client = builder.build_async().unwrap();
57
58                 EsploraSyncClient::from_client(client, logger)
59         }
60
61         /// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
62         ///
63         /// This is not exported to bindings users as the underlying client from BDK is not exported.
64         pub fn from_client(client: EsploraClientType, logger: L) -> Self {
65                 let sync_state = MutexType::new(SyncState::new());
66                 let queue = std::sync::Mutex::new(FilterQueue::new());
67                 Self { sync_state, queue, client, logger }
68         }
69
70         /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
71         /// method should be called regularly to keep LDK up-to-date with current chain data.
72         ///
73         /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
74         /// newest on-chain activity related to the items previously registered via the [`Filter`]
75         /// interface.
76         ///
77         /// [`Confirm`]: lightning::chain::Confirm
78         /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
79         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
80         /// [`Filter`]: lightning::chain::Filter
81         #[maybe_async]
82         pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
83         where
84                 C::Target: Confirm,
85         {
86                 // This lock makes sure we're syncing once at a time.
87                 #[cfg(not(feature = "async-interface"))]
88                 let mut sync_state = self.sync_state.lock().unwrap();
89                 #[cfg(feature = "async-interface")]
90                 let mut sync_state = self.sync_state.lock().await;
91
92                 log_trace!(self.logger, "Starting transaction sync.");
93                 #[cfg(feature = "time")]
94                 let start_time = std::time::Instant::now();
95                 let mut num_confirmed = 0;
96                 let mut num_unconfirmed = 0;
97
98                 let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
99
100                 loop {
101                         let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
102                         let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
103
104                         // We loop until any registered transactions have been processed at least once, or the
105                         // tip hasn't been updated during the last iteration.
106                         if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
107                                 // Nothing to do.
108                                 break;
109                         } else {
110                                 // Update the known tip to the newest one.
111                                 if tip_is_new {
112                                         // First check for any unconfirmed transactions and act on it immediately.
113                                         match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
114                                                 Ok(unconfirmed_txs) => {
115                                                         // Double-check the tip hash. If it changed, a reorg happened since
116                                                         // we started syncing and we need to restart last-minute.
117                                                         match maybe_await!(self.client.get_tip_hash()) {
118                                                                 Ok(check_tip_hash) => {
119                                                                         if check_tip_hash != tip_hash {
120                                                                                 tip_hash = check_tip_hash;
121
122                                                                                 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
123                                                                                 sync_state.pending_sync = true;
124                                                                                 continue;
125                                                                         }
126                                                                         num_unconfirmed += unconfirmed_txs.len();
127                                                                         sync_state.sync_unconfirmed_transactions(
128                                                                                 &confirmables,
129                                                                                 unconfirmed_txs,
130                                                                         );
131                                                                 },
132                                                                 Err(err) => {
133                                                                         // (Semi-)permanent failure, retry later.
134                                                                         log_error!(self.logger,
135                                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
136                                                                                 num_confirmed,
137                                                                                 num_unconfirmed
138                                                                                 );
139                                                                         sync_state.pending_sync = true;
140                                                                         return Err(TxSyncError::from(err));
141                                                                 },
142                                                         }
143                                                 },
144                                                 Err(err) => {
145                                                         // (Semi-)permanent failure, retry later.
146                                                         log_error!(self.logger,
147                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
148                                                                 num_confirmed,
149                                                                 num_unconfirmed
150                                                         );
151                                                         sync_state.pending_sync = true;
152                                                         return Err(TxSyncError::from(err));
153                                                 },
154                                         }
155
156                                         match maybe_await!(self.sync_best_block_updated(
157                                                 &confirmables,
158                                                 &mut sync_state,
159                                                 &tip_hash
160                                         )) {
161                                                 Ok(()) => {},
162                                                 Err(InternalError::Inconsistency) => {
163                                                         // Immediately restart syncing when we encounter any inconsistencies.
164                                                         log_debug!(
165                                                                 self.logger,
166                                                                 "Encountered inconsistency during transaction sync, restarting."
167                                                         );
168                                                         sync_state.pending_sync = true;
169                                                         continue;
170                                                 },
171                                                 Err(err) => {
172                                                         // (Semi-)permanent failure, retry later.
173                                                         log_error!(self.logger,
174                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
175                                                                 num_confirmed,
176                                                                 num_unconfirmed
177                                                         );
178                                                         sync_state.pending_sync = true;
179                                                         return Err(TxSyncError::from(err));
180                                                 },
181                                         }
182                                 }
183
184                                 match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
185                                         Ok(confirmed_txs) => {
186                                                 // Double-check the tip hash. If it changed, a reorg happened since
187                                                 // we started syncing and we need to restart last-minute.
188                                                 match maybe_await!(self.client.get_tip_hash()) {
189                                                         Ok(check_tip_hash) => {
190                                                                 if check_tip_hash != tip_hash {
191                                                                         tip_hash = check_tip_hash;
192
193                                                                         log_debug!(self.logger,
194                                                                                 "Encountered inconsistency during transaction sync, restarting.");
195                                                                         sync_state.pending_sync = true;
196                                                                         continue;
197                                                                 }
198                                                                 num_confirmed += confirmed_txs.len();
199                                                                 sync_state
200                                                                         .sync_confirmed_transactions(&confirmables, confirmed_txs);
201                                                         },
202                                                         Err(err) => {
203                                                                 // (Semi-)permanent failure, retry later.
204                                                                 log_error!(self.logger,
205                                                                         "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
206                                                                         num_confirmed,
207                                                                         num_unconfirmed
208                                                                 );
209                                                                 sync_state.pending_sync = true;
210                                                                 return Err(TxSyncError::from(err));
211                                                         },
212                                                 }
213                                         },
214                                         Err(InternalError::Inconsistency) => {
215                                                 // Immediately restart syncing when we encounter any inconsistencies.
216                                                 log_debug!(
217                                                         self.logger,
218                                                         "Encountered inconsistency during transaction sync, restarting."
219                                                 );
220                                                 sync_state.pending_sync = true;
221                                                 continue;
222                                         },
223                                         Err(err) => {
224                                                 // (Semi-)permanent failure, retry later.
225                                                 log_error!(self.logger,
226                                                         "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
227                                                         num_confirmed,
228                                                         num_unconfirmed
229                                                 );
230                                                 sync_state.pending_sync = true;
231                                                 return Err(TxSyncError::from(err));
232                                         },
233                                 }
234                                 sync_state.last_sync_hash = Some(tip_hash);
235                                 sync_state.pending_sync = false;
236                         }
237                 }
238                 #[cfg(feature = "time")]
239                 log_debug!(
240                         self.logger,
241                         "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
242                         tip_hash,
243                         start_time.elapsed().as_millis(),
244                         num_confirmed,
245                         num_unconfirmed
246                 );
247                 #[cfg(not(feature = "time"))]
248                 log_debug!(
249                         self.logger,
250                         "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
251                         tip_hash,
252                         num_confirmed,
253                         num_unconfirmed
254                 );
255                 Ok(())
256         }
257
258         #[maybe_async]
259         fn sync_best_block_updated<C: Deref>(
260                 &self, confirmables: &Vec<C>, sync_state: &mut SyncState, tip_hash: &BlockHash,
261         ) -> Result<(), InternalError>
262         where
263                 C::Target: Confirm,
264         {
265                 // Inform the interface of the new block.
266                 let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
267                 let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
268                 if tip_status.in_best_chain {
269                         if let Some(tip_height) = tip_status.height {
270                                 for c in confirmables {
271                                         c.best_block_updated(&tip_header, tip_height);
272                                 }
273
274                                 // Prune any sufficiently confirmed output spends
275                                 sync_state.prune_output_spends(tip_height);
276                         }
277                 } else {
278                         return Err(InternalError::Inconsistency);
279                 }
280                 Ok(())
281         }
282
283         #[maybe_async]
284         fn get_confirmed_transactions(
285                 &self, sync_state: &SyncState,
286         ) -> Result<Vec<ConfirmedTx>, InternalError> {
287                 // First, check the confirmation status of registered transactions as well as the
288                 // status of dependent transactions of registered outputs.
289
290                 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
291
292                 for txid in &sync_state.watched_transactions {
293                         if confirmed_txs.iter().any(|ctx| ctx.txid == *txid) {
294                                 continue;
295                         }
296                         if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(*txid, None, None))? {
297                                 confirmed_txs.push(confirmed_tx);
298                         }
299                 }
300
301                 for (_, output) in &sync_state.watched_outputs {
302                         if let Some(output_status) = maybe_await!(self
303                                 .client
304                                 .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
305                         {
306                                 if let Some(spending_txid) = output_status.txid {
307                                         if let Some(spending_tx_status) = output_status.status {
308                                                 if confirmed_txs.iter().any(|ctx| ctx.txid == spending_txid) {
309                                                         if spending_tx_status.confirmed {
310                                                                 // Skip inserting duplicate ConfirmedTx entry
311                                                                 continue;
312                                                         } else {
313                                                                 log_trace!(self.logger, "Inconsistency: Detected previously-confirmed Tx {} as unconfirmed", spending_txid);
314                                                                 return Err(InternalError::Inconsistency);
315                                                         }
316                                                 }
317
318                                                 if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(
319                                                         spending_txid,
320                                                         spending_tx_status.block_hash,
321                                                         spending_tx_status.block_height,
322                                                 ))? {
323                                                         confirmed_txs.push(confirmed_tx);
324                                                 }
325                                         }
326                                 }
327                         }
328                 }
329
330                 // Sort all confirmed transactions first by block height, then by in-block
331                 // position, and finally feed them to the interface in order.
332                 confirmed_txs.sort_unstable_by(|tx1, tx2| {
333                         tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
334                 });
335
336                 Ok(confirmed_txs)
337         }
338
339         #[maybe_async]
340         fn get_confirmed_tx(
341                 &self, txid: Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
342         ) -> Result<Option<ConfirmedTx>, InternalError> {
343                 if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
344                         let block_header = merkle_block.header;
345                         let block_hash = block_header.block_hash();
346                         if let Some(expected_block_hash) = expected_block_hash {
347                                 if expected_block_hash != block_hash {
348                                         log_trace!(
349                                                 self.logger,
350                                                 "Inconsistency: Tx {} expected in block {}, but is confirmed in {}",
351                                                 txid,
352                                                 expected_block_hash,
353                                                 block_hash
354                                         );
355                                         return Err(InternalError::Inconsistency);
356                                 }
357                         }
358
359                         let mut matches = Vec::new();
360                         let mut indexes = Vec::new();
361                         let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
362                         if indexes.len() != 1 || matches.len() != 1 || matches[0] != txid {
363                                 log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
364                                 return Err(InternalError::Failed);
365                         }
366
367                         // unwrap() safety: len() > 0 is checked above
368                         let pos = *indexes.first().unwrap() as usize;
369                         if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
370                                 if tx.txid() != txid {
371                                         log_error!(self.logger, "Retrieved transaction for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
372                                         return Err(InternalError::Failed);
373                                 }
374
375                                 if let Some(block_height) = known_block_height {
376                                         // We can take a shortcut here if a previous call already gave us the height.
377                                         return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
378                                 }
379
380                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
381                                 if let Some(block_height) = block_status.height {
382                                         return Ok(Some(ConfirmedTx { tx, txid, block_header, pos, block_height }));
383                                 } else {
384                                         // If any previously-confirmed block suddenly is no longer confirmed, we found
385                                         // an inconsistency and should start over.
386                                         log_trace!(
387                                                 self.logger,
388                                                 "Inconsistency: Tx {} was unconfirmed during syncing.",
389                                                 txid
390                                         );
391                                         return Err(InternalError::Inconsistency);
392                                 }
393                         }
394                 }
395                 Ok(None)
396         }
397
398         #[maybe_async]
399         fn get_unconfirmed_transactions<C: Deref>(
400                 &self, confirmables: &Vec<C>,
401         ) -> Result<Vec<Txid>, InternalError>
402         where
403                 C::Target: Confirm,
404         {
405                 // Query the interface for relevant txids and check whether the relevant blocks are still
406                 // in the best chain, mark them unconfirmed otherwise
407                 let relevant_txids = confirmables
408                         .iter()
409                         .flat_map(|c| c.get_relevant_txids())
410                         .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
411
412                 let mut unconfirmed_txs = Vec::new();
413
414                 for (txid, _conf_height, block_hash_opt) in relevant_txids {
415                         if let Some(block_hash) = block_hash_opt {
416                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
417                                 if block_status.in_best_chain {
418                                         // Skip if the block in question is still confirmed.
419                                         continue;
420                                 }
421
422                                 unconfirmed_txs.push(txid);
423                         } else {
424                                 log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
425                                 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
426                         }
427                 }
428                 Ok(unconfirmed_txs)
429         }
430
431         /// Returns a reference to the underlying esplora client.
432         ///
433         /// This is not exported to bindings users as the underlying client from BDK is not exported.
434         pub fn client(&self) -> &EsploraClientType {
435                 &self.client
436         }
437 }
438
439 #[cfg(feature = "async-interface")]
440 type MutexType<I> = futures::lock::Mutex<I>;
441 #[cfg(not(feature = "async-interface"))]
442 type MutexType<I> = std::sync::Mutex<I>;
443
444 // The underlying client type.
445 #[cfg(feature = "async-interface")]
446 type EsploraClientType = AsyncClient;
447 #[cfg(not(feature = "async-interface"))]
448 type EsploraClientType = BlockingClient;
449
450 impl<L: Deref> Filter for EsploraSyncClient<L>
451 where
452         L::Target: Logger,
453 {
454         fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
455                 let mut locked_queue = self.queue.lock().unwrap();
456                 locked_queue.transactions.insert(*txid);
457         }
458
459         fn register_output(&self, output: WatchedOutput) {
460                 let mut locked_queue = self.queue.lock().unwrap();
461                 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
462         }
463 }