Return confirmation height via `Confirm::get_relevant_txids`
[rust-lightning] / lightning-transaction-sync / src / esplora.rs
1 use crate::error::{TxSyncError, InternalError};
2 use crate::common::{SyncState, FilterQueue, ConfirmedTx};
3
4 use lightning::util::logger::Logger;
5 use lightning::{log_error, log_info, log_debug, log_trace};
6 use lightning::chain::WatchedOutput;
7 use lightning::chain::{Confirm, Filter};
8
9 use bitcoin::{BlockHash, Script, Txid};
10
11 use esplora_client::Builder;
12 #[cfg(feature = "async-interface")]
13 use esplora_client::r#async::AsyncClient;
14 #[cfg(not(feature = "async-interface"))]
15 use esplora_client::blocking::BlockingClient;
16
17 use std::collections::HashSet;
18 use core::ops::Deref;
19
20 /// Synchronizes LDK with a given [`Esplora`] server.
21 ///
22 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
23 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
24 /// reconfirmation.
25 ///
26 /// Note that registration via [`Filter`] needs to happen before any calls to
27 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
28 ///
29 /// This uses and exposes either a blocking or async client variant dependent on whether the
30 /// `esplora-blocking` or the `esplora-async` feature is enabled.
31 ///
32 /// [`Esplora`]: https://github.com/Blockstream/electrs
33 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
34 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
35 /// [`Filter`]: lightning::chain::Filter
36 pub struct EsploraSyncClient<L: Deref>
37 where
38         L::Target: Logger,
39 {
40         sync_state: MutexType<SyncState>,
41         queue: std::sync::Mutex<FilterQueue>,
42         client: EsploraClientType,
43         logger: L,
44 }
45
46 impl<L: Deref> EsploraSyncClient<L>
47 where
48         L::Target: Logger,
49 {
50         /// Returns a new [`EsploraSyncClient`] object.
51         pub fn new(server_url: String, logger: L) -> Self {
52                 let builder = Builder::new(&server_url);
53                 #[cfg(not(feature = "async-interface"))]
54                 let client = builder.build_blocking().unwrap();
55                 #[cfg(feature = "async-interface")]
56                 let client = builder.build_async().unwrap();
57
58                 EsploraSyncClient::from_client(client, logger)
59         }
60
61         /// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
62         pub fn from_client(client: EsploraClientType, logger: L) -> Self {
63                 let sync_state = MutexType::new(SyncState::new());
64                 let queue = std::sync::Mutex::new(FilterQueue::new());
65                 Self {
66                         sync_state,
67                         queue,
68                         client,
69                         logger,
70                 }
71         }
72
73         /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
74         /// method should be called regularly to keep LDK up-to-date with current chain data.
75         ///
76         /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
77         /// newest on-chain activity related to the items previously registered via the [`Filter`]
78         /// interface.
79         ///
80         /// [`Confirm`]: lightning::chain::Confirm
81         /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
82         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
83         /// [`Filter`]: lightning::chain::Filter
84         #[maybe_async]
85         pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
86                 // This lock makes sure we're syncing once at a time.
87                 #[cfg(not(feature = "async-interface"))]
88                 let mut sync_state = self.sync_state.lock().unwrap();
89                 #[cfg(feature = "async-interface")]
90                 let mut sync_state = self.sync_state.lock().await;
91
92                 log_info!(self.logger, "Starting transaction sync.");
93
94                 let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
95
96                 loop {
97                         let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
98                         let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
99
100                         // We loop until any registered transactions have been processed at least once, or the
101                         // tip hasn't been updated during the last iteration.
102                         if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
103                                 // Nothing to do.
104                                 break;
105                         } else {
106                                 // Update the known tip to the newest one.
107                                 if tip_is_new {
108                                         // First check for any unconfirmed transactions and act on it immediately.
109                                         match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
110                                                 Ok(unconfirmed_txs) => {
111                                                         // Double-check the tip hash. If it changed, a reorg happened since
112                                                         // we started syncing and we need to restart last-minute.
113                                                         let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
114                                                         if check_tip_hash != tip_hash {
115                                                                 tip_hash = check_tip_hash;
116                                                                 continue;
117                                                         }
118
119                                                         self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs);
120                                                 },
121                                                 Err(err) => {
122                                                         // (Semi-)permanent failure, retry later.
123                                                         log_error!(self.logger, "Failed during transaction sync, aborting.");
124                                                         sync_state.pending_sync = true;
125                                                         return Err(TxSyncError::from(err));
126                                                 }
127                                         }
128
129                                         match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
130                                                 Ok(()) => {}
131                                                 Err(InternalError::Inconsistency) => {
132                                                         // Immediately restart syncing when we encounter any inconsistencies.
133                                                         log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
134                                                         sync_state.pending_sync = true;
135                                                         continue;
136                                                 }
137                                                 Err(err) => {
138                                                         // (Semi-)permanent failure, retry later.
139                                                         sync_state.pending_sync = true;
140                                                         return Err(TxSyncError::from(err));
141                                                 }
142                                         }
143                                 }
144
145                                 match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
146                                         Ok(confirmed_txs) => {
147                                                 // Double-check the tip hash. If it changed, a reorg happened since
148                                                 // we started syncing and we need to restart last-minute.
149                                                 let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
150                                                 if check_tip_hash != tip_hash {
151                                                         tip_hash = check_tip_hash;
152                                                         continue;
153                                                 }
154
155                                                 self.sync_confirmed_transactions(
156                                                         &mut sync_state,
157                                                         &confirmables,
158                                                         confirmed_txs,
159                                                 );
160                                         }
161                                         Err(InternalError::Inconsistency) => {
162                                                 // Immediately restart syncing when we encounter any inconsistencies.
163                                                 log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
164                                                 sync_state.pending_sync = true;
165                                                 continue;
166                                         }
167                                         Err(err) => {
168                                                 // (Semi-)permanent failure, retry later.
169                                                 log_error!(self.logger, "Failed during transaction sync, aborting.");
170                                                 sync_state.pending_sync = true;
171                                                 return Err(TxSyncError::from(err));
172                                         }
173                                 }
174                                 sync_state.last_sync_hash = Some(tip_hash);
175                                 sync_state.pending_sync = false;
176                         }
177                 }
178                 log_info!(self.logger, "Finished transaction sync.");
179                 Ok(())
180         }
181
182         #[maybe_async]
183         fn sync_best_block_updated(
184                 &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
185         ) -> Result<(), InternalError> {
186
187                 // Inform the interface of the new block.
188                 let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
189                 let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
190                 if tip_status.in_best_chain {
191                         if let Some(tip_height) = tip_status.height {
192                                 for c in confirmables {
193                                         c.best_block_updated(&tip_header, tip_height);
194                                 }
195                         }
196                 } else {
197                         return Err(InternalError::Inconsistency);
198                 }
199                 Ok(())
200         }
201
202         fn sync_confirmed_transactions(
203                 &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec<ConfirmedTx>,
204         ) {
205                 for ctx in confirmed_txs {
206                         for c in confirmables {
207                                 c.transactions_confirmed(
208                                         &ctx.block_header,
209                                         &[(ctx.pos, &ctx.tx)],
210                                         ctx.block_height,
211                                 );
212                         }
213
214                         sync_state.watched_transactions.remove(&ctx.tx.txid());
215
216                         for input in &ctx.tx.input {
217                                 sync_state.watched_outputs.remove(&input.previous_output);
218                         }
219                 }
220         }
221
222         #[maybe_async]
223         fn get_confirmed_transactions(
224                 &self, sync_state: &SyncState,
225         ) -> Result<Vec<ConfirmedTx>, InternalError> {
226
227                 // First, check the confirmation status of registered transactions as well as the
228                 // status of dependent transactions of registered outputs.
229
230                 let mut confirmed_txs = Vec::new();
231
232                 for txid in &sync_state.watched_transactions {
233                         if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
234                                 confirmed_txs.push(confirmed_tx);
235                         }
236                 }
237
238                 for (_, output) in &sync_state.watched_outputs {
239                         if let Some(output_status) = maybe_await!(self.client
240                                 .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
241                         {
242                                 if let Some(spending_txid) = output_status.txid {
243                                         if let Some(spending_tx_status) = output_status.status {
244                                                 if let Some(confirmed_tx) = maybe_await!(self
245                                                         .get_confirmed_tx(
246                                                                 &spending_txid,
247                                                                 spending_tx_status.block_hash,
248                                                                 spending_tx_status.block_height,
249                                                         ))?
250                                                 {
251                                                         confirmed_txs.push(confirmed_tx);
252                                                 }
253                                         }
254                                 }
255                         }
256                 }
257
258                 // Sort all confirmed transactions first by block height, then by in-block
259                 // position, and finally feed them to the interface in order.
260                 confirmed_txs.sort_unstable_by(|tx1, tx2| {
261                         tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
262                 });
263
264                 Ok(confirmed_txs)
265         }
266
267         #[maybe_async]
268         fn get_confirmed_tx(
269                 &self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
270         ) -> Result<Option<ConfirmedTx>, InternalError> {
271                 if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
272                         let block_header = merkle_block.header;
273                         let block_hash = block_header.block_hash();
274                         if let Some(expected_block_hash) = expected_block_hash {
275                                 if expected_block_hash != block_hash {
276                                         log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
277                                         return Err(InternalError::Inconsistency);
278                                 }
279                         }
280
281                         let mut matches = Vec::new();
282                         let mut indexes = Vec::new();
283                         let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
284                         if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
285                                 log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
286                                 return Err(InternalError::Failed);
287                         }
288
289                         let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize;
290                         if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
291                                 if let Some(block_height) = known_block_height {
292                                         // We can take a shortcut here if a previous call already gave us the height.
293                                         return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
294                                 }
295
296                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
297                                 if let Some(block_height) = block_status.height {
298                                         return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
299                                 } else {
300                                         // If any previously-confirmed block suddenly is no longer confirmed, we found
301                                         // an inconsistency and should start over.
302                                         log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
303                                         return Err(InternalError::Inconsistency);
304                                 }
305                         }
306                 }
307                 Ok(None)
308         }
309
310         #[maybe_async]
311         fn get_unconfirmed_transactions(
312                 &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
313         ) -> Result<Vec<Txid>, InternalError> {
314                 // Query the interface for relevant txids and check whether the relevant blocks are still
315                 // in the best chain, mark them unconfirmed otherwise
316                 let relevant_txids = confirmables
317                         .iter()
318                         .flat_map(|c| c.get_relevant_txids())
319                         .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
320
321                 let mut unconfirmed_txs = Vec::new();
322
323                 for (txid, _conf_height, block_hash_opt) in relevant_txids {
324                         if let Some(block_hash) = block_hash_opt {
325                                 let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
326                                 if block_status.in_best_chain {
327                                         // Skip if the block in question is still confirmed.
328                                         continue;
329                                 }
330
331                                 unconfirmed_txs.push(txid);
332                         } else {
333                                 log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
334                                 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
335                         }
336                 }
337                 Ok(unconfirmed_txs)
338         }
339
340         fn sync_unconfirmed_transactions(
341                 &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec<Txid>,
342         ) {
343                 for txid in unconfirmed_txs {
344                         for c in confirmables {
345                                 c.transaction_unconfirmed(&txid);
346                         }
347
348                         sync_state.watched_transactions.insert(txid);
349                 }
350         }
351
352         /// Returns a reference to the underlying esplora client.
353         pub fn client(&self) -> &EsploraClientType {
354                 &self.client
355         }
356 }
357
// The mutex type guarding the sync state: the standard library mutex for the blocking client,
// the `futures` mutex (locked via `.await`) for the async client.
#[cfg(not(feature = "async-interface"))]
type MutexType<T> = std::sync::Mutex<T>;
#[cfg(feature = "async-interface")]
type MutexType<T> = futures::lock::Mutex<T>;
362
363 // The underlying client type.
364 #[cfg(feature = "async-interface")]
365 type EsploraClientType = AsyncClient;
366 #[cfg(not(feature = "async-interface"))]
367 type EsploraClientType = BlockingClient;
368
369
370 impl<L: Deref> Filter for EsploraSyncClient<L>
371 where
372         L::Target: Logger,
373 {
374         fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
375                 let mut locked_queue = self.queue.lock().unwrap();
376                 locked_queue.transactions.insert(*txid);
377         }
378
379         fn register_output(&self, output: WatchedOutput) {
380                 let mut locked_queue = self.queue.lock().unwrap();
381                 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
382         }
383 }