1 use crate::common::{ConfirmedTx, SyncState, FilterQueue};
2 use crate::error::{TxSyncError, InternalError};
4 use electrum_client::Client as ElectrumClient;
5 use electrum_client::ElectrumApi;
6 use electrum_client::GetMerkleRes;
8 use lightning::util::logger::Logger;
9 use lightning::{log_error, log_debug, log_trace};
10 use lightning::chain::WatchedOutput;
11 use lightning::chain::{Confirm, Filter};
13 use bitcoin::{BlockHash, Script, Transaction, Txid};
14 use bitcoin::block::Header;
15 use bitcoin::hash_types::TxMerkleNode;
16 use bitcoin::hashes::Hash;
17 use bitcoin::hashes::sha256d::Hash as Sha256d;
21 use std::collections::HashSet;
22 use std::time::Instant;
// NOTE(review): this excerpt looks incomplete — the doc sentence ending in "unconfirmation, and"
// is cut off, and the struct's `where` clause, braces and any further fields are not visible
// here. Confirm against the complete file before relying on this description.
24 /// Synchronizes LDK with a given Electrum server.
26 /// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
27 /// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
30 /// Note that registration via [`Filter`] needs to happen before any calls to
31 /// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
33 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
34 /// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
35 /// [`Filter`]: lightning::chain::Filter
36 pub struct ElectrumSyncClient<L: Deref>
// Synchronization state. `sync` takes this lock first so that only one sync runs at a time.
40 sync_state: Mutex<SyncState>,
// Registrations queued through `register_tx` / `register_output`; `sync` passes them to
// `process_queues` together with the locked `sync_state`.
41 queue: Mutex<FilterQueue>,
// Electrum client used for the server requests made by this type's methods.
42 client: ElectrumClient,
46 impl<L: Deref> ElectrumSyncClient<L>
50 /// Returns a new [`ElectrumSyncClient`] object.
// Builds an `ElectrumClient` for `server_url`. A construction failure is logged at error level
// (including the URL and the underlying error) inside the `map_err` closure. On success the
// client and `logger` are handed to `from_client`, whose result is returned unchanged.
// NOTE(review): the error value the `map_err` closure produces is not visible in this excerpt.
51 pub fn new(server_url: String, logger: L) -> Result<Self, TxSyncError> {
52 let client = ElectrumClient::new(&server_url).map_err(|e| {
53 log_error!(logger, "Failed to connect to electrum server '{}': {}", server_url, e);
57 Self::from_client(client, logger)
60 /// Returns a new [`ElectrumSyncClient`] object using the given Electrum client.
62 /// This is not exported to bindings users as the underlying client from BDK is not exported.
// Starts from a fresh `SyncState` and a fresh `FilterQueue`, each wrapped in its own `Mutex`.
// NOTE(review): the statement that assembles and returns `Self` is not visible in this excerpt.
63 pub fn from_client(client: ElectrumClient, logger: L) -> Result<Self, TxSyncError> {
64 let sync_state = Mutex::new(SyncState::new());
65 let queue = Mutex::new(FilterQueue::new());
75 /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
76 /// method should be called regularly to keep LDK up-to-date with current chain data.
78 /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
79 /// newest on-chain activity related to the items previously registered via the [`Filter`]
82 /// [`Confirm`]: lightning::chain::Confirm
83 /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
84 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
85 /// [`Filter`]: lightning::chain::Filter
// Visible flow: take the `sync_state` lock, drain stale header notifications, fetch the current
// tip, fold queued `Filter` registrations into the state, then handle unconfirmed transactions,
// the best-block update, output-spend pruning and confirmed transactions. The tip is re-checked
// via `check_update_tip` before each batch is applied. Every visible restart or failure path
// sets `pending_sync = true`; the failure paths additionally log and return a `TxSyncError`.
// NOTE(review): several lines of this function (loop header, some match arms, closing braces)
// are not visible in this excerpt, so the exact branching is not described here.
86 pub fn sync<C: Deref>(&self, confirmables: Vec<C>) -> Result<(), TxSyncError>
87 where C::Target: Confirm
89 // This lock makes sure we're syncing once at a time.
90 let mut sync_state = self.sync_state.lock().unwrap();
92 log_trace!(self.logger, "Starting transaction sync.");
93 #[cfg(feature = "time")]
94 let start_time = Instant::now();
// Counters that are only used for the log messages below.
95 let mut num_confirmed = 0;
96 let mut num_unconfirmed = 0;
98 // Clear any header notifications we might have gotten to keep the queue count low.
99 while let Some(_) = self.client.block_headers_pop()? {}
// The subscription response carries the header and height used as the tip for this sync.
101 let tip_notification = self.client.block_headers_subscribe()?;
102 let mut tip_header = tip_notification.header;
103 let mut tip_height = tip_notification.height as u32;
// Hand the queued registrations to `process_queues`; its result takes part in the check below.
106 let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
// The tip counts as new when its hash differs from the one stored by the last finished sync.
107 let tip_is_new = Some(tip_header.block_hash()) != sync_state.last_sync_hash;
109 // We loop until any registered transactions have been processed at least once, or the
110 // tip hasn't been updated during the last iteration.
// NOTE(review): the body of this branch is not visible in this excerpt.
111 if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
115 // Update the known tip to the newest one.
117 // First check for any unconfirmed transactions and act on it immediately.
118 match self.get_unconfirmed_transactions(&confirmables) {
119 Ok(unconfirmed_txs) => {
120 // Double-check the tip hash. If it changed, a reorg happened since
121 // we started syncing and we need to restart last-minute.
122 match self.check_update_tip(&mut tip_header, &mut tip_height) {
124 num_unconfirmed += unconfirmed_txs.len();
125 sync_state.sync_unconfirmed_transactions(
131 log_debug!(self.logger,
132 "Encountered inconsistency during transaction sync, restarting.");
133 sync_state.pending_sync = true;
137 // (Semi-)permanent failure, retry later.
138 log_error!(self.logger,
139 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
143 sync_state.pending_sync = true;
144 return Err(TxSyncError::from(err));
149 // (Semi-)permanent failure, retry later.
150 log_error!(self.logger,
151 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
155 sync_state.pending_sync = true;
156 return Err(TxSyncError::from(err));
160 // Update the best block.
161 for c in &confirmables {
162 c.best_block_updated(&tip_header, tip_height);
165 // Prune any sufficiently confirmed output spends
166 sync_state.prune_output_spends(tip_height);
// Look up confirmations for everything currently tracked in `sync_state`.
169 match self.get_confirmed_transactions(&sync_state) {
170 Ok(confirmed_txs) => {
171 // Double-check the tip hash. If it changed, a reorg happened since
172 // we started syncing and we need to restart last-minute.
173 match self.check_update_tip(&mut tip_header, &mut tip_height) {
175 num_confirmed += confirmed_txs.len();
176 sync_state.sync_confirmed_transactions(
182 log_debug!(self.logger,
183 "Encountered inconsistency during transaction sync, restarting.");
184 sync_state.pending_sync = true;
188 // (Semi-)permanent failure, retry later.
189 log_error!(self.logger,
190 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
194 sync_state.pending_sync = true;
195 return Err(TxSyncError::from(err));
199 Err(InternalError::Inconsistency) => {
200 // Immediately restart syncing when we encounter any inconsistencies.
201 log_debug!(self.logger,
202 "Encountered inconsistency during transaction sync, restarting.");
203 sync_state.pending_sync = true;
207 // (Semi-)permanent failure, retry later.
208 log_error!(self.logger,
209 "Failed during transaction sync, aborting. Synced so far: {} confirmed, {} unconfirmed.",
213 sync_state.pending_sync = true;
214 return Err(TxSyncError::from(err));
// Record the tip this pass finished at and clear the pending flag.
217 sync_state.last_sync_hash = Some(tip_header.block_hash());
218 sync_state.pending_sync = false;
// The summary log line includes the elapsed time only when the "time" feature is enabled.
221 #[cfg(feature = "time")]
222 log_debug!(self.logger,
223 "Finished transaction sync at tip {} in {}ms: {} confirmed, {} unconfirmed.",
224 tip_header.block_hash(), start_time.elapsed().as_millis(), num_confirmed,
226 #[cfg(not(feature = "time"))]
227 log_debug!(self.logger,
228 "Finished transaction sync at tip {}: {} confirmed, {} unconfirmed.",
229 tip_header.block_hash(), num_confirmed, num_unconfirmed);
/// Re-queries the server's tip and compares it with the tip the caller is syncing against.
///
/// A restart is flagged when the freshly reported tip hash differs from the hash of
/// `cur_tip_header`. Queued header notifications are then popped one by one and each is
/// compared against the fresh tip hash. `cur_tip_header` and `cur_tip_height` are assigned the
/// freshly reported header and height.
// NOTE(review): the handling of a mismatching queued notification, whether the tip assignment is
// conditional, and the returned `bool` are not visible in this excerpt — confirm in the full file.
233 fn check_update_tip(&self, cur_tip_header: &mut Header, cur_tip_height: &mut u32)
234 -> Result<bool, InternalError>
236 let check_notification = self.client.block_headers_subscribe()?;
237 let check_tip_hash = check_notification.header.block_hash();
239 // Restart if either the tip changed or we got some divergent tip
240 // change notification since we started. In the latter case we
241 // make sure we clear the queue before continuing.
242 let mut restart_sync = check_tip_hash != cur_tip_header.block_hash();
243 while let Some(queued_notif) = self.client.block_headers_pop()? {
244 if queued_notif.header.block_hash() != check_tip_hash {
250 *cur_tip_header = check_notification.header;
251 *cur_tip_height = check_notification.height as u32;
/// Collects confirmation data for the watched transactions and for spends of watched outputs.
///
/// Each watched transaction is fetched from the server and one of its output scripts is
/// recorded. The scripts of all watched outputs are appended, and a single batched
/// script-history request covers both groups. History entries are turned into [`ConfirmedTx`]
/// values through `get_confirmed_tx`, and the result is sorted by block height and then by
/// in-block position.
///
/// Visible error returns: `InternalError::Failed` for invalid transaction data and for failed
/// lookups, and `InternalError::Inconsistency` on the path that logs a transaction as
/// unconfirmed during syncing. Errors from `get_confirmed_tx` are propagated with `?`.
// NOTE(review): several match arms, `continue`/skip statements and closing braces are not visible
// in this excerpt; the comments below only describe what the shown lines establish.
258 fn get_confirmed_transactions(
259 &self, sync_state: &SyncState,
260 ) -> Result<Vec<ConfirmedTx>, InternalError> {
262 // First, check the confirmation status of registered transactions as well as the
263 // status of dependent transactions of registered outputs.
264 let mut confirmed_txs: Vec<ConfirmedTx> = Vec::new();
// Sized for one script per watched transaction plus one per watched output.
265 let mut watched_script_pubkeys = Vec::with_capacity(
266 sync_state.watched_transactions.len() + sync_state.watched_outputs.len());
267 let mut watched_txs = Vec::with_capacity(sync_state.watched_transactions.len());
269 for txid in &sync_state.watched_transactions {
270 match self.client.transaction_get(&txid) {
// Keep the txid next to its transaction; the index into `watched_txs` is reused below to
// pair each script history with its transaction.
272 watched_txs.push((txid, tx.clone()));
273 if let Some(tx_out) = tx.output.first() {
274 // We watch an arbitrary output of the transaction of interest in order to
275 // retrieve the associated script history, before narrowing down our search
276 // through `filter`ing by `txid` below.
277 watched_script_pubkeys.push(tx_out.script_pubkey.clone());
279 debug_assert!(false, "Failed due to retrieving invalid tx data.");
280 log_error!(self.logger, "Failed due to retrieving invalid tx data.");
281 return Err(InternalError::Failed);
284 Err(electrum_client::Error::Protocol(_)) => {
285 // We couldn't find the tx, do nothing.
288 log_error!(self.logger, "Failed to look up transaction {}: {}.", txid, e);
289 return Err(InternalError::Failed);
// Scripts pushed so far belong to transaction lookups; everything appended after this point
// belongs to output-spend lookups, which is what `split_at(num_tx_lookups)` relies on.
294 let num_tx_lookups = watched_script_pubkeys.len();
295 debug_assert_eq!(num_tx_lookups, watched_txs.len());
297 for output in sync_state.watched_outputs.values() {
298 watched_script_pubkeys.push(output.script_pubkey.clone());
301 let num_output_spend_lookups = watched_script_pubkeys.len() - num_tx_lookups;
302 debug_assert_eq!(num_output_spend_lookups, sync_state.watched_outputs.len());
// One batched request fetches the history of every collected script.
304 match self.client.batch_script_get_history(watched_script_pubkeys.iter().map(|s| s.deref()))
307 let (tx_results, output_results) = results.split_at(num_tx_lookups);
308 debug_assert_eq!(num_output_spend_lookups, output_results.len());
310 for (i, script_history) in tx_results.iter().enumerate() {
311 let (txid, tx) = &watched_txs[i];
// Checks whether this txid has already been collected.
// NOTE(review): the body of this branch is not visible in this excerpt.
312 if confirmed_txs.iter().any(|ctx| ctx.txid == **txid) {
// The script may have unrelated history entries; keep only those for this txid.
315 let mut filtered_history = script_history.iter().filter(|h| h.tx_hash == **txid);
316 if let Some(history) = filtered_history.next()
318 let prob_conf_height = history.height as u32;
319 let confirmed_tx = self.get_confirmed_tx(tx, prob_conf_height)?;
320 confirmed_txs.push(confirmed_tx);
// At most one history entry is expected per txid.
322 debug_assert!(filtered_history.next().is_none());
325 for (watched_output, script_history) in sync_state.watched_outputs.values()
328 for possible_output_spend in script_history {
// NOTE(review): presumably entries with a non-positive height are unconfirmed and skipped;
// the body of this branch is not visible in this excerpt.
329 if possible_output_spend.height <= 0 {
333 let txid = possible_output_spend.tx_hash;
334 if confirmed_txs.iter().any(|ctx| ctx.txid == txid) {
338 match self.client.transaction_get(&txid) {
// A history entry only counts as a spend if one of the transaction's inputs references the
// watched outpoint.
340 let mut is_spend = false;
341 for txin in &tx.input {
342 let watched_outpoint = watched_output.outpoint
343 .into_bitcoin_outpoint();
344 if txin.previous_output == watched_outpoint {
354 let prob_conf_height = possible_output_spend.height as u32;
355 let confirmed_tx = self.get_confirmed_tx(&tx, prob_conf_height)?;
356 confirmed_txs.push(confirmed_tx);
359 log_trace!(self.logger,
360 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
362 return Err(InternalError::Inconsistency);
369 log_error!(self.logger, "Failed to look up script histories: {}.", e);
370 return Err(InternalError::Failed);
374 // Sort all confirmed transactions first by block height, then by in-block
375 // position, and finally feed them to the interface in order.
376 confirmed_txs.sort_unstable_by(|tx1, tx2| {
377 tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
/// Returns the txids reported by `confirmables` whose recorded block no longer matches the
/// server's header at the recorded height.
///
/// The `(txid, height, block hash)` tuples from every confirmable's `get_relevant_txids` are
/// collected into a `HashSet`. For each tuple carrying a block hash, the header at the
/// recorded height is fetched and its hash compared with the recorded one. A failed header
/// request is propagated with `?`.
///
/// # Panics
///
/// The function contains a `panic!` preceded by an error log about an untracked funding
/// transaction confirmation.
// NOTE(review): presumably the panic is reached when a tuple carries no block hash; the branch
// structure and the final return are not visible in this excerpt — confirm in the full file.
383 fn get_unconfirmed_transactions<C: Deref>(
384 &self, confirmables: &Vec<C>,
385 ) -> Result<Vec<Txid>, InternalError>
386 where C::Target: Confirm
388 // Query the interface for relevant txids and check whether the relevant blocks are still
389 // in the best chain, mark them unconfirmed otherwise
390 let relevant_txids = confirmables
392 .flat_map(|c| c.get_relevant_txids())
393 .collect::<HashSet<(Txid, u32, Option<BlockHash>)>>();
395 let mut unconfirmed_txs = Vec::new();
397 for (txid, conf_height, block_hash_opt) in relevant_txids {
398 if let Some(block_hash) = block_hash_opt {
// Fetch the header the server currently has at the height the tx was confirmed at.
399 let block_header = self.client.block_header(conf_height as usize)?;
400 if block_header.block_hash() == block_hash {
401 // Skip if the tx is still confirmed in the block in question.
405 unconfirmed_txs.push(txid);
407 log_error!(self.logger,
408 "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
409 panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
/// Builds a [`ConfirmedTx`] for `tx`, assuming it was confirmed at height `prob_conf_height`.
///
/// Requests the merkle data for the txid at that height, fetches the block header at the same
/// height, and validates the merkle data against the header's `merkle_root` via
/// `validate_merkle_proof`. When validation reports `false`, the block is logged as
/// unconfirmed and `InternalError::Inconsistency` is returned. A failed header request is
/// logged at error level and yields `InternalError::Failed`.
// NOTE(review): the arm that logs "Tx {} was unconfirmed during syncing" and returns
// `Inconsistency` is presumably the failure arm of the merkle request; the match arms
// themselves and the remaining `ConfirmedTx` fields are not visible in this excerpt.
415 fn get_confirmed_tx(&self, tx: &Transaction, prob_conf_height: u32)
416 -> Result<ConfirmedTx, InternalError>
418 let txid = tx.txid();
419 match self.client.transaction_get_merkle(&txid, prob_conf_height as usize) {
// In debug builds, check that the height in the response matches the requested height.
421 debug_assert_eq!(prob_conf_height, merkle_res.block_height as u32);
422 match self.client.block_header(prob_conf_height as usize) {
423 Ok(block_header) => {
// Read `pos` before `merkle_res` is moved into `validate_merkle_proof`.
424 let pos = merkle_res.pos;
425 if !self.validate_merkle_proof(&txid,
426 &block_header.merkle_root, merkle_res)?
428 log_trace!(self.logger,
429 "Inconsistency: Block {} was unconfirmed during syncing.",
430 block_header.block_hash());
431 return Err(InternalError::Inconsistency);
433 let confirmed_tx = ConfirmedTx {
436 block_header, block_height: prob_conf_height,
442 log_error!(self.logger,
443 "Failed to retrieve block header for height {}: {}.",
444 prob_conf_height, e);
445 Err(InternalError::Failed)
450 log_trace!(self.logger,
451 "Inconsistency: Tx {} was unconfirmed during syncing: {}",
453 Err(InternalError::Inconsistency)
458 /// Returns a reference to the underlying Electrum client.
460 /// This is not exported to bindings users as the underlying client from BDK is not exported.
// Borrowing accessor: takes `&self` and returns `&ElectrumClient`.
// NOTE(review): the body is not visible in this excerpt.
461 pub fn client(&self) -> &ElectrumClient {
/// Recomputes a merkle root from `txid` and the branch in `merkle_res`, and reports whether it
/// equals `merkle_root`.
///
/// Starting from the raw hash of `txid`, each branch element is parsed as a double-SHA256
/// hash and paired with the running hash. The parity of `index` (initially `merkle_res.pos`)
/// selects the left/right order. The pair is concatenated and hashed to give the next running
/// hash. The result is `Ok(true)` only when the final hash equals the raw hash of
/// `merkle_root`.
// NOTE(review): the per-element preprocessing of `bytes` (it is bound as `mut`), the two
// `(left, right)` assignments and the update of `index` between levels are not visible in this
// excerpt — confirm in the full file.
465 fn validate_merkle_proof(&self, txid: &Txid, merkle_root: &TxMerkleNode,
466 merkle_res: GetMerkleRes) -> Result<bool, InternalError>
468 let mut index = merkle_res.pos;
469 let mut cur = txid.to_raw_hash();
470 for mut bytes in merkle_res.merkle {
472 // unwrap() safety: `bytes` has len 32 so `from_slice` can never fail.
473 let next_hash = Sha256d::from_slice(&bytes).unwrap();
// An even index selects one ordering of the pair, an odd index the other.
474 let (left, right) = if index % 2 == 0 {
// Parent hash = double-SHA256 over the concatenation of the two child hashes.
480 let data = [&left[..], &right[..]].concat();
481 cur = Sha256d::hash(&data);
485 Ok(cur == merkle_root.to_raw_hash())
// `Filter` implementation: both methods only record the item in the locked `queue`; no server
// request is made in the lines shown here.
// NOTE(review): the `where` clause and closing braces are not visible in this excerpt.
489 impl<L: Deref> Filter for ElectrumSyncClient<L>
// Queues `txid` for monitoring. The script pubkey argument is unused (underscore-prefixed).
493 fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
494 let mut locked_queue = self.queue.lock().unwrap();
495 locked_queue.transactions.insert(*txid);
// Queues `output` for monitoring, keyed by its outpoint converted to a bitcoin outpoint.
498 fn register_output(&self, output: WatchedOutput) {
499 let mut locked_queue = self.queue.lock().unwrap();
500 locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);