From: Elias Rohrer Date: Wed, 23 Nov 2022 08:33:37 +0000 (+0100) Subject: Add transaction sync crate X-Git-Tag: v0.0.114-beta~25^2~2 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=fa77f343278e228427557395a04960e71905a526;p=rust-lightning Add transaction sync crate This crate provides utilities for syncing LDK via the transaction-based `Confirm` interface. The initial implementation facilitates synchronization with an Esplora backend server. --- diff --git a/Cargo.toml b/Cargo.toml index 89b92a8c6..e8565e7ac 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,7 @@ members = [ "lightning", "lightning-block-sync", + "lightning-transaction-sync", "lightning-invoice", "lightning-net-tokio", "lightning-persister", diff --git a/lightning-transaction-sync/Cargo.toml b/lightning-transaction-sync/Cargo.toml new file mode 100644 index 000000000..484616c19 --- /dev/null +++ b/lightning-transaction-sync/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "lightning-transaction-sync" +version = "0.0.113" +authors = ["Elias Rohrer"] +license = "MIT OR Apache-2.0" +repository = "http://github.com/lightningdevkit/rust-lightning" +description = """ +Utilities for syncing LDK via the transaction-based `Confirm` interface. +""" +edition = "2018" + +[package.metadata.docs.rs] +all-features = true +rustdoc-args = ["--cfg", "docsrs"] + +[features] +default = [] +esplora-async = ["async-interface", "esplora-client/async", "futures"] +esplora-blocking = ["esplora-client/blocking"] +async-interface = [] + +[dependencies] +lightning = { version = "0.0.113", path = "../lightning" } +bitcoin = "0.29.0" +bdk-macros = "0.6" +futures = { version = "0.3", optional = true } +esplora-client = { version = "0.3.0", default-features = false, optional = true } diff --git a/lightning-transaction-sync/src/common.rs b/lightning-transaction-sync/src/common.rs new file mode 100644 index 000000000..a6ee61e90 --- /dev/null +++ b/lightning-transaction-sync/src/common.rs @@ -0,0 +1,75 @@ +use lightning::chain::WatchedOutput; +use bitcoin::{Txid, BlockHash, Transaction, BlockHeader, OutPoint}; + +use std::collections::{HashSet, HashMap}; + + +// Represents the current state. +pub(crate) struct SyncState { + // Transactions that were previously processed, but must not be forgotten + // yet since they still need to be monitored for confirmation on-chain. + pub watched_transactions: HashSet, + // Outputs that were previously processed, but must not be forgotten yet as + // as we still need to monitor any spends on-chain. + pub watched_outputs: HashMap, + // The tip hash observed during our last sync. + pub last_sync_hash: Option, + // Indicates whether we need to resync, e.g., after encountering an error. + pub pending_sync: bool, +} + +impl SyncState { + pub fn new() -> Self { + Self { + watched_transactions: HashSet::new(), + watched_outputs: HashMap::new(), + last_sync_hash: None, + pending_sync: false, + } + } +} + + +// A queue that is to be filled by `Filter` and drained during the next syncing round. +pub(crate) struct FilterQueue { + // Transactions that were registered via the `Filter` interface and have to be processed. + pub transactions: HashSet, + // Outputs that were registered via the `Filter` interface and have to be processed. + pub outputs: HashMap, +} + +impl FilterQueue { + pub fn new() -> Self { + Self { + transactions: HashSet::new(), + outputs: HashMap::new(), + } + } + + // Processes the transaction and output queues and adds them to the given [`SyncState`]. + // + // Returns `true` if new items had been registered. + pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool { + let mut pending_registrations = false; + + if !self.transactions.is_empty() { + pending_registrations = true; + + sync_state.watched_transactions.extend(self.transactions.drain()); + } + + if !self.outputs.is_empty() { + pending_registrations = true; + + sync_state.watched_outputs.extend(self.outputs.drain()); + } + pending_registrations + } +} + +pub(crate) struct ConfirmedTx { + pub tx: Transaction, + pub block_header: BlockHeader, + pub block_height: u32, + pub pos: usize, +} diff --git a/lightning-transaction-sync/src/error.rs b/lightning-transaction-sync/src/error.rs new file mode 100644 index 000000000..0a529d063 --- /dev/null +++ b/lightning-transaction-sync/src/error.rs @@ -0,0 +1,63 @@ +use std::fmt; + +#[derive(Debug)] +/// An error that possibly needs to be handled by the user. +pub enum TxSyncError { + /// A transaction sync failed and needs to be retried eventually. + Failed, +} + +impl std::error::Error for TxSyncError {} + +impl fmt::Display for TxSyncError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Failed => write!(f, "Failed to conduct transaction sync."), + } + } +} + +#[derive(Debug)] +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +pub(crate) enum InternalError { + /// A transaction sync failed and needs to be retried eventually. + Failed, + /// An inconsisteny was encounterd during transaction sync. + Inconsistency, +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl fmt::Display for InternalError { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + Self::Failed => write!(f, "Failed to conduct transaction sync."), + Self::Inconsistency => { + write!(f, "Encountered an inconsisteny during transaction sync.") + } + } + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl std::error::Error for InternalError {} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for TxSyncError { + fn from(_e: esplora_client::Error) -> Self { + Self::Failed + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for InternalError { + fn from(_e: esplora_client::Error) -> Self { + Self::Failed + } +} + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +impl From for TxSyncError { + fn from(_e: InternalError) -> Self { + Self::Failed + } +} diff --git a/lightning-transaction-sync/src/esplora.rs b/lightning-transaction-sync/src/esplora.rs new file mode 100644 index 000000000..807ef8075 --- /dev/null +++ b/lightning-transaction-sync/src/esplora.rs @@ -0,0 +1,383 @@ +use crate::error::{TxSyncError, InternalError}; +use crate::common::{SyncState, FilterQueue, ConfirmedTx}; + +use lightning::util::logger::Logger; +use lightning::{log_error, log_given_level, log_info, log_internal, log_debug, log_trace}; +use lightning::chain::WatchedOutput; +use lightning::chain::{Confirm, Filter}; + +use bitcoin::{BlockHash, Script, Txid}; + +use esplora_client::Builder; +#[cfg(feature = "async-interface")] +use esplora_client::r#async::AsyncClient; +#[cfg(not(feature = "async-interface"))] +use esplora_client::blocking::BlockingClient; + +use std::collections::HashSet; +use core::ops::Deref; + +/// Synchronizes LDK with a given [`Esplora`] server. +/// +/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of +/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and +/// reconfirmation. +/// +/// Note that registration via [`Filter`] needs to happen before any calls to +/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor. +/// +/// This uses and exposes either a blocking or async client variant dependent on whether the +/// `esplora-blocking` or the `esplora-async` feature is enabled. +/// +/// [`Esplora`]: https://github.com/Blockstream/electrs +/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel +/// [`Filter`]: lightning::chain::Filter +pub struct EsploraSyncClient +where + L::Target: Logger, +{ + sync_state: MutexType, + queue: std::sync::Mutex, + client: EsploraClientType, + logger: L, +} + +impl EsploraSyncClient +where + L::Target: Logger, +{ + /// Returns a new [`EsploraSyncClient`] object. + pub fn new(server_url: String, logger: L) -> Self { + let builder = Builder::new(&server_url); + #[cfg(not(feature = "async-interface"))] + let client = builder.build_blocking().unwrap(); + #[cfg(feature = "async-interface")] + let client = builder.build_async().unwrap(); + + EsploraSyncClient::from_client(client, logger) + } + + /// Returns a new [`EsploraSyncClient`] object using the given Esplora client. + pub fn from_client(client: EsploraClientType, logger: L) -> Self { + let sync_state = MutexType::new(SyncState::new()); + let queue = std::sync::Mutex::new(FilterQueue::new()); + Self { + sync_state, + queue, + client, + logger, + } + } + + /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This + /// method should be called regularly to keep LDK up-to-date with current chain data. + /// + /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the + /// newest on-chain activity related to the items previously registered via the [`Filter`] + /// interface. + /// + /// [`Confirm`]: lightning::chain::Confirm + /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor + /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + /// [`Filter`]: lightning::chain::Filter + #[maybe_async] + pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> { + // This lock makes sure we're syncing once at a time. + #[cfg(not(feature = "async-interface"))] + let mut sync_state = self.sync_state.lock().unwrap(); + #[cfg(feature = "async-interface")] + let mut sync_state = self.sync_state.lock().await; + + log_info!(self.logger, "Starting transaction sync."); + + let mut tip_hash = maybe_await!(self.client.get_tip_hash())?; + + loop { + let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state); + let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash; + + // We loop until any registered transactions have been processed at least once, or the + // tip hasn't been updated during the last iteration. + if !sync_state.pending_sync && !pending_registrations && !tip_is_new { + // Nothing to do. + break; + } else { + // Update the known tip to the newest one. + if tip_is_new { + // First check for any unconfirmed transactions and act on it immediately. + match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) { + Ok(unconfirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + continue; + } + + self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs); + }, + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + + match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) { + Ok(()) => {} + Err(InternalError::Inconsistency) => { + // Immediately restart syncing when we encounter any inconsistencies. + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + } + + match maybe_await!(self.get_confirmed_transactions(&sync_state)) { + Ok(confirmed_txs) => { + // Double-check the tip hash. If it changed, a reorg happened since + // we started syncing and we need to restart last-minute. + let check_tip_hash = maybe_await!(self.client.get_tip_hash())?; + if check_tip_hash != tip_hash { + tip_hash = check_tip_hash; + continue; + } + + self.sync_confirmed_transactions( + &mut sync_state, + &confirmables, + confirmed_txs, + ); + } + Err(InternalError::Inconsistency) => { + // Immediately restart syncing when we encounter any inconsistencies. + log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting."); + sync_state.pending_sync = true; + continue; + } + Err(err) => { + // (Semi-)permanent failure, retry later. + log_error!(self.logger, "Failed during transaction sync, aborting."); + sync_state.pending_sync = true; + return Err(TxSyncError::from(err)); + } + } + sync_state.last_sync_hash = Some(tip_hash); + sync_state.pending_sync = false; + } + } + log_info!(self.logger, "Finished transaction sync."); + Ok(()) + } + + #[maybe_async] + fn sync_best_block_updated( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash, + ) -> Result<(), InternalError> { + + // Inform the interface of the new block. + let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?; + let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?; + if tip_status.in_best_chain { + if let Some(tip_height) = tip_status.height { + for c in confirmables { + c.best_block_updated(&tip_header, tip_height); + } + } + } else { + return Err(InternalError::Inconsistency); + } + Ok(()) + } + + fn sync_confirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec, + ) { + for ctx in confirmed_txs { + for c in confirmables { + c.transactions_confirmed( + &ctx.block_header, + &[(ctx.pos, &ctx.tx)], + ctx.block_height, + ); + } + + sync_state.watched_transactions.remove(&ctx.tx.txid()); + + for input in &ctx.tx.input { + sync_state.watched_outputs.remove(&input.previous_output); + } + } + } + + #[maybe_async] + fn get_confirmed_transactions( + &self, sync_state: &SyncState, + ) -> Result, InternalError> { + + // First, check the confirmation status of registered transactions as well as the + // status of dependent transactions of registered outputs. + + let mut confirmed_txs = Vec::new(); + + for txid in &sync_state.watched_transactions { + if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? { + confirmed_txs.push(confirmed_tx); + } + } + + for (_, output) in &sync_state.watched_outputs { + if let Some(output_status) = maybe_await!(self.client + .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))? + { + if let Some(spending_txid) = output_status.txid { + if let Some(spending_tx_status) = output_status.status { + if let Some(confirmed_tx) = maybe_await!(self + .get_confirmed_tx( + &spending_txid, + spending_tx_status.block_hash, + spending_tx_status.block_height, + ))? + { + confirmed_txs.push(confirmed_tx); + } + } + } + } + } + + // Sort all confirmed transactions first by block height, then by in-block + // position, and finally feed them to the interface in order. + confirmed_txs.sort_unstable_by(|tx1, tx2| { + tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos)) + }); + + Ok(confirmed_txs) + } + + #[maybe_async] + fn get_confirmed_tx( + &self, txid: &Txid, expected_block_hash: Option, known_block_height: Option, + ) -> Result, InternalError> { + if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? { + let block_header = merkle_block.header; + let block_hash = block_header.block_hash(); + if let Some(expected_block_hash) = expected_block_hash { + if expected_block_hash != block_hash { + log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash); + return Err(InternalError::Inconsistency); + } + } + + let mut matches = Vec::new(); + let mut indexes = Vec::new(); + let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes); + if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid { + log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid); + return Err(InternalError::Failed); + } + + let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize; + if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? { + if let Some(block_height) = known_block_height { + // We can take a shortcut here if a previous call already gave us the height. + return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + } + + let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; + if let Some(block_height) = block_status.height { + return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height })); + } else { + // If any previously-confirmed block suddenly is no longer confirmed, we found + // an inconsistency and should start over. + log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid); + return Err(InternalError::Inconsistency); + } + } + } + Ok(None) + } + + #[maybe_async] + fn get_unconfirmed_transactions( + &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, + ) -> Result, InternalError> { + // Query the interface for relevant txids and check whether the relevant blocks are still + // in the best chain, mark them unconfirmed otherwise + let relevant_txids = confirmables + .iter() + .flat_map(|c| c.get_relevant_txids()) + .collect::)>>(); + + let mut unconfirmed_txs = Vec::new(); + + for (txid, block_hash_opt) in relevant_txids { + if let Some(block_hash) = block_hash_opt { + let block_status = maybe_await!(self.client.get_block_status(&block_hash))?; + if block_status.in_best_chain { + // Skip if the block in question is still confirmed. + continue; + } + + unconfirmed_txs.push(txid); + } else { + log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!"); + } + } + Ok(unconfirmed_txs) + } + + fn sync_unconfirmed_transactions( + &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec, + ) { + for txid in unconfirmed_txs { + for c in confirmables { + c.transaction_unconfirmed(&txid); + } + + sync_state.watched_transactions.insert(txid); + } + } + + /// Returns a reference to the underlying esplora client. + pub fn client(&self) -> &EsploraClientType { + &self.client + } +} + +#[cfg(feature = "async-interface")] +type MutexType = futures::lock::Mutex; +#[cfg(not(feature = "async-interface"))] +type MutexType = std::sync::Mutex; + +// The underlying client type. +#[cfg(feature = "async-interface")] +type EsploraClientType = AsyncClient; +#[cfg(not(feature = "async-interface"))] +type EsploraClientType = BlockingClient; + + +impl Filter for EsploraSyncClient +where + L::Target: Logger, +{ + fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.transactions.insert(*txid); + } + + fn register_output(&self, output: WatchedOutput) { + let mut locked_queue = self.queue.lock().unwrap(); + locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output); + } +} diff --git a/lightning-transaction-sync/src/lib.rs b/lightning-transaction-sync/src/lib.rs new file mode 100644 index 000000000..791490d9d --- /dev/null +++ b/lightning-transaction-sync/src/lib.rs @@ -0,0 +1,82 @@ +//! Provides utilities for syncing LDK via the transaction-based [`Confirm`] interface. +//! +//! The provided synchronization clients need to be registered with a [`ChainMonitor`] via the +//! [`Filter`] interface. Then, the respective `fn sync` needs to be called with the [`Confirm`] +//! implementations to be synchronized, i.e., usually instances of [`ChannelManager`] and +//! [`ChainMonitor`]. +//! +//! ## Features and Backend Support +//! +//!- `esplora_blocking` enables syncing against an Esplora backend based on a blocking client. +//!- `esplora_async` enables syncing against an Esplora backend based on an async client. +//! +//! ## Version Compatibility +//! +//! Currently this crate is compatible with nodes that were created with LDK version 0.0.113 and above. +//! +//! ## Usage Example: +//! +//! ```ignore +//! let tx_sync = Arc::new(EsploraSyncClient::new( +//! esplora_server_url, +//! Arc::clone(&some_logger), +//! )); +//! +//! let chain_monitor = Arc::new(ChainMonitor::new( +//! Some(Arc::clone(&tx_sync)), +//! Arc::clone(&some_broadcaster), +//! Arc::clone(&some_logger), +//! Arc::clone(&some_fee_estimator), +//! Arc::clone(&some_persister), +//! )); +//! +//! let channel_manager = Arc::new(ChannelManager::new( +//! Arc::clone(&some_fee_estimator), +//! Arc::clone(&chain_monitor), +//! Arc::clone(&some_broadcaster), +//! Arc::clone(&some_router), +//! Arc::clone(&some_logger), +//! Arc::clone(&some_entropy_source), +//! Arc::clone(&some_node_signer), +//! Arc::clone(&some_signer_provider), +//! user_config, +//! chain_params, +//! )); +//! +//! let confirmables = vec![ +//! &*channel_manager as &(dyn Confirm + Sync + Send), +//! &*chain_monitor as &(dyn Confirm + Sync + Send), +//! ]; +//! +//! tx_sync.sync(confirmables).unwrap(); +//! ``` +//! +//! [`Confirm`]: lightning::chain::Confirm +//! [`Filter`]: lightning::chain::Filter +//! [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor +//! [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager + +// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings. +#![deny(broken_intra_doc_links)] +#![deny(private_intra_doc_links)] + +#![deny(missing_docs)] +#![deny(unsafe_code)] + +#![cfg_attr(docsrs, feature(doc_auto_cfg))] + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +#[macro_use] +extern crate bdk_macros; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +mod esplora; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +mod common; + +mod error; +pub use error::TxSyncError; + +#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))] +pub use esplora::EsploraSyncClient;