1 use lightning::chain::{Confirm, WatchedOutput};
2 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
3 use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
4 use bitcoin::block::Header;
6 use std::collections::{HashSet, HashMap};
10 // Represents the current state.
// Holds what is still being watched on-chain (txids and outputs), output spends that were
// already seen confirmed and are waiting to be pruned, the tip hash seen by the last sync, and
// a flag requesting a resync.
11 pub(crate) struct SyncState {
12 // Transactions that were previously processed, but must not be forgotten
13 // yet since they still need to be monitored for confirmation on-chain.
14 pub watched_transactions: HashSet<Txid>,
15 // Outputs that were previously processed, but must not be forgotten yet as
16 // we still need to monitor any spends on-chain.
17 pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
// Each entry is (spending txid, confirmation height, spent outpoint, watched output).
18 // Outputs for which we previously saw a spend on-chain but kept around until the spends reach
20 pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
21 // The tip hash observed during our last sync.
22 pub last_sync_hash: Option<BlockHash>,
23 // Indicates whether we need to resync, e.g., after encountering an error.
24 pub pending_sync: bool,
// Creates a `SyncState` whose watched-transaction set, watched-output map and pending
// output-spend list all start out empty.
28 pub fn new() -> Self {
30 watched_transactions: HashSet::new(),
31 watched_outputs: HashMap::new(),
32 outputs_spends_pending_threshold_conf: Vec::new(),
// Processes transactions that are reported as no longer confirmed.
//
// For every txid in `unconfirmed_txs` this calls `transaction_unconfirmed` on each entry of
// `confirmables`, starts watching the txid again, and copies every output whose recorded
// spend was that transaction back into `watched_outputs`.
37 pub fn sync_unconfirmed_transactions<C: Deref>(
38 &mut self, confirmables: &Vec<C>,
39 unconfirmed_txs: Vec<Txid>,
41 where C::Target: Confirm,
43 for txid in unconfirmed_txs {
44 for c in confirmables {
45 c.transaction_unconfirmed(&txid);
// Track the txid again so that it keeps being monitored for confirmation.
48 self.watched_transactions.insert(txid);
50 // If a previously-confirmed output spend is unconfirmed, re-add the watched output to
52 self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
// `conf_txid` is the transaction that was recorded as spending `prev_outpoint`.
53 if txid == *conf_txid {
54 self.watched_outputs.insert(*prev_outpoint, output.clone());
// Processes newly confirmed transactions.
//
// For every entry of `confirmed_txs` this calls `transactions_confirmed` on each entry of
// `confirmables`, stops watching the transaction's txid, and, for each input that spends a
// watched output, moves that output out of `watched_outputs` and into
// `outputs_spends_pending_threshold_conf`.
63 pub fn sync_confirmed_transactions<C: Deref>(
64 &mut self, confirmables: &Vec<C>,
65 confirmed_txs: Vec<ConfirmedTx>
67 where C::Target: Confirm,
69 for ctx in confirmed_txs {
70 for c in confirmables {
71 c.transactions_confirmed(
73 &[(ctx.pos, &ctx.tx)],
// The transaction is confirmed now, so its txid no longer needs to be watched.
78 self.watched_transactions.remove(&ctx.tx.txid());
80 for input in &ctx.tx.input {
// Only inputs that spend an output we were watching are of interest.
81 if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
// Remember the spending txid and its confirmation height alongside the spent output.
82 self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
// Prunes recorded output spends that are buried deeply enough.
//
// An entry is kept only while `cur_height < conf_height + ANTI_REORG_DELAY - 1`. Assuming
// `cur_height` is the current tip height, that means an entry is dropped once its spend has
// at least `ANTI_REORG_DELAY` confirmations, counting the confirming block as the first.
88 pub fn prune_output_spends(&mut self, cur_height: u32) {
89 self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
90 cur_height < conf_height + ANTI_REORG_DELAY - 1
96 // A queue that is to be filled by `Filter` and drained during the next syncing round.
// `process_queues` moves the queued items into a `SyncState`.
97 pub(crate) struct FilterQueue {
98 // Transactions that were registered via the `Filter` interface and have to be processed.
99 pub transactions: HashSet<Txid>,
100 // Outputs that were registered via the `Filter` interface and have to be processed.
// Keyed by the outpoint of the watched output.
101 pub outputs: HashMap<OutPoint, WatchedOutput>,
// Creates a queue with no pending transactions or outputs.
105 pub fn new() -> Self {
107 transactions: HashSet::new(),
108 outputs: HashMap::new(),
112 // Processes the transaction and output queues and adds them to the given [`SyncState`].
114 // Returns `true` if new items had been registered.
// Both queues are left empty afterwards, since their contents are drained into `sync_state`.
115 pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool {
116 let mut pending_registrations = false;
// Move all queued txids into the set of watched transactions.
118 if !self.transactions.is_empty() {
119 pending_registrations = true;
121 sync_state.watched_transactions.extend(self.transactions.drain());
// Move all queued outputs into the map of watched outputs.
124 if !self.outputs.is_empty() {
125 pending_registrations = true;
127 sync_state.watched_outputs.extend(self.outputs.drain());
129 pending_registrations
// A confirmed transaction as consumed by `SyncState::sync_confirmed_transactions`.
134 pub(crate) struct ConfirmedTx {
// Presumably the header of the block that contains the transaction (verify against the code
// that builds this struct).
137 pub block_header: Header,
// Block height that `sync_confirmed_transactions` records as the confirmation height of any
// watched output spent by this transaction.
138 pub block_height: u32,