1 use bitcoin::block::Header;
2 use bitcoin::{BlockHash, OutPoint, Transaction, Txid};
3 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
4 use lightning::chain::{Confirm, WatchedOutput};
6 use std::collections::{HashMap, HashSet};
9 // Represents the current state.
10 pub(crate) struct SyncState {
11 // Transactions that were previously processed, but must not be forgotten
12 // yet since they still need to be monitored for confirmation on-chain.
13 pub watched_transactions: HashSet<Txid>,
14 // Outputs that were previously processed, but must not be forgotten yet as
15 // as we still need to monitor any spends on-chain.
16 pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
17 // Outputs for which we previously saw a spend on-chain but kept around until the spends reach
19 pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
20 // The tip hash observed during our last sync.
21 pub last_sync_hash: Option<BlockHash>,
22 // Indicates whether we need to resync, e.g., after encountering an error.
23 pub pending_sync: bool,
27 pub fn new() -> Self {
29 watched_transactions: HashSet::new(),
30 watched_outputs: HashMap::new(),
31 outputs_spends_pending_threshold_conf: Vec::new(),
36 pub fn sync_unconfirmed_transactions<C: Deref>(
37 &mut self, confirmables: &Vec<C>, unconfirmed_txs: Vec<Txid>,
41 for txid in unconfirmed_txs {
42 for c in confirmables {
43 c.transaction_unconfirmed(&txid);
46 self.watched_transactions.insert(txid);
48 // If a previously-confirmed output spend is unconfirmed, re-add the watched output to
50 self.outputs_spends_pending_threshold_conf.retain(
51 |(conf_txid, _, prev_outpoint, output)| {
52 if txid == *conf_txid {
53 self.watched_outputs.insert(*prev_outpoint, output.clone());
63 pub fn sync_confirmed_transactions<C: Deref>(
64 &mut self, confirmables: &Vec<C>, confirmed_txs: Vec<ConfirmedTx>,
68 for ctx in confirmed_txs {
69 for c in confirmables {
70 c.transactions_confirmed(
72 &[(ctx.pos, &ctx.tx)],
77 self.watched_transactions.remove(&ctx.tx.txid());
79 for input in &ctx.tx.input {
80 if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
81 let spent = (ctx.tx.txid(), ctx.block_height, input.previous_output, output);
82 self.outputs_spends_pending_threshold_conf.push(spent);
88 pub fn prune_output_spends(&mut self, cur_height: u32) {
89 self.outputs_spends_pending_threshold_conf
90 .retain(|(_, conf_height, _, _)| cur_height < conf_height + ANTI_REORG_DELAY - 1);
94 // A queue that is to be filled by `Filter` and drained during the next syncing round.
95 pub(crate) struct FilterQueue {
96 // Transactions that were registered via the `Filter` interface and have to be processed.
97 pub transactions: HashSet<Txid>,
98 // Outputs that were registered via the `Filter` interface and have to be processed.
99 pub outputs: HashMap<OutPoint, WatchedOutput>,
103 pub fn new() -> Self {
104 Self { transactions: HashSet::new(), outputs: HashMap::new() }
107 // Processes the transaction and output queues and adds them to the given [`SyncState`].
109 // Returns `true` if new items had been registered.
110 pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool {
111 let mut pending_registrations = false;
113 if !self.transactions.is_empty() {
114 pending_registrations = true;
116 sync_state.watched_transactions.extend(self.transactions.drain());
119 if !self.outputs.is_empty() {
120 pending_registrations = true;
122 sync_state.watched_outputs.extend(self.outputs.drain());
124 pending_registrations
129 pub(crate) struct ConfirmedTx {
132 pub block_header: Header,
133 pub block_height: u32,