beta,
# 1.41.1 is MSRV for Rust-Lightning, lightning-invoice, and lightning-persister
1.41.1,
- # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, lightning-background-processor, and coverage generation
+ # 1.45.2 is MSRV for lightning-net-tokio, lightning-block-sync, lightning-background-processor
1.45.2,
# 1.47.0 will be the MSRV for no-std builds using hashbrown once core2 is updated
- 1.47.0]
+ 1.47.0,
+ # 1.59.0 is the MSRV for lightning-transaction-sync
+ 1.59.0]
include:
- toolchain: stable
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: true
+ coverage: true
- toolchain: stable
platform: macos-latest
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: true
- toolchain: beta
platform: macos-latest
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: true
- toolchain: stable
platform: windows-latest
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: false
- toolchain: beta
platform: windows-latest
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: false
- toolchain: beta
build-net-tokio: true
build-no-std: true
build-futures: true
+ build-tx-sync: true
- toolchain: 1.41.1
build-no-std: false
test-log-variants: true
build-futures: false
+ build-tx-sync: false
- toolchain: 1.45.2
build-net-old-tokio: true
build-net-tokio: true
build-no-std: false
build-futures: true
- coverage: true
+ build-tx-sync: false
- toolchain: 1.47.0
build-futures: true
build-no-std: true
+ build-tx-sync: false
+ - toolchain: 1.59.0
+ build-net-tokio: false
+ build-no-std: false
+ build-futures: false
+ build-tx-sync: true
runs-on: ${{ matrix.platform }}
steps:
- name: Checkout source code
run: cargo update -p tokio --precise "1.14.0" --verbose
env:
CARGO_NET_GIT_FETCH_WITH_CLI: "true"
- - name: Build on Rust ${{ matrix.toolchain }} with net-tokio
- if: "matrix.build-net-tokio && !matrix.coverage"
+ - name: Build on Rust ${{ matrix.toolchain }} with net-tokio and tx-sync
+ if: "matrix.build-net-tokio && !matrix.coverage && matrix.build-tx-sync"
run: cargo build --verbose --color always
- - name: Build on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation
+ - name: Build on Rust ${{ matrix.toolchain }} with net-tokio, tx-sync, and full code-linking for coverage generation
if: matrix.coverage
run: RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always
- name: Build on Rust ${{ matrix.toolchain }}
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client
RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features rpc-client,rest-client,tokio
+ - name: Build Transaction Sync Clients on Rust ${{ matrix.toolchain }} with features
+ if: "matrix.build-tx-sync && !matrix.coverage"
+ run: |
+ cd lightning-transaction-sync
+ cargo build --verbose --color always --features esplora-blocking
+ cargo build --verbose --color always --features esplora-async
+ - name: Build transaction sync clients on Rust ${{ matrix.toolchain }} with features and full code-linking for coverage generation
+ if: "matrix.build-tx-sync && matrix.coverage"
+ run: |
+ cd lightning-transaction-sync
+ RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features esplora-blocking
+ RUSTFLAGS="-C link-dead-code" cargo build --verbose --color always --features esplora-async
+ - name: Test transaction sync clients on Rust ${{ matrix.toolchain }} with features
+ if: "matrix.build-tx-sync"
+ run: |
+ cd lightning-transaction-sync
+ cargo test --verbose --color always --features esplora-blocking
+ cargo test --verbose --color always --features esplora-async
- name: Test backtrace-debug builds on Rust ${{ matrix.toolchain }}
if: "matrix.toolchain == 'stable'"
run: |
cd lightning && cargo test --verbose --color always --features backtrace
- name: Test on Rust ${{ matrix.toolchain }} with net-tokio
- if: "matrix.build-net-tokio && !matrix.coverage"
+ if: "matrix.build-net-tokio && !matrix.coverage && matrix.build-tx-sync"
run: cargo test --verbose --color always
- - name: Test on Rust ${{ matrix.toolchain }} with net-tokio and full code-linking for coverage generation
+ - name: Test on Rust ${{ matrix.toolchain }} with net-tokio, tx-sync, and full code-linking for coverage generation
if: matrix.coverage
run: RUSTFLAGS="-C link-dead-code" cargo test --verbose --color always
- name: Test no-std builds on Rust ${{ matrix.toolchain }}
linting:
runs-on: ubuntu-latest
env:
- TOOLCHAIN: 1.47.0
+ TOOLCHAIN: stable
steps:
- name: Checkout source code
uses: actions/checkout@v3
members = [
"lightning",
"lightning-block-sync",
+ "lightning-transaction-sync",
"lightning-invoice",
"lightning-net-tokio",
"lightning-persister",
--- /dev/null
+[package]
+name = "lightning-transaction-sync"
+version = "0.0.113"
+authors = ["Elias Rohrer"]
+license = "MIT OR Apache-2.0"
+repository = "http://github.com/lightningdevkit/rust-lightning"
+description = """
+Utilities for syncing LDK via the transaction-based `Confirm` interface.
+"""
+edition = "2018"
+
+[package.metadata.docs.rs]
+all-features = true
+rustdoc-args = ["--cfg", "docsrs"]
+
+[features]
+default = []
+esplora-async = ["async-interface", "esplora-client/async", "futures"]
+esplora-blocking = ["esplora-client/blocking"]
+async-interface = []
+
+[dependencies]
+lightning = { version = "0.0.113", path = "../lightning" }
+bitcoin = "0.29.0"
+bdk-macros = "0.6"
+futures = { version = "0.3", optional = true }
+esplora-client = { version = "0.3.0", default-features = false, optional = true }
+
+[dev-dependencies]
+electrsd = { version = "0.22.0", features = ["legacy", "esplora_a33e97e1", "bitcoind_23_0"] }
+electrum-client = "0.12.0"
+once_cell = "1.16.0"
+tokio = { version = "1.14.0", features = ["full"] }
--- /dev/null
+use lightning::chain::WatchedOutput;
+use bitcoin::{Txid, BlockHash, Transaction, BlockHeader, OutPoint};
+
+use std::collections::{HashSet, HashMap};
+
+
+// Represents the current state.
+pub(crate) struct SyncState {
+ // Transactions that were previously processed, but must not be forgotten
+ // yet since they still need to be monitored for confirmation on-chain.
+ pub watched_transactions: HashSet<Txid>,
+ // Outputs that were previously processed, but must not be forgotten yet as
+ // as we still need to monitor any spends on-chain.
+ pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
+ // The tip hash observed during our last sync.
+ pub last_sync_hash: Option<BlockHash>,
+ // Indicates whether we need to resync, e.g., after encountering an error.
+ pub pending_sync: bool,
+}
+
+impl SyncState {
+ pub fn new() -> Self {
+ Self {
+ watched_transactions: HashSet::new(),
+ watched_outputs: HashMap::new(),
+ last_sync_hash: None,
+ pending_sync: false,
+ }
+ }
+}
+
+
+// A queue that is to be filled by `Filter` and drained during the next syncing round.
+pub(crate) struct FilterQueue {
+ // Transactions that were registered via the `Filter` interface and have to be processed.
+ pub transactions: HashSet<Txid>,
+ // Outputs that were registered via the `Filter` interface and have to be processed.
+ pub outputs: HashMap<OutPoint, WatchedOutput>,
+}
+
+impl FilterQueue {
+ pub fn new() -> Self {
+ Self {
+ transactions: HashSet::new(),
+ outputs: HashMap::new(),
+ }
+ }
+
+ // Processes the transaction and output queues and adds them to the given [`SyncState`].
+ //
+ // Returns `true` if new items had been registered.
+ pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool {
+ let mut pending_registrations = false;
+
+ if !self.transactions.is_empty() {
+ pending_registrations = true;
+
+ sync_state.watched_transactions.extend(self.transactions.drain());
+ }
+
+ if !self.outputs.is_empty() {
+ pending_registrations = true;
+
+ sync_state.watched_outputs.extend(self.outputs.drain());
+ }
+ pending_registrations
+ }
+}
+
+pub(crate) struct ConfirmedTx {
+ pub tx: Transaction,
+ pub block_header: BlockHeader,
+ pub block_height: u32,
+ pub pos: usize,
+}
--- /dev/null
+use std::fmt;
+
+#[derive(Debug)]
+/// An error that possibly needs to be handled by the user.
+pub enum TxSyncError {
+ /// A transaction sync failed and needs to be retried eventually.
+ Failed,
+}
+
+impl std::error::Error for TxSyncError {}
+
+impl fmt::Display for TxSyncError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ Self::Failed => write!(f, "Failed to conduct transaction sync."),
+ }
+ }
+}
+
+#[derive(Debug)]
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+pub(crate) enum InternalError {
+ /// A transaction sync failed and needs to be retried eventually.
+ Failed,
+ /// An inconsisteny was encounterd during transaction sync.
+ Inconsistency,
+}
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+impl fmt::Display for InternalError {
+ fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
+ match *self {
+ Self::Failed => write!(f, "Failed to conduct transaction sync."),
+ Self::Inconsistency => {
+ write!(f, "Encountered an inconsisteny during transaction sync.")
+ }
+ }
+ }
+}
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+impl std::error::Error for InternalError {}
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+impl From<esplora_client::Error> for TxSyncError {
+ fn from(_e: esplora_client::Error) -> Self {
+ Self::Failed
+ }
+}
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+impl From<esplora_client::Error> for InternalError {
+ fn from(_e: esplora_client::Error) -> Self {
+ Self::Failed
+ }
+}
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+impl From<InternalError> for TxSyncError {
+ fn from(_e: InternalError) -> Self {
+ Self::Failed
+ }
+}
--- /dev/null
+use crate::error::{TxSyncError, InternalError};
+use crate::common::{SyncState, FilterQueue, ConfirmedTx};
+
+use lightning::util::logger::Logger;
+use lightning::{log_error, log_given_level, log_info, log_internal, log_debug, log_trace};
+use lightning::chain::WatchedOutput;
+use lightning::chain::{Confirm, Filter};
+
+use bitcoin::{BlockHash, Script, Txid};
+
+use esplora_client::Builder;
+#[cfg(feature = "async-interface")]
+use esplora_client::r#async::AsyncClient;
+#[cfg(not(feature = "async-interface"))]
+use esplora_client::blocking::BlockingClient;
+
+use std::collections::HashSet;
+use core::ops::Deref;
+
+/// Synchronizes LDK with a given [`Esplora`] server.
+///
+/// Needs to be registered with a [`ChainMonitor`] via the [`Filter`] interface to be informed of
+/// transactions and outputs to monitor for on-chain confirmation, unconfirmation, and
+/// reconfirmation.
+///
+/// Note that registration via [`Filter`] needs to happen before any calls to
+/// [`Watch::watch_channel`] to ensure we get notified of the items to monitor.
+///
+/// This uses and exposes either a blocking or async client variant dependent on whether the
+/// `esplora-blocking` or the `esplora-async` feature is enabled.
+///
+/// [`Esplora`]: https://github.com/Blockstream/electrs
+/// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
+/// [`Watch::watch_channel`]: lightning::chain::Watch::watch_channel
+/// [`Filter`]: lightning::chain::Filter
+pub struct EsploraSyncClient<L: Deref>
+where
+ L::Target: Logger,
+{
+ sync_state: MutexType<SyncState>,
+ queue: std::sync::Mutex<FilterQueue>,
+ client: EsploraClientType,
+ logger: L,
+}
+
+impl<L: Deref> EsploraSyncClient<L>
+where
+ L::Target: Logger,
+{
+ /// Returns a new [`EsploraSyncClient`] object.
+ pub fn new(server_url: String, logger: L) -> Self {
+ let builder = Builder::new(&server_url);
+ #[cfg(not(feature = "async-interface"))]
+ let client = builder.build_blocking().unwrap();
+ #[cfg(feature = "async-interface")]
+ let client = builder.build_async().unwrap();
+
+ EsploraSyncClient::from_client(client, logger)
+ }
+
+ /// Returns a new [`EsploraSyncClient`] object using the given Esplora client.
+ pub fn from_client(client: EsploraClientType, logger: L) -> Self {
+ let sync_state = MutexType::new(SyncState::new());
+ let queue = std::sync::Mutex::new(FilterQueue::new());
+ Self {
+ sync_state,
+ queue,
+ client,
+ logger,
+ }
+ }
+
+ /// Synchronizes the given `confirmables` via their [`Confirm`] interface implementations. This
+ /// method should be called regularly to keep LDK up-to-date with current chain data.
+ ///
+ /// For example, instances of [`ChannelManager`] and [`ChainMonitor`] can be informed about the
+ /// newest on-chain activity related to the items previously registered via the [`Filter`]
+ /// interface.
+ ///
+ /// [`Confirm`]: lightning::chain::Confirm
+ /// [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
+ /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+ /// [`Filter`]: lightning::chain::Filter
+ #[maybe_async]
+ pub fn sync(&self, confirmables: Vec<&(dyn Confirm + Sync + Send)>) -> Result<(), TxSyncError> {
+ // This lock makes sure we're syncing once at a time.
+ #[cfg(not(feature = "async-interface"))]
+ let mut sync_state = self.sync_state.lock().unwrap();
+ #[cfg(feature = "async-interface")]
+ let mut sync_state = self.sync_state.lock().await;
+
+ log_info!(self.logger, "Starting transaction sync.");
+
+ let mut tip_hash = maybe_await!(self.client.get_tip_hash())?;
+
+ loop {
+ let pending_registrations = self.queue.lock().unwrap().process_queues(&mut sync_state);
+ let tip_is_new = Some(tip_hash) != sync_state.last_sync_hash;
+
+ // We loop until any registered transactions have been processed at least once, or the
+ // tip hasn't been updated during the last iteration.
+ if !sync_state.pending_sync && !pending_registrations && !tip_is_new {
+ // Nothing to do.
+ break;
+ } else {
+ // Update the known tip to the newest one.
+ if tip_is_new {
+ // First check for any unconfirmed transactions and act on it immediately.
+ match maybe_await!(self.get_unconfirmed_transactions(&confirmables)) {
+ Ok(unconfirmed_txs) => {
+ // Double-check the tip hash. If it changed, a reorg happened since
+ // we started syncing and we need to restart last-minute.
+ let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
+ if check_tip_hash != tip_hash {
+ tip_hash = check_tip_hash;
+ continue;
+ }
+
+ self.sync_unconfirmed_transactions(&mut sync_state, &confirmables, unconfirmed_txs);
+ },
+ Err(err) => {
+ // (Semi-)permanent failure, retry later.
+ log_error!(self.logger, "Failed during transaction sync, aborting.");
+ sync_state.pending_sync = true;
+ return Err(TxSyncError::from(err));
+ }
+ }
+
+ match maybe_await!(self.sync_best_block_updated(&confirmables, &tip_hash)) {
+ Ok(()) => {}
+ Err(InternalError::Inconsistency) => {
+ // Immediately restart syncing when we encounter any inconsistencies.
+ log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
+ sync_state.pending_sync = true;
+ continue;
+ }
+ Err(err) => {
+ // (Semi-)permanent failure, retry later.
+ sync_state.pending_sync = true;
+ return Err(TxSyncError::from(err));
+ }
+ }
+ }
+
+ match maybe_await!(self.get_confirmed_transactions(&sync_state)) {
+ Ok(confirmed_txs) => {
+ // Double-check the tip hash. If it changed, a reorg happened since
+ // we started syncing and we need to restart last-minute.
+ let check_tip_hash = maybe_await!(self.client.get_tip_hash())?;
+ if check_tip_hash != tip_hash {
+ tip_hash = check_tip_hash;
+ continue;
+ }
+
+ self.sync_confirmed_transactions(
+ &mut sync_state,
+ &confirmables,
+ confirmed_txs,
+ );
+ }
+ Err(InternalError::Inconsistency) => {
+ // Immediately restart syncing when we encounter any inconsistencies.
+ log_debug!(self.logger, "Encountered inconsistency during transaction sync, restarting.");
+ sync_state.pending_sync = true;
+ continue;
+ }
+ Err(err) => {
+ // (Semi-)permanent failure, retry later.
+ log_error!(self.logger, "Failed during transaction sync, aborting.");
+ sync_state.pending_sync = true;
+ return Err(TxSyncError::from(err));
+ }
+ }
+ sync_state.last_sync_hash = Some(tip_hash);
+ sync_state.pending_sync = false;
+ }
+ }
+ log_info!(self.logger, "Finished transaction sync.");
+ Ok(())
+ }
+
+ #[maybe_async]
+ fn sync_best_block_updated(
+ &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, tip_hash: &BlockHash,
+ ) -> Result<(), InternalError> {
+
+ // Inform the interface of the new block.
+ let tip_header = maybe_await!(self.client.get_header_by_hash(tip_hash))?;
+ let tip_status = maybe_await!(self.client.get_block_status(&tip_hash))?;
+ if tip_status.in_best_chain {
+ if let Some(tip_height) = tip_status.height {
+ for c in confirmables {
+ c.best_block_updated(&tip_header, tip_height);
+ }
+ }
+ } else {
+ return Err(InternalError::Inconsistency);
+ }
+ Ok(())
+ }
+
+ fn sync_confirmed_transactions(
+ &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, confirmed_txs: Vec<ConfirmedTx>,
+ ) {
+ for ctx in confirmed_txs {
+ for c in confirmables {
+ c.transactions_confirmed(
+ &ctx.block_header,
+ &[(ctx.pos, &ctx.tx)],
+ ctx.block_height,
+ );
+ }
+
+ sync_state.watched_transactions.remove(&ctx.tx.txid());
+
+ for input in &ctx.tx.input {
+ sync_state.watched_outputs.remove(&input.previous_output);
+ }
+ }
+ }
+
+ #[maybe_async]
+ fn get_confirmed_transactions(
+ &self, sync_state: &SyncState,
+ ) -> Result<Vec<ConfirmedTx>, InternalError> {
+
+ // First, check the confirmation status of registered transactions as well as the
+ // status of dependent transactions of registered outputs.
+
+ let mut confirmed_txs = Vec::new();
+
+ for txid in &sync_state.watched_transactions {
+ if let Some(confirmed_tx) = maybe_await!(self.get_confirmed_tx(&txid, None, None))? {
+ confirmed_txs.push(confirmed_tx);
+ }
+ }
+
+ for (_, output) in &sync_state.watched_outputs {
+ if let Some(output_status) = maybe_await!(self.client
+ .get_output_status(&output.outpoint.txid, output.outpoint.index as u64))?
+ {
+ if let Some(spending_txid) = output_status.txid {
+ if let Some(spending_tx_status) = output_status.status {
+ if let Some(confirmed_tx) = maybe_await!(self
+ .get_confirmed_tx(
+ &spending_txid,
+ spending_tx_status.block_hash,
+ spending_tx_status.block_height,
+ ))?
+ {
+ confirmed_txs.push(confirmed_tx);
+ }
+ }
+ }
+ }
+ }
+
+ // Sort all confirmed transactions first by block height, then by in-block
+ // position, and finally feed them to the interface in order.
+ confirmed_txs.sort_unstable_by(|tx1, tx2| {
+ tx1.block_height.cmp(&tx2.block_height).then_with(|| tx1.pos.cmp(&tx2.pos))
+ });
+
+ Ok(confirmed_txs)
+ }
+
+ #[maybe_async]
+ fn get_confirmed_tx(
+ &self, txid: &Txid, expected_block_hash: Option<BlockHash>, known_block_height: Option<u32>,
+ ) -> Result<Option<ConfirmedTx>, InternalError> {
+ if let Some(merkle_block) = maybe_await!(self.client.get_merkle_block(&txid))? {
+ let block_header = merkle_block.header;
+ let block_hash = block_header.block_hash();
+ if let Some(expected_block_hash) = expected_block_hash {
+ if expected_block_hash != block_hash {
+ log_trace!(self.logger, "Inconsistency: Tx {} expected in block {}, but is confirmed in {}", txid, expected_block_hash, block_hash);
+ return Err(InternalError::Inconsistency);
+ }
+ }
+
+ let mut matches = Vec::new();
+ let mut indexes = Vec::new();
+ let _ = merkle_block.txn.extract_matches(&mut matches, &mut indexes);
+ if indexes.len() != 1 || matches.len() != 1 || matches[0] != *txid {
+ log_error!(self.logger, "Retrieved Merkle block for txid {} doesn't match expectations. This should not happen. Please verify server integrity.", txid);
+ return Err(InternalError::Failed);
+ }
+
+ let pos = *indexes.get(0).ok_or(InternalError::Failed)? as usize;
+ if let Some(tx) = maybe_await!(self.client.get_tx(&txid))? {
+ if let Some(block_height) = known_block_height {
+ // We can take a shortcut here if a previous call already gave us the height.
+ return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
+ }
+
+ let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
+ if let Some(block_height) = block_status.height {
+ return Ok(Some(ConfirmedTx { tx, block_header, pos, block_height }));
+ } else {
+ // If any previously-confirmed block suddenly is no longer confirmed, we found
+ // an inconsistency and should start over.
+ log_trace!(self.logger, "Inconsistency: Tx {} was unconfirmed during syncing.", txid);
+ return Err(InternalError::Inconsistency);
+ }
+ }
+ }
+ Ok(None)
+ }
+
+ #[maybe_async]
+ fn get_unconfirmed_transactions(
+ &self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
+ ) -> Result<Vec<Txid>, InternalError> {
+ // Query the interface for relevant txids and check whether the relevant blocks are still
+ // in the best chain, mark them unconfirmed otherwise
+ let relevant_txids = confirmables
+ .iter()
+ .flat_map(|c| c.get_relevant_txids())
+ .collect::<HashSet<(Txid, Option<BlockHash>)>>();
+
+ let mut unconfirmed_txs = Vec::new();
+
+ for (txid, block_hash_opt) in relevant_txids {
+ if let Some(block_hash) = block_hash_opt {
+ let block_status = maybe_await!(self.client.get_block_status(&block_hash))?;
+ if block_status.in_best_chain {
+ // Skip if the block in question is still confirmed.
+ continue;
+ }
+
+ unconfirmed_txs.push(txid);
+ } else {
+ log_error!(self.logger, "Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
+ panic!("Untracked confirmation of funding transaction. Please ensure none of your channels had been created with LDK prior to version 0.0.113!");
+ }
+ }
+ Ok(unconfirmed_txs)
+ }
+
+ fn sync_unconfirmed_transactions(
+ &self, sync_state: &mut SyncState, confirmables: &Vec<&(dyn Confirm + Sync + Send)>, unconfirmed_txs: Vec<Txid>,
+ ) {
+ for txid in unconfirmed_txs {
+ for c in confirmables {
+ c.transaction_unconfirmed(&txid);
+ }
+
+ sync_state.watched_transactions.insert(txid);
+ }
+ }
+
+ /// Returns a reference to the underlying esplora client.
+ pub fn client(&self) -> &EsploraClientType {
+ &self.client
+ }
+}
+
+#[cfg(feature = "async-interface")]
+type MutexType<I> = futures::lock::Mutex<I>;
+#[cfg(not(feature = "async-interface"))]
+type MutexType<I> = std::sync::Mutex<I>;
+
+// The underlying client type.
+#[cfg(feature = "async-interface")]
+type EsploraClientType = AsyncClient;
+#[cfg(not(feature = "async-interface"))]
+type EsploraClientType = BlockingClient;
+
+
+impl<L: Deref> Filter for EsploraSyncClient<L>
+where
+ L::Target: Logger,
+{
+ fn register_tx(&self, txid: &Txid, _script_pubkey: &Script) {
+ let mut locked_queue = self.queue.lock().unwrap();
+ locked_queue.transactions.insert(*txid);
+ }
+
+ fn register_output(&self, output: WatchedOutput) {
+ let mut locked_queue = self.queue.lock().unwrap();
+ locked_queue.outputs.insert(output.outpoint.into_bitcoin_outpoint(), output);
+ }
+}
--- /dev/null
+//! Provides utilities for syncing LDK via the transaction-based [`Confirm`] interface.
+//!
+//! The provided synchronization clients need to be registered with a [`ChainMonitor`] via the
+//! [`Filter`] interface. Then, the respective `fn sync` needs to be called with the [`Confirm`]
+//! implementations to be synchronized, i.e., usually instances of [`ChannelManager`] and
+//! [`ChainMonitor`].
+//!
+//! ## Features and Backend Support
+//!
+//!- `esplora_blocking` enables syncing against an Esplora backend based on a blocking client.
+//!- `esplora_async` enables syncing against an Esplora backend based on an async client.
+//!
+//! ## Version Compatibility
+//!
+//! Currently this crate is compatible with nodes that were created with LDK version 0.0.113 and above.
+//!
+//! ## Usage Example:
+//!
+//! ```ignore
+//! let tx_sync = Arc::new(EsploraSyncClient::new(
+//! esplora_server_url,
+//! Arc::clone(&some_logger),
+//! ));
+//!
+//! let chain_monitor = Arc::new(ChainMonitor::new(
+//! Some(Arc::clone(&tx_sync)),
+//! Arc::clone(&some_broadcaster),
+//! Arc::clone(&some_logger),
+//! Arc::clone(&some_fee_estimator),
+//! Arc::clone(&some_persister),
+//! ));
+//!
+//! let channel_manager = Arc::new(ChannelManager::new(
+//! Arc::clone(&some_fee_estimator),
+//! Arc::clone(&chain_monitor),
+//! Arc::clone(&some_broadcaster),
+//! Arc::clone(&some_router),
+//! Arc::clone(&some_logger),
+//! Arc::clone(&some_entropy_source),
+//! Arc::clone(&some_node_signer),
+//! Arc::clone(&some_signer_provider),
+//! user_config,
+//! chain_params,
+//! ));
+//!
+//! let confirmables = vec![
+//! &*channel_manager as &(dyn Confirm + Sync + Send),
+//! &*chain_monitor as &(dyn Confirm + Sync + Send),
+//! ];
+//!
+//! tx_sync.sync(confirmables).unwrap();
+//! ```
+//!
+//! [`Confirm`]: lightning::chain::Confirm
+//! [`Filter`]: lightning::chain::Filter
+//! [`ChainMonitor`]: lightning::chain::chainmonitor::ChainMonitor
+//! [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+
+// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
+#![deny(broken_intra_doc_links)]
+#![deny(private_intra_doc_links)]
+
+#![deny(missing_docs)]
+#![deny(unsafe_code)]
+
+#![cfg_attr(docsrs, feature(doc_auto_cfg))]
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+#[macro_use]
+extern crate bdk_macros;
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+mod esplora;
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+mod common;
+
+mod error;
+pub use error::TxSyncError;
+
+#[cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+pub use esplora::EsploraSyncClient;
--- /dev/null
+#![cfg(any(feature = "esplora-blocking", feature = "esplora-async"))]
+use lightning_transaction_sync::EsploraSyncClient;
+use lightning::chain::{Confirm, Filter};
+use lightning::chain::transaction::TransactionData;
+use lightning::util::logger::{Logger, Record};
+
+use electrsd::{bitcoind, bitcoind::BitcoinD, ElectrsD};
+use bitcoin::{Amount, Txid, BlockHash, BlockHeader};
+use bitcoin::blockdata::constants::genesis_block;
+use bitcoin::network::constants::Network;
+use electrsd::bitcoind::bitcoincore_rpc::bitcoincore_rpc_json::AddressType;
+use bitcoind::bitcoincore_rpc::RpcApi;
+use electrum_client::ElectrumApi;
+
+use once_cell::sync::OnceCell;
+
+use std::env;
+use std::sync::Mutex;
+use std::time::Duration;
+use std::collections::{HashMap, HashSet};
+
+static BITCOIND: OnceCell<BitcoinD> = OnceCell::new();
+static ELECTRSD: OnceCell<ElectrsD> = OnceCell::new();
+static PREMINE: OnceCell<()> = OnceCell::new();
+static MINER_LOCK: OnceCell<Mutex<()>> = OnceCell::new();
+
+fn get_bitcoind() -> &'static BitcoinD {
+ BITCOIND.get_or_init(|| {
+ let bitcoind_exe =
+ env::var("BITCOIND_EXE").ok().or_else(|| bitcoind::downloaded_exe_path().ok()).expect(
+ "you need to provide an env var BITCOIND_EXE or specify a bitcoind version feature",
+ );
+ let mut conf = bitcoind::Conf::default();
+ conf.network = "regtest";
+ BitcoinD::with_conf(bitcoind_exe, &conf).unwrap()
+ })
+}
+
+fn get_electrsd() -> &'static ElectrsD {
+ ELECTRSD.get_or_init(|| {
+ let bitcoind = get_bitcoind();
+ let electrs_exe =
+ env::var("ELECTRS_EXE").ok().or_else(electrsd::downloaded_exe_path).expect(
+ "you need to provide env var ELECTRS_EXE or specify an electrsd version feature",
+ );
+ let mut conf = electrsd::Conf::default();
+ conf.http_enabled = true;
+ conf.network = "regtest";
+ ElectrsD::with_conf(electrs_exe, &bitcoind, &conf).unwrap()
+ })
+}
+
+fn generate_blocks_and_wait(num: usize) {
+ let miner_lock = MINER_LOCK.get_or_init(|| Mutex::new(()));
+ let _miner = miner_lock.lock().unwrap();
+ let cur_height = get_bitcoind().client.get_block_count().unwrap();
+ let address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap();
+ let _block_hashes = get_bitcoind().client.generate_to_address(num as u64, &address).unwrap();
+ wait_for_block(cur_height as usize + num);
+}
+
+fn wait_for_block(min_height: usize) {
+ let mut header = get_electrsd().client.block_headers_subscribe().unwrap();
+ loop {
+ if header.height >= min_height {
+ break;
+ }
+ header = exponential_backoff_poll(|| {
+ get_electrsd().trigger().unwrap();
+ get_electrsd().client.ping().unwrap();
+ get_electrsd().client.block_headers_pop().unwrap()
+ });
+ }
+}
+
+fn exponential_backoff_poll<T, F>(mut poll: F) -> T
+where
+ F: FnMut() -> Option<T>,
+{
+ let mut delay = Duration::from_millis(64);
+ let mut tries = 0;
+ loop {
+ match poll() {
+ Some(data) => break data,
+ None if delay.as_millis() < 512 => {
+ delay = delay.mul_f32(2.0);
+ tries += 1;
+ }
+ None if tries == 10 => panic!("Exceeded our maximum wait time."),
+ None => tries += 1,
+ }
+
+ std::thread::sleep(delay);
+ }
+}
+
+fn premine() {
+ PREMINE.get_or_init(|| {
+ generate_blocks_and_wait(101);
+ });
+}
+
+#[derive(Debug)]
+enum TestConfirmableEvent {
+ Confirmed(Txid, BlockHash, u32),
+ Unconfirmed(Txid),
+ BestBlockUpdated(BlockHash, u32),
+}
+
+struct TestConfirmable {
+ pub confirmed_txs: Mutex<HashMap<Txid, (BlockHash, u32)>>,
+ pub unconfirmed_txs: Mutex<HashSet<Txid>>,
+ pub best_block: Mutex<(BlockHash, u32)>,
+ pub events: Mutex<Vec<TestConfirmableEvent>>,
+}
+
+impl TestConfirmable {
+ pub fn new() -> Self {
+ let genesis_hash = genesis_block(Network::Regtest).block_hash();
+ Self {
+ confirmed_txs: Mutex::new(HashMap::new()),
+ unconfirmed_txs: Mutex::new(HashSet::new()),
+ best_block: Mutex::new((genesis_hash, 0)),
+ events: Mutex::new(Vec::new()),
+ }
+ }
+}
+
+impl Confirm for TestConfirmable {
+ fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData<'_>, height: u32) {
+ for (_, tx) in txdata {
+ let txid = tx.txid();
+ let block_hash = header.block_hash();
+ self.confirmed_txs.lock().unwrap().insert(txid, (block_hash, height));
+ self.unconfirmed_txs.lock().unwrap().remove(&txid);
+ self.events.lock().unwrap().push(TestConfirmableEvent::Confirmed(txid, block_hash, height));
+ }
+ }
+
+ fn transaction_unconfirmed(&self, txid: &Txid) {
+ self.unconfirmed_txs.lock().unwrap().insert(*txid);
+ self.confirmed_txs.lock().unwrap().remove(txid);
+ self.events.lock().unwrap().push(TestConfirmableEvent::Unconfirmed(*txid));
+ }
+
+ fn best_block_updated(&self, header: &BlockHeader, height: u32) {
+ let block_hash = header.block_hash();
+ *self.best_block.lock().unwrap() = (block_hash, height);
+ self.events.lock().unwrap().push(TestConfirmableEvent::BestBlockUpdated(block_hash, height));
+ }
+
+ fn get_relevant_txids(&self) -> Vec<(Txid, Option<BlockHash>)> {
+ self.confirmed_txs.lock().unwrap().iter().map(|(&txid, (hash, _))| (txid, Some(*hash))).collect::<Vec<_>>()
+ }
+}
+
+pub struct TestLogger {}
+
+impl Logger for TestLogger {
+ fn log(&self, record: &Record) {
+ println!("{} -- {}",
+ record.level,
+ record.args);
+ }
+}
+
+#[test]
+#[cfg(feature = "esplora-blocking")]
+fn test_esplora_syncs() {
+ premine();
+ let mut logger = TestLogger {};
+ let esplora_url = format!("http://{}", get_electrsd().esplora_url.as_ref().unwrap());
+ let tx_sync = EsploraSyncClient::new(esplora_url, &mut logger);
+ let confirmable = TestConfirmable::new();
+
+ // Check we pick up on new best blocks
+ let expected_height = 0u32;
+ assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height);
+
+ tx_sync.sync(vec![&confirmable]).unwrap();
+
+ let expected_height = get_bitcoind().client.get_block_count().unwrap() as u32;
+ assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height);
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 1);
+
+ // Check registered confirmed transactions are marked confirmed
+ let new_address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap();
+ let txid = get_bitcoind().client.send_to_address(&new_address, Amount::from_sat(5000), None, None, None, None, None, None).unwrap();
+ tx_sync.register_tx(&txid, &new_address.script_pubkey());
+
+ tx_sync.sync(vec![&confirmable]).unwrap();
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 0);
+ assert!(confirmable.confirmed_txs.lock().unwrap().is_empty());
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ generate_blocks_and_wait(1);
+ tx_sync.sync(vec![&confirmable]).unwrap();
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 2);
+ assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid));
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ // Check previously confirmed transactions are marked unconfirmed when they are reorged.
+ let best_block_hash = get_bitcoind().client.get_best_block_hash().unwrap();
+ get_bitcoind().client.invalidate_block(&best_block_hash).unwrap();
+
+ // We're getting back to the previous height with a new tip, but best block shouldn't change.
+ generate_blocks_and_wait(1);
+ assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash);
+ tx_sync.sync(vec![&confirmable]).unwrap();
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 0);
+
+ // Now we're surpassing previous height, getting new tip.
+ generate_blocks_and_wait(1);
+ assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash);
+ tx_sync.sync(vec![&confirmable]).unwrap();
+
+ // Transaction still confirmed but under new tip.
+ assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid));
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ // Check we got unconfirmed, then reconfirmed in the meantime.
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 3);
+
+ match events[0] {
+ TestConfirmableEvent::Unconfirmed(t) => {
+ assert_eq!(t, txid);
+ },
+ _ => panic!("Unexpected event"),
+ }
+
+ match events[1] {
+ TestConfirmableEvent::BestBlockUpdated(..) => {},
+ _ => panic!("Unexpected event"),
+ }
+
+ match events[2] {
+ TestConfirmableEvent::Confirmed(t, _, _) => {
+ assert_eq!(t, txid);
+ },
+ _ => panic!("Unexpected event"),
+ }
+}
+
+#[tokio::test]
+#[cfg(feature = "esplora-async")]
+async fn test_esplora_syncs() {
+ premine();
+ let mut logger = TestLogger {};
+ let esplora_url = format!("http://{}", get_electrsd().esplora_url.as_ref().unwrap());
+ let tx_sync = EsploraSyncClient::new(esplora_url, &mut logger);
+ let confirmable = TestConfirmable::new();
+
+ // Check we pick up on new best blocks
+ let expected_height = 0u32;
+ assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height);
+
+ tx_sync.sync(vec![&confirmable]).await.unwrap();
+
+ let expected_height = get_bitcoind().client.get_block_count().unwrap() as u32;
+ assert_eq!(confirmable.best_block.lock().unwrap().1, expected_height);
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 1);
+
+ // Check registered confirmed transactions are marked confirmed
+ let new_address = get_bitcoind().client.get_new_address(Some("test"), Some(AddressType::Legacy)).unwrap();
+ let txid = get_bitcoind().client.send_to_address(&new_address, Amount::from_sat(5000), None, None, None, None, None, None).unwrap();
+ tx_sync.register_tx(&txid, &new_address.script_pubkey());
+
+ tx_sync.sync(vec![&confirmable]).await.unwrap();
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 0);
+ assert!(confirmable.confirmed_txs.lock().unwrap().is_empty());
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ generate_blocks_and_wait(1);
+ tx_sync.sync(vec![&confirmable]).await.unwrap();
+
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 2);
+ assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid));
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ // Check previously confirmed transactions are marked unconfirmed when they are reorged.
+ let best_block_hash = get_bitcoind().client.get_best_block_hash().unwrap();
+ get_bitcoind().client.invalidate_block(&best_block_hash).unwrap();
+
+ // We're getting back to the previous height with a new tip, but best block shouldn't change.
+ generate_blocks_and_wait(1);
+ assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash);
+ tx_sync.sync(vec![&confirmable]).await.unwrap();
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 0);
+
+ // Now we're surpassing previous height, getting new tip.
+ generate_blocks_and_wait(1);
+ assert_ne!(get_bitcoind().client.get_best_block_hash().unwrap(), best_block_hash);
+ tx_sync.sync(vec![&confirmable]).await.unwrap();
+
+ // Transaction still confirmed but under new tip.
+ assert!(confirmable.confirmed_txs.lock().unwrap().contains_key(&txid));
+ assert!(confirmable.unconfirmed_txs.lock().unwrap().is_empty());
+
+ // Check we got unconfirmed, then reconfirmed in the meantime.
+ let events = std::mem::take(&mut *confirmable.events.lock().unwrap());
+ assert_eq!(events.len(), 3);
+
+ match events[0] {
+ TestConfirmableEvent::Unconfirmed(t) => {
+ assert_eq!(t, txid);
+ },
+ _ => panic!("Unexpected event"),
+ }
+
+ match events[1] {
+ TestConfirmableEvent::BestBlockUpdated(..) => {},
+ _ => panic!("Unexpected event"),
+ }
+
+ match events[2] {
+ TestConfirmableEvent::Confirmed(t, _, _) => {
+ assert_eq!(t, txid);
+ },
+ _ => panic!("Unexpected event"),
+ }
+}
/// [`PaymentHash`] and [`PaymentPreimage`] for you.
///
/// The [`PaymentPreimage`] will ultimately be returned to you in the [`PaymentClaimable`], which
- /// will have the [`PaymentClaimable::payment_preimage`] field filled in. That should then be
+ /// will have the [`PaymentClaimable::purpose`] be [`PaymentPurpose::InvoicePayment`] with
+ /// its [`PaymentPurpose::InvoicePayment::payment_preimage`] field filled in. That should then be
/// passed directly to [`claim_funds`].
///
/// See [`create_inbound_payment_for_hash`] for detailed documentation on behavior and requirements.
///
/// [`claim_funds`]: Self::claim_funds
/// [`PaymentClaimable`]: events::Event::PaymentClaimable
- /// [`PaymentClaimable::payment_preimage`]: events::Event::PaymentClaimable::payment_preimage
+ /// [`PaymentClaimable::purpose`]: events::Event::PaymentClaimable::purpose
+ /// [`PaymentPurpose::InvoicePayment`]: events::PaymentPurpose::InvoicePayment
+ /// [`PaymentPurpose::InvoicePayment::payment_preimage`]: events::PaymentPurpose::InvoicePayment::payment_preimage
/// [`create_inbound_payment_for_hash`]: Self::create_inbound_payment_for_hash
pub fn create_inbound_payment(&self, min_value_msat: Option<u64>, invoice_expiry_delta_secs: u32,
min_final_cltv_expiry_delta: Option<u16>) -> Result<(PaymentHash, PaymentSecret), ()> {