1 use lightning::chain::{Confirm, WatchedOutput};
2 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
3 use bitcoin::{Txid, BlockHash, Transaction, OutPoint};
4 use bitcoin::block::Header;
6 use std::collections::{HashSet, HashMap};
9 // Represents the current sync state, i.e., everything we still have to keep an eye on.
10 pub(crate) struct SyncState {
11 // Transactions that were previously processed, but must not be forgotten
12 // yet since they still need to be monitored for confirmation on-chain.
// Entries are added by `sync_unconfirmed_transactions` and `process_queues`, and removed by
// `sync_confirmed_transactions`.
13 pub watched_transactions: HashSet<Txid>,
14 // Outputs that were previously processed, but must not be forgotten yet as
15 // we still need to monitor any spends on-chain.
// Keyed by the watched outpoint. An entry is moved to `outputs_spends_pending_threshold_conf`
// by `sync_confirmed_transactions` when a confirmed transaction spends it.
16 pub watched_outputs: HashMap<OutPoint, WatchedOutput>,
17 // Outputs for which we previously saw a spend on-chain but kept around until the spends reach
// the height threshold checked in `prune_output_spends`.
// Each entry is `(spending txid, confirmation height, spent outpoint, watched output)`.
19 pub outputs_spends_pending_threshold_conf: Vec<(Txid, u32, OutPoint, WatchedOutput)>,
20 // The tip hash observed during our last sync.
21 pub last_sync_hash: Option<BlockHash>,
22 // Indicates whether we need to resync, e.g., after encountering an error.
23 pub pending_sync: bool,
// Creates a state in which nothing is watched yet and no output spends are pending.
// NOTE(review): the initial values of `last_sync_hash` and `pending_sync` are not visible in
// this excerpt -- confirm them against the full file.
27 pub fn new() -> Self {
29 watched_transactions: HashSet::new(),
30 watched_outputs: HashMap::new(),
31 outputs_spends_pending_threshold_conf: Vec::new(),
// Handles transactions that have been found to be unconfirmed.
//
// For every txid in `unconfirmed_txs`, each entry of `confirmables` is notified via
// `transaction_unconfirmed` and the txid is put (back) into `watched_transactions`.
//
// NOTE(review): `confirmables` is taken as `&Vec<_>`; a slice (`&[_]`) would accept the same
// arguments and is the more idiomatic parameter type.
36 pub fn sync_unconfirmed_transactions(
37 &mut self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
38 unconfirmed_txs: Vec<Txid>,
40 for txid in unconfirmed_txs {
41 for c in confirmables {
42 c.transaction_unconfirmed(&txid);
// Keep monitoring the transaction for confirmation on-chain.
45 self.watched_transactions.insert(txid);
47 // If a previously-confirmed output spend is unconfirmed, re-add the watched output to
// `watched_outputs` so that it is monitored for spends again.
// NOTE(review): the values returned by the closure below are not visible in this excerpt;
// presumably entries matching `txid` are dropped from the pending list -- confirm.
49 self.outputs_spends_pending_threshold_conf.retain(|(conf_txid, _, prev_outpoint, output)| {
50 if txid == *conf_txid {
51 self.watched_outputs.insert(*prev_outpoint, output.clone());
// Handles transactions that have been found to be confirmed.
//
// For every entry of `confirmed_txs`, each entry of `confirmables` is notified via
// `transactions_confirmed`, the txid is removed from `watched_transactions`, and every watched
// output that is spent by one of the transaction's inputs is moved from `watched_outputs` to
// `outputs_spends_pending_threshold_conf`.
60 pub fn sync_confirmed_transactions(
61 &mut self, confirmables: &Vec<&(dyn Confirm + Sync + Send)>,
62 confirmed_txs: Vec<ConfirmedTx>
64 for ctx in confirmed_txs {
65 for c in confirmables {
// NOTE(review): only the transaction-list argument of this call is visible in this excerpt.
66 c.transactions_confirmed(
68 &[(ctx.pos, &ctx.tx)],
// The transaction has been reported as confirmed, so stop watching for it.
73 self.watched_transactions.remove(&ctx.tx.txid());
// Record each watched output spent by this transaction together with the spending txid and
// the confirmation height; `sync_unconfirmed_transactions` matches on that txid and
// `prune_output_spends` on that height.
// NOTE(review): `ctx.tx.txid()` is evaluated again for every matching input; binding it once
// per transaction would avoid the repeated calls.
75 for input in &ctx.tx.input {
76 if let Some(output) = self.watched_outputs.remove(&input.previous_output) {
77 self.outputs_spends_pending_threshold_conf.push((ctx.tx.txid(), ctx.block_height, input.previous_output, output));
// Drops pending output spends that have reached the height threshold at `cur_height`.
//
// An entry is kept only while `cur_height < conf_height + ANTI_REORG_DELAY - 1`, where
// `conf_height` is the confirmation height stored with the entry; all other entries are removed
// from `outputs_spends_pending_threshold_conf`.
83 pub fn prune_output_spends(&mut self, cur_height: u32) {
84 self.outputs_spends_pending_threshold_conf.retain(|(_, conf_height, _, _)| {
85 cur_height < conf_height + ANTI_REORG_DELAY - 1
91 // A queue that is to be filled by `Filter` and drained during the next syncing round.
// Draining happens in `process_queues`, which hands all queued items over to a `SyncState`.
92 pub(crate) struct FilterQueue {
93 // Transactions that were registered via the `Filter` interface and have to be processed.
94 pub transactions: HashSet<Txid>,
95 // Outputs that were registered via the `Filter` interface and have to be processed.
// Keyed by the outpoint of the registered output.
96 pub outputs: HashMap<OutPoint, WatchedOutput>,
// Creates a queue with no transactions and no outputs registered.
100 pub fn new() -> Self {
102 transactions: HashSet::new(),
103 outputs: HashMap::new(),
107 // Processes the transaction and output queues and adds them to the given [`SyncState`].
// Both queues are drained, i.e., they are empty afterwards.
109 // Returns `true` if new items had been registered.
110 pub fn process_queues(&mut self, sync_state: &mut SyncState) -> bool {
111 let mut pending_registrations = false;
113 if !self.transactions.is_empty() {
114 pending_registrations = true;
// Move all queued txids into the set of watched transactions.
116 sync_state.watched_transactions.extend(self.transactions.drain());
119 if !self.outputs.is_empty() {
120 pending_registrations = true;
// Move all queued outputs into the map of watched outputs. `HashMap::extend` replaces the
// value of an outpoint that is already being watched.
122 sync_state.watched_outputs.extend(self.outputs.drain());
124 pending_registrations
// A confirmed transaction as consumed by `sync_confirmed_transactions`.
// NOTE(review): only part of this definition is visible in this excerpt; e.g., the `tx` and
// `pos` fields read by `sync_confirmed_transactions` are not shown here.
129 pub(crate) struct ConfirmedTx {
// Presumably the header of the block the transaction was confirmed in -- confirm.
131 pub block_header: Header,
// Stored as the confirmation height when a spend of a watched output is recorded.
132 pub block_height: u32,