Unblock channels awaiting monitor update based on `ChanMan` queue
[rust-lightning] / lightning-transaction-sync / src / electrum.rs
1 use crate::common::{ConfirmedTx, FilterQueue, SyncState};
2 use crate::error::{InternalError, TxSyncError};
3
4 use electrum_client::Client as ElectrumClient;
5 use electrum_client::ElectrumApi;
6 use electrum_client::GetMerkleRes;
7
8 use lightning::chain::WatchedOutput;
9 use lightning::chain::{Confirm, Filter};
10 use lightning::util::logger::Logger;
11 use lightning::{log_debug, log_error, log_trace};
12
13 use bitcoin::block::Header;
14 use bitcoin::hash_types::TxMerkleNode;
15 use bitcoin::hashes::sha256d::Hash as Sha256d;
16 use bitcoin::hashes::Hash;
17 use bitcoin::{BlockHash, Script, Transaction, Txid};
18
19 use std::collections::HashSet;
20 use std::ops::Deref;
21 use std::sync::Mutex;
22 use std::time::Instant;
23
24 /// Synchronizes LDK with a given Electrum server.
25 ///
26 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
27 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
28 /// reconfirmation.
29 ///
30 /// Note that registration via [`Filter`] needs to happen before any calls to
31 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
32 ///
33 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
34 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
35 /// [`Filter`]: lightning::chain::Filter
36 pub struct ElectrumSyncClient<L: Deref>
37 where
38         L::Target: Logger,
39 {
40         sync_state: Mutex<SyncState>,
41         queue: Mutex<FilterQueue>,
42         client: ElectrumClient,
43         logger: L,
44 }
45
46 impl<L: Deref> ElectrumSyncClient<L>
47 where
48         L::Target: Logger,
49 {
50         /// Returns a new [`ElectrumSyncClient`] object.
51         pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
52                 let client = ElectrumClient::new(&server_url).map_err(|e| {
53                         log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
54                         e
55                 })?;
56
57                 Self::from_client(client, logger)
58         }
59
60         /// Returns a new [`ElectrumSyncClient`] object using the given Electrum client.
61         ///
62         /// This is not exported to bindings users as the underlying client from BDK is not exported.
63         pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
64                 let sync_state = Mutex::new(SyncState::new());
65                 let queue = Mutex::new(FilterQueue::new());
66
67                 Ok(Self { sync_state, queue, client, logger })
68         }
69
70         /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
71         /// method should be called regularly to keep LDK up-to-date with current chain data.
72         ///
73         /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
74         /// newest on-chain activity related to the items previously registered via the [`Filter`]
75         /// interface.
76         ///
77         /// [`Confirm`]: lightning::chain::Confirm
78         /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
79         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
80         /// [`Filter`]: lightning::chain::Filter
81         pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
82         where
83                 C::Target: Confirm,
84         {
85                 // This lock makes sure we're syncing once at a time.
86                 let mut sync_state = self.sync_state.lock().unwrap();
87
88                 log_trace!(self.logger, "Starting transaction sync.");
89                 #[cfg(feature = "time")]
90                 let start_time = Instant::now();
91                 let mut num_confirmed = 0;
92                 let mut num_unconfirmed = 0;
93
94                 // Clear any header notifications we might have gotten to keep the queue count low.
95                 while let Some(_) = self.client.block_headers_pop()? {}
96
97                 let tip_notification = self.client.block_headers_subscribe()?;
98                 let mut tip_header = tip_notification.header;
99                 let mut tip_height = tip_notification.height as u32;
100
101                 loop {
102                         let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
103                         let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
104
105                         // We loop until any registered transactions have been processed at least once, or the
106                         // tip hasn't been updated during the last iteration.
107                         if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
108                                 // Nothing to do.
109                                 break;
110                         } else {
111                                 // Update the known tip to the newest one.
112                                 if tip_is_new {
113                                         // First check for any unconfirmed transactions and act on it immediately.
114                                         match self.get_unconfirmed_transactions(&confirmables) {
115                                                 Ok(unconfirmed_txs) => {
116                                                         // Double-check the tip hash. If it changed, a reorg happened since
117                                                         // we started syncing and we need to restart last-minute.
118                                                         match self.check_update_tip(&mut tip_header, &mut tip_height) {
119                                                                 Ok(false) => {
120                                                                         num_unconfirmed += unconfirmed_txs.len();
121                                                                         sync_state.sync_unconfirmed_transactions(
122                                                                                 &confirmables,
123                                                                                 unconfirmed_txs,
124                                                                         );
125                                                                 },
126                                                                 Ok(true) => {
127                                                                         log_debug!(self.logger,
128                                                                                 "Encountered inconsistency during transaction sync, restarting.");
129                                                                         sync_state.pending_sync = true;
130                                                                         continue;
131                                                                 },
132                                                                 Err(err) => {
133                                                                         // (Semi-)permanent failure, retry later.
134                                                                         log_error!(self.logger,
135                                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
136                                                                                 num_confirmed,
137                                                                                 num_unconfirmed
138                                                                         );
139                                                                         sync_state.pending_sync = true;
140                                                                         return Err(TxSyncError::from(err));
141                                                                 },
142                                                         }
143                                                 },
144                                                 Err(err) => {
145                                                         // (Semi-)permanent failure, retry later.
146                                                         log_error!(self.logger,
147                                                                 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
148                                                                 num_confirmed,
149                                                                 num_unconfirmed
150                                                         );
151                                                         sync_state.pending_sync = true;
152                                                         return Err(TxSyncError::from(err));
153                                                 },
154                                         }
155
156                                         // Update the best block.
157                                         for c in &confirmables {
158                                                 c.best_block_updated(&tip_header, tip_height);
159                                         }
160
161                                         // Prune any sufficiently confirmed output spends
162                                         sync_state.prune_output_spends(tip_height);
163                                 }
164
165                                 match self.get_confirmed_transactions(&sync_state) {
166                                         Ok(confirmed_txs) => {
167                                                 // Double-check the tip hash. If it changed, a reorg happened since
168                                                 // we started syncing and we need to restart last-minute.
169                                                 match self.check_update_tip(&mut tip_header, &mut tip_height) {
170                                                         Ok(false) => {
171                                                                 num_confirmed += confirmed_txs.len();
172                                                                 sync_state
173                                                                         .sync_confirmed_transactions(&confirmables, confirmed_txs);
174                                                         },
175                                                         Ok(true) => {
176                                                                 log_debug!(self.logger,
177                                                                         "Encountered inconsistency during transaction sync, restarting.");
178                                                                 sync_state.pending_sync = true;
179                                                                 continue;
180                                                         },
181                                                         Err(err) => {
182                                                                 // (Semi-)permanent failure, retry later.
183                                                                 log_error!(self.logger,
184                                                                         "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
185                                                                         num_confirmed,
186                                                                         num_unconfirmed
187                                                                 );
188                                                                 sync_state.pending_sync = true;
189                                                                 return Err(TxSyncError::from(err));
190                                                         },
191                                                 }
192                                         },
193                                         Err(InternalError::Inconsistency) => {
194                                                 // Immediately restart syncing when we encounter any inconsistencies.
195                                                 log_debug!(
196                                                         self.logger,
197                                                         "Encountered inconsistency during transaction sync, restarting."
198                                                 );
199                                                 sync_state.pending_sync = true;
200                                                 continue;
201                                         },
202                                         Err(err) => {
203                                                 // (Semi-)permanent failure, retry later.
204                                                 log_error!(self.logger,
205                                                         "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
206                                                         num_confirmed,
207                                                         num_unconfirmed
208                                                 );
209                                                 sync_state.pending_sync = true;
210                                                 return Err(TxSyncError::from(err));
211                                         },
212                                 }
213                                 sync_state.last_sync_hash = Some(tip_header.block_hash());
214                                 sync_state.pending_sync = false;
215                         }
216                 }
217                 #[cfg(feature = "time")]
218                 log_debug!(
219                         self.logger,
220                         "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
221                         tip_header.block_hash(),
222                         start_time.elapsed().as_millis(),
223                         num_confirmed,
224                         num_unconfirmed
225                 );
226                 #[cfg(not(feature = "time"))]
227                 log_debug!(
228                         self.logger,
229                         "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
230                         tip_header.block_hash(),
231                         num_confirmed,
232                         num_unconfirmed
233                 );
234                 Ok(())
235         }
236
237         fn check_update_tip(
238                 &self, cur_tip_header: &mut Header, cur_tip_height: &mut u32,
239         ) -> Result<bool, InternalError> {
240                 let check_notification = self.client.block_headers_subscribe()?;
241                 let check_tip_hash = check_notification.header.block_hash();
242
243                 // Restart if either the tip changed or we got some divergent tip
244                 // change notification since we started. In the latter case we
245                 // make sure we clear the queue before continuing.
246                 let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
247                 while let Some(queued_notif) = self.client.block_headers_pop()? {
248                         if queued_notif.header.block_hash() != check_tip_hash {
249                                 restart_sync = true
250                         }
251                 }
252
253                 if restart_sync {
254                         *cur_tip_header = check_notification.header;
255                         *cur_tip_height = check_notification.height as u32;
256                         Ok(true)
257                 } else {
258                         Ok(false)
259                 }
260         }
261
262         fn get_confirmed_transactions(
263                 &self, sync_state: &SyncState,
264         ) -> Result<Vec<ConfirmedTx>, InternalError> {
265                 // First, check the confirmation status of registered transactions as well as the
266                 // status of dependent transactions of registered outputs.
267                 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
268                 let mut watched_script_pubkeys = Vec::with_capacity(
269                         sync_state.watched_transactions.len() + sync_state.watched_outputs.len(),
270                 );
271                 let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
272
273                 for txid in &sync_state.watched_transactions {
274                         match self.client.transaction_get(&txid) {
275                                 Ok(tx) => {
276                                         watched_txs.push((txid, tx.clone()));
277                                         if let Some(tx_out) = tx.output.first() {
278                                                 // We watch an arbitrary output of the transaction of interest in order to
279                                                 // retrieve the associated script history, before narrowing down our search
280                                                 // through `filter`ing by `txid` below.
281                                                 watched_script_pubkeys.push(tx_out.script_pubkey.clone());
282                                         } else {
283                                                 debug_assert!(false, "Failed due to retrieving invalid tx data.");
284                                                 log_error!(self.logger, "Failed due to retrieving invalid tx data.");
285                                                 return Err(InternalError::Failed);
286                                         }
287                                 },
288                                 Err(electrum_client::Error::Protocol(_)) => {
289                                         // We couldn't find the tx, do nothing.
290                                 },
291                                 Err(e) => {
292                                         log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
293                                         return Err(InternalError::Failed);
294                                 },
295                         }
296                 }
297
298                 let num_tx_lookups = watched_script_pubkeys.len();
299                 debug_assert_eq!(num_tx_lookups, watched_txs.len());
300
301                 for output in sync_state.watched_outputs.values() {
302                         watched_script_pubkeys.push(output.script_pubkey.clone());
303                 }
304
305                 let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
306                 debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
307
308                 match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
309                 {
310                         Ok(results) => {
311                                 let (tx_results, output_results) = results.split_at(num_tx_lookups);
312                                 debug_assert_eq!(num_output_spend_lookups, output_results.len());
313
314                                 for (i, script_history) in tx_results.iter().enumerate() {
315                                         let (txid, tx) = &watched_txs[i];
316                                         if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
317                                                 continue;
318                                         }
319                                         let mut filtered_history =
320                                                 script_history.iter().filter(|h| h.tx_hash == **txid);
321                                         if let Some(history) = filtered_history.next() {
322                                                 let prob_conf_height = history.height as u32;
323                                                 let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
324                                                 confirmed_txs.push(confirmed_tx);
325                                         }
326                                         debug_assert!(filtered_history.next().is_none());
327                                 }
328
329                                 for (watched_output, script_history) in
330                                         sync_state.watched_outputs.values().zip(output_results)
331                                 {
332                                         for possible_output_spend in script_history {
333                                                 if possible_output_spend.height <= 0 {
334                                                         continue;
335                                                 }
336
337                                                 let txid = possible_output_spend.tx_hash;
338                                                 if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
339                                                         continue;
340                                                 }
341
342                                                 match self.client.transaction_get(&txid) {
343                                                         Ok(tx) => {
344                                                                 let mut is_spend = false;
345                                                                 for txin in &tx.input {
346                                                                         let watched_outpoint =
347                                                                                 watched_output.outpoint.into_bitcoin_outpoint();
348                                                                         if txin.previous_output == watched_outpoint {
349                                                                                 is_spend = true;
350                                                                                 break;
351                                                                         }
352                                                                 }
353
354                                                                 if !is_spend {
355                                                                         continue;
356                                                                 }
357
358                                                                 let prob_conf_height = possible_output_spend.height as u32;
359                                                                 let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
360                                                                 confirmed_txs.push(confirmed_tx);
361                                                         },
362                                                         Err(e) => {
363                                                                 log_trace!(
364                                                                         self.logger,
365                                                                         "Inconsistency: Tx {} was unconfirmed during syncing: {}",
366                                                                         txid,
367                                                                         e
368                                                                 );
369                                                                 return Err(InternalError::Inconsistency);
370                                                         },
371                                                 }
372                                         }
373                                 }
374                         },
375                         Err(e) => {
376                                 log_error!(self.logger, "Failed to look up script histories: {}.", e);
377                                 return Err(InternalError::Failed);
378                         },
379                 }
380
381                 // Sort all confirmed transactions first by block height, then by in-block
382                 // position, and finally feed them to the interface in order.
383                 confirmed_txs.sort_unstable_by(|tx1, tx2| {
384                         tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
385                 });
386
387                 Ok(confirmed_txs)
388         }
389
390         fn get_unconfirmed_transactions<C: Deref>(
391                 &self, confirmables: &Vec<C>,
392         ) -> Result<Vec<Txid>, InternalError>
393         where
394                 C::Target: Confirm,
395         {
396                 // Query the interface for relevant txids and check whether the relevant blocks are still
397                 // in the best chain, mark them unconfirmed otherwise
398                 let relevant_txids = confirmables
399                         .iter()
400                         .flat_map(|c| c.get_relevant_txids())
401                         .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
402
403                 let mut unconfirmed_txs = Vec::new();
404
405                 for (txid, conf_height, block_hash_opt) in relevant_txids {
406                         if let Some(block_hash) = block_hash_opt {
407                                 let block_header = self.client.block_header(conf_height as usize)?;
408                                 if block_header.block_hash() == block_hash {
409                                         // Skip if the tx is still confirmed in the block in question.
410                                         continue;
411                                 }
412
413                                 unconfirmed_txs.push(txid);
414                         } else {
415                                 log_error!(self.logger,
416                                         "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
417                                 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
418                         }
419                 }
420                 Ok(unconfirmed_txs)
421         }
422
423         fn get_confirmed_tx(
424                 &self, tx: &Transaction, prob_conf_height: u32,
425         ) -> Result<ConfirmedTx, InternalError> {
426                 let txid = tx.txid();
427                 match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
428                         Ok(merkle_res) => {
429                                 debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
430                                 match self.client.block_header(prob_conf_height as usize) {
431                                         Ok(block_header) => {
432                                                 let pos = merkle_res.pos;
433                                                 if !self.validate_merkle_proof(
434                                                         &txid,
435                                                         &block_header.merkle_root,
436                                                         merkle_res,
437                                                 )? {
438                                                         log_trace!(
439                                                                 self.logger,
440                                                                 "Inconsistency: Block {} was unconfirmed during syncing.",
441                                                                 block_header.block_hash()
442                                                         );
443                                                         return Err(InternalError::Inconsistency);
444                                                 }
445                                                 let confirmed_tx = ConfirmedTx {
446                                                         tx: tx.clone(),
447                                                         txid,
448                                                         block_header,
449                                                         block_height: prob_conf_height,
450                                                         pos,
451                                                 };
452                                                 Ok(confirmed_tx)
453                                         },
454                                         Err(e) => {
455                                                 log_error!(
456                                                         self.logger,
457                                                         "Failed to retrieve block header for height {}: {}.",
458                                                         prob_conf_height,
459                                                         e
460                                                 );
461                                                 Err(InternalError::Failed)
462                                         },
463                                 }
464                         },
465                         Err(e) => {
466                                 log_trace!(
467                                         self.logger,
468                                         "Inconsistency: Tx {} was unconfirmed during syncing: {}",
469                                         txid,
470                                         e
471                                 );
472                                 Err(InternalError::Inconsistency)
473                         },
474                 }
475         }
476
477         /// Returns a reference to the underlying Electrum client.
478         ///
479         /// This is not exported to bindings users as the underlying client from BDK is not exported.
480         pub fn client(&self) -> &ElectrumClient {
481                 &self.client
482         }
483
484         fn validate_merkle_proof(
485                 &self, txid: &Txid, merkle_root: &TxMerkleNode, merkle_res: GetMerkleRes,
486         ) -> Result<bool, InternalError> {
487                 let mut index = merkle_res.pos;
488                 let mut cur = txid.to_raw_hash();
489                 for mut bytes in merkle_res.merkle {
490                         bytes.reverse();
491                         // unwrap() safety: `bytes` has len 32 so `from_slice` can never fail.
492                         let next_hash = Sha256d::from_slice(&bytes).unwrap();
493                         let (left, right) = if index % 2 == 0 { (cur, next_hash) } else { (next_hash, cur) };
494
495                         let data = [&left[..], &right[..]].concat();
496                         cur = Sha256d::hash(&data);
497                         index /= 2;
498                 }
499
500                 Ok(cur == merkle_root.to_raw_hash())
501         }
502 }
503
504 impl<L: Deref> Filter for ElectrumSyncClient<L>
505 where
506         L::Target: Logger,
507 {
508         fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
509                 let mut locked_queue = self.queue.lock().unwrap();
510                 locked_queue.transactions.insert(*txid);
511         }
512
513         fn register_output(&self, output: WatchedOutput) {
514                 let mut locked_queue = self.queue.lock().unwrap();
515                 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
516         }
517 }