Allow delaying generation and broadcasting of spending txs
[rust-lightning] / lightning / src / util / sweep.rs
1 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4 // You may not use this file except in accordance with one or both of these
5 // licenses.
6
7 //! This module contains an [`OutputSweeper`] utility that keeps track of
8 //! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries
9 //! sweeping them.
10
11 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12 use crate::chain::channelmonitor::ANTI_REORG_DELAY;
13 use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14 use crate::io;
15 use crate::ln::msgs::DecodeError;
16 use crate::ln::ChannelId;
17 use crate::prelude::Vec;
18 use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor};
19 use crate::sync::Mutex;
20 use crate::util::logger::Logger;
21 use crate::util::persist::{
22         KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
23         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
24 };
25 use crate::util::ser::{Readable, ReadableArgs, Writeable};
26 use crate::{impl_writeable_tlv_based, log_debug, log_error};
27
28 use bitcoin::blockdata::block::Header;
29 use bitcoin::blockdata::locktime::absolute::LockTime;
30 use bitcoin::secp256k1::Secp256k1;
31 use bitcoin::{BlockHash, Transaction, Txid};
32
33 use core::ops::Deref;
34
/// The state of a spendable output currently tracked by an [`OutputSweeper`].
///
/// Bundles the output's descriptor with the (optional) channel it belongs to and the progress made
/// towards spending it on-chain.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedSpendableOutput {
        /// The tracked output descriptor.
        pub descriptor: SpendableOutputDescriptor,
        /// The channel this output belongs to.
        ///
        /// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`].
        pub channel_id: Option<ChannelId>,
        /// The current status of the output spend.
        pub status: OutputSpendStatus,
}
47
48 impl TrackedSpendableOutput {
49         fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
50                 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
51                 match &self.descriptor {
52                         SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
53                                 WatchedOutput {
54                                         block_hash,
55                                         outpoint: *outpoint,
56                                         script_pubkey: output.script_pubkey.clone(),
57                                 }
58                         },
59                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
60                                 block_hash,
61                                 outpoint: output.outpoint,
62                                 script_pubkey: output.output.script_pubkey.clone(),
63                         },
64                         SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
65                                 block_hash,
66                                 outpoint: output.outpoint,
67                                 script_pubkey: output.output.script_pubkey.clone(),
68                         },
69                 }
70         }
71
72         /// Returns whether the output is spent in the given transaction.
73         pub fn is_spent_in(&self, tx: &Transaction) -> bool {
74                 let prev_outpoint = match &self.descriptor {
75                         SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint,
76                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint,
77                         SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint,
78                 }
79                 .into_bitcoin_outpoint();
80
81                 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
82         }
83 }
84
// Serialization of `TrackedSpendableOutput` as a TLV stream: `descriptor` (type 0) and `status`
// (type 4) are required fields, while `channel_id` (type 2) is optional.
impl_writeable_tlv_based!(TrackedSpendableOutput, {
        (0, descriptor, required),
        (2, channel_id, option),
        (4, status, required),
});
90
/// The current status of the output spend.
///
/// An output starts out as [`Self::PendingInitialBroadcast`], moves to
/// [`Self::PendingFirstConfirmation`] once a spending transaction was broadcast, and on to
/// [`Self::PendingThresholdConfirmations`] once a spending transaction was confirmed. If that
/// confirmation is undone, the status falls back to [`Self::PendingFirstConfirmation`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpendStatus {
        /// The output is tracked but an initial spending transaction hasn't been generated and
        /// broadcasted yet.
        PendingInitialBroadcast {
                /// The height at which we will first generate and broadcast a spending transaction.
                ///
                /// If `None`, the initial broadcast is not delayed.
                delayed_until_height: Option<u32>,
        },
        /// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain.
        PendingFirstConfirmation {
                /// The hash of the chain tip when we first broadcast a transaction spending this output.
                first_broadcast_hash: BlockHash,
                /// The best height when we last broadcast a transaction spending this output.
                latest_broadcast_height: u32,
                /// The transaction spending this output we last broadcasted.
                latest_spending_tx: Transaction,
        },
        /// A transaction spending the output has been confirmed on-chain but will be tracked until it
        /// reaches [`ANTI_REORG_DELAY`] confirmations.
        PendingThresholdConfirmations {
                /// The hash of the chain tip when we first broadcast a transaction spending this output.
                first_broadcast_hash: BlockHash,
                /// The best height when we last broadcast a transaction spending this output.
                latest_broadcast_height: u32,
                /// The transaction spending this output we saw confirmed on-chain.
                latest_spending_tx: Transaction,
                /// The height at which the spending transaction was confirmed.
                confirmation_height: u32,
                /// The hash of the block in which the spending transaction was confirmed.
                confirmation_hash: BlockHash,
        },
}
124
125 impl OutputSpendStatus {
126         fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
127                 match self {
128                         Self::PendingInitialBroadcast { delayed_until_height } => {
129                                 if let Some(delayed_until_height) = delayed_until_height {
130                                         debug_assert!(
131                                                 cur_height >= *delayed_until_height,
132                                                 "We should never broadcast before the required height is reached."
133                                         );
134                                 }
135                                 *self = Self::PendingFirstConfirmation {
136                                         first_broadcast_hash: cur_hash,
137                                         latest_broadcast_height: cur_height,
138                                         latest_spending_tx,
139                                 };
140                         },
141                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
142                                 *self = Self::PendingFirstConfirmation {
143                                         first_broadcast_hash: *first_broadcast_hash,
144                                         latest_broadcast_height: cur_height,
145                                         latest_spending_tx,
146                                 };
147                         },
148                         Self::PendingThresholdConfirmations { .. } => {
149                                 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
150                         },
151                 }
152         }
153
154         fn confirmed(
155                 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
156                 latest_spending_tx: Transaction,
157         ) {
158                 match self {
159                         Self::PendingInitialBroadcast { .. } => {
160                                 // Generally we can't see any of our transactions confirmed if they haven't been
161                                 // broadcasted yet, so this should never be reachable via `transactions_confirmed`.
162                                 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
163                                 *self = Self::PendingThresholdConfirmations {
164                                         first_broadcast_hash: confirmation_hash,
165                                         latest_broadcast_height: confirmation_height,
166                                         latest_spending_tx,
167                                         confirmation_height,
168                                         confirmation_hash,
169                                 };
170                         },
171                         Self::PendingFirstConfirmation {
172                                 first_broadcast_hash,
173                                 latest_broadcast_height,
174                                 ..
175                         } => {
176                                 debug_assert!(confirmation_height >= *latest_broadcast_height);
177                                 *self = Self::PendingThresholdConfirmations {
178                                         first_broadcast_hash: *first_broadcast_hash,
179                                         latest_broadcast_height: *latest_broadcast_height,
180                                         latest_spending_tx,
181                                         confirmation_height,
182                                         confirmation_hash,
183                                 };
184                         },
185                         Self::PendingThresholdConfirmations {
186                                 first_broadcast_hash,
187                                 latest_broadcast_height,
188                                 ..
189                         } => {
190                                 *self = Self::PendingThresholdConfirmations {
191                                         first_broadcast_hash: *first_broadcast_hash,
192                                         latest_broadcast_height: *latest_broadcast_height,
193                                         latest_spending_tx,
194                                         confirmation_height,
195                                         confirmation_hash,
196                                 };
197                         },
198                 }
199         }
200
201         fn unconfirmed(&mut self) {
202                 match self {
203                         Self::PendingInitialBroadcast { .. } => {
204                                 debug_assert!(
205                                         false,
206                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
207                                 );
208                         },
209                         Self::PendingFirstConfirmation { .. } => {
210                                 debug_assert!(
211                                         false,
212                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
213                                 );
214                         },
215                         Self::PendingThresholdConfirmations {
216                                 first_broadcast_hash,
217                                 latest_broadcast_height,
218                                 latest_spending_tx,
219                                 ..
220                         } => {
221                                 *self = Self::PendingFirstConfirmation {
222                                         first_broadcast_hash: *first_broadcast_hash,
223                                         latest_broadcast_height: *latest_broadcast_height,
224                                         latest_spending_tx: latest_spending_tx.clone(),
225                                 };
226                         },
227                 }
228         }
229
230         fn is_delayed(&self, cur_height: u32) -> bool {
231                 match self {
232                         Self::PendingInitialBroadcast { delayed_until_height } => {
233                                 delayed_until_height.map_or(false, |req_height| cur_height < req_height)
234                         },
235                         Self::PendingFirstConfirmation { .. } => false,
236                         Self::PendingThresholdConfirmations { .. } => false,
237                 }
238         }
239
240         fn first_broadcast_hash(&self) -> Option<BlockHash> {
241                 match self {
242                         Self::PendingInitialBroadcast { .. } => None,
243                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
244                                 Some(*first_broadcast_hash)
245                         },
246                         Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
247                                 Some(*first_broadcast_hash)
248                         },
249                 }
250         }
251
252         fn latest_broadcast_height(&self) -> Option<u32> {
253                 match self {
254                         Self::PendingInitialBroadcast { .. } => None,
255                         Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
256                                 Some(*latest_broadcast_height)
257                         },
258                         Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
259                                 Some(*latest_broadcast_height)
260                         },
261                 }
262         }
263
264         fn confirmation_height(&self) -> Option<u32> {
265                 match self {
266                         Self::PendingInitialBroadcast { .. } => None,
267                         Self::PendingFirstConfirmation { .. } => None,
268                         Self::PendingThresholdConfirmations { confirmation_height, .. } => {
269                                 Some(*confirmation_height)
270                         },
271                 }
272         }
273
274         fn confirmation_hash(&self) -> Option<BlockHash> {
275                 match self {
276                         Self::PendingInitialBroadcast { .. } => None,
277                         Self::PendingFirstConfirmation { .. } => None,
278                         Self::PendingThresholdConfirmations { confirmation_hash, .. } => {
279                                 Some(*confirmation_hash)
280                         },
281                 }
282         }
283
284         fn latest_spending_tx(&self) -> Option<&Transaction> {
285                 match self {
286                         Self::PendingInitialBroadcast { .. } => None,
287                         Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
288                         Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
289                                 Some(latest_spending_tx)
290                         },
291                 }
292         }
293
294         fn is_confirmed(&self) -> bool {
295                 match self {
296                         Self::PendingInitialBroadcast { .. } => false,
297                         Self::PendingFirstConfirmation { .. } => false,
298                         Self::PendingThresholdConfirmations { .. } => true,
299                 }
300         }
301 }
302
// Serialization of `OutputSpendStatus`: each variant is identified by an even variant type (0, 2,
// 4) and writes its fields as a TLV stream. Only `delayed_until_height` is optional, all other
// fields are required.
impl_writeable_tlv_based_enum!(OutputSpendStatus,
        (0, PendingInitialBroadcast) => {
                (0, delayed_until_height, option),
        },
        (2, PendingFirstConfirmation) => {
                (0, first_broadcast_hash, required),
                (2, latest_broadcast_height, required),
                (4, latest_spending_tx, required),
        },
        (4, PendingThresholdConfirmations) => {
                (0, first_broadcast_hash, required),
                (2, latest_broadcast_height, required),
                (4, latest_spending_tx, required),
                (6, confirmation_height, required),
                (8, confirmation_hash, required),
        };
);
320
/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
/// [`KVStore`] and regularly retries sweeping them using the [`OutputSpender`] and
/// [`ChangeDestinationSource`] given to the constructor.
///
/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`].
///
/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
/// implementation and hence has to be connected with the utilized chain data sources.
///
/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
/// constructor.
///
/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
        B::Target: BroadcasterInterface,
        D::Target: ChangeDestinationSource,
        E::Target: FeeEstimator,
        F::Target: Filter + Sync + Send,
        K::Target: KVStore,
        L::Target: Logger,
        O::Target: OutputSpender,
{
        /// The tracked outputs and the current best block, guarded by a mutex.
        sweeper_state: Mutex<SweeperState>,
        /// Used to broadcast the generated sweeping transactions.
        broadcaster: B,
        /// Queried for the feerate of the sweeping transactions.
        fee_estimator: E,
        /// If set, tracked outputs are registered with this [`Filter`] when a spend is broadcast.
        chain_data_source: Option<F>,
        /// Builds the transactions spending the tracked outputs.
        output_spender: O,
        /// Provides the script the swept funds are sent to.
        change_destination_source: D,
        /// The store the sweeper state is persisted to.
        kv_store: K,
        /// Used for debug and error logging.
        logger: L,
}
354
355 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356         OutputSweeper<B, D, E, F, K, L, O>
357 where
358         B::Target: BroadcasterInterface,
359         D::Target: ChangeDestinationSource,
360         E::Target: FeeEstimator,
361         F::Target: Filter + Sync + Send,
362         K::Target: KVStore,
363         L::Target: Logger,
364         O::Target: OutputSpender,
365 {
366         /// Constructs a new [`OutputSweeper`].
367         ///
368         /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
369         /// need to register their [`Filter`] implementation via the given `chain_data_source`.
370         pub fn new(
371                 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372                 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373         ) -> Self {
374                 let outputs = Vec::new();
375                 let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
376                 Self {
377                         sweeper_state,
378                         broadcaster,
379                         fee_estimator,
380                         chain_data_source,
381                         output_spender,
382                         change_destination_source,
383                         kv_store,
384                         logger,
385                 }
386         }
387
388         /// Tells the sweeper to track the given outputs descriptors.
389         ///
390         /// Usually, this should be called based on the values emitted by the
391         /// [`Event::SpendableOutputs`].
392         ///
393         /// The given `exclude_static_ouputs` flag controls whether the sweeper will filter out
394         /// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
395         /// wallet implementation.
396         ///
397         /// If `delay_until_height` is set, we will delay the spending until the respective block
398         /// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
399         ///
400         /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
401         pub fn track_spendable_outputs(
402                 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
403                 exclude_static_ouputs: bool, delay_until_height: Option<u32>,
404         ) {
405                 let mut relevant_descriptors = output_descriptors
406                         .into_iter()
407                         .filter(|desc| {
408                                 !(exclude_static_ouputs
409                                         && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
410                         })
411                         .peekable();
412
413                 if relevant_descriptors.peek().is_none() {
414                         return;
415                 }
416
417                 let mut spending_tx_opt;
418                 {
419                         let mut state_lock = self.sweeper_state.lock().unwrap();
420                         for descriptor in relevant_descriptors {
421                                 let output_info = TrackedSpendableOutput {
422                                         descriptor,
423                                         channel_id,
424                                         status: OutputSpendStatus::PendingInitialBroadcast {
425                                                 delayed_until_height: delay_until_height,
426                                         },
427                                 };
428
429                                 if state_lock
430                                         .outputs
431                                         .iter()
432                                         .find(|o| o.descriptor == output_info.descriptor)
433                                         .is_some()
434                                 {
435                                         continue;
436                                 }
437
438                                 state_lock.outputs.push(output_info);
439                         }
440                         spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
441                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
442                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
443                                 // Skip broadcasting if the persist failed.
444                                 spending_tx_opt = None;
445                         });
446                 }
447
448                 if let Some(spending_tx) = spending_tx_opt {
449                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
450                 }
451         }
452
453         /// Returns a list of the currently tracked spendable outputs.
454         pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
455                 self.sweeper_state.lock().unwrap().outputs.clone()
456         }
457
458         /// Gets the latest best block which was connected either via the [`Listen`] or
459         /// [`Confirm`] interfaces.
460         pub fn current_best_block(&self) -> BestBlock {
461                 self.sweeper_state.lock().unwrap().best_block
462         }
463
464         fn regenerate_spend_if_necessary(
465                 &self, sweeper_state: &mut SweeperState,
466         ) -> Option<Transaction> {
467                 let cur_height = sweeper_state.best_block.height;
468                 let cur_hash = sweeper_state.best_block.block_hash;
469                 let filter_fn = |o: &TrackedSpendableOutput| {
470                         if o.status.is_confirmed() {
471                                 // Don't rebroadcast confirmed txs.
472                                 return false;
473                         }
474
475                         if o.status.is_delayed(cur_height) {
476                                 // Don't generate and broadcast if still delayed
477                                 return false;
478                         }
479
480                         if o.status.latest_broadcast_height() >= Some(cur_height) {
481                                 // Only broadcast once per block height.
482                                 return false;
483                         }
484
485                         true
486                 };
487
488                 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
489                         sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
490
491                 if respend_descriptors.is_empty() {
492                         // Nothing to do.
493                         return None;
494                 }
495
496                 let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) {
497                         Ok(spending_tx) => {
498                                 log_debug!(
499                                         self.logger,
500                                         "Generating and broadcasting sweeping transaction {}",
501                                         spending_tx.txid()
502                                 );
503                                 spending_tx
504                         },
505                         Err(e) => {
506                                 log_error!(self.logger, "Error spending outputs: {:?}", e);
507                                 return None;
508                         },
509                 };
510
511                 // As we didn't modify the state so far, the same filter_fn yields the same elements as
512                 // above.
513                 let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
514                 for output_info in respend_outputs {
515                         if let Some(filter) = self.chain_data_source.as_ref() {
516                                 let watched_output = output_info.to_watched_output(cur_hash);
517                                 filter.register_output(watched_output);
518                         }
519
520                         output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
521                 }
522
523                 Some(spending_tx)
524         }
525
526         fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
527                 let cur_height = sweeper_state.best_block.height;
528
529                 // Prune all outputs that have sufficient depth by now.
530                 sweeper_state.outputs.retain(|o| {
531                         if let Some(confirmation_height) = o.status.confirmation_height() {
532                                 if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 {
533                                         log_debug!(self.logger,
534                                                 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
535                                                 o.status.latest_spending_tx().map(|t| t.txid()), o.descriptor
536                                         );
537                                         return false;
538                                 }
539                         }
540                         true
541                 });
542         }
543
544         fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
545                 self.kv_store
546                         .write(
547                                 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
548                                 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
549                                 OUTPUT_SWEEPER_PERSISTENCE_KEY,
550                                 &sweeper_state.encode(),
551                         )
552                         .map_err(|e| {
553                                 log_error!(
554                                         self.logger,
555                                         "Write for key {}/{}/{} failed due to: {}",
556                                         OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
557                                         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
558                                         OUTPUT_SWEEPER_PERSISTENCE_KEY,
559                                         e
560                                 );
561                                 e
562                         })
563         }
564
565         fn spend_outputs(
566                 &self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
567         ) -> Result<Transaction, ()> {
568                 let tx_feerate =
569                         self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
570                 let change_destination_script =
571                         self.change_destination_source.get_change_destination_script()?;
572                 let cur_height = sweeper_state.best_block.height;
573                 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
574                 self.output_spender.spend_spendable_outputs(
575                         &descriptors,
576                         Vec::new(),
577                         change_destination_script,
578                         tx_feerate,
579                         locktime,
580                         &Secp256k1::new(),
581                 )
582         }
583
584         fn transactions_confirmed_internal(
585                 &self, sweeper_state: &mut SweeperState, header: &Header,
586                 txdata: &chain::transaction::TransactionData, height: u32,
587         ) {
588                 let confirmation_hash = header.block_hash();
589                 for (_, tx) in txdata {
590                         for output_info in sweeper_state.outputs.iter_mut() {
591                                 if output_info.is_spent_in(*tx) {
592                                         output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
593                                 }
594                         }
595                 }
596         }
597
598         fn best_block_updated_internal(
599                 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
600         ) -> Option<Transaction> {
601                 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
602                 self.prune_confirmed_outputs(sweeper_state);
603                 let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
604                 spending_tx_opt
605         }
606 }
607
608 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
609         for OutputSweeper<B, D, E, F, K, L, O>
610 where
611         B::Target: BroadcasterInterface,
612         D::Target: ChangeDestinationSource,
613         E::Target: FeeEstimator,
614         F::Target: Filter + Sync + Send,
615         K::Target: KVStore,
616         L::Target: Logger,
617         O::Target: OutputSpender,
618 {
619         fn filtered_block_connected(
620                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
621         ) {
622                 let mut spending_tx_opt;
623                 {
624                         let mut state_lock = self.sweeper_state.lock().unwrap();
625                         assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
626                                 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
627                         assert_eq!(state_lock.best_block.height, height - 1,
628                                 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
629
630                         self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
631                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
632
633                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
634                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
635                                 // Skip broadcasting if the persist failed.
636                                 spending_tx_opt = None;
637                         });
638                 }
639
640                 if let Some(spending_tx) = spending_tx_opt {
641                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
642                 }
643         }
644
645         fn block_disconnected(&self, header: &Header, height: u32) {
646                 let mut state_lock = self.sweeper_state.lock().unwrap();
647
648                 let new_height = height - 1;
649                 let block_hash = header.block_hash();
650
651                 assert_eq!(state_lock.best_block.block_hash, block_hash,
652                 "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
653                 assert_eq!(state_lock.best_block.height, height,
654                         "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
655                 state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
656
657                 for output_info in state_lock.outputs.iter_mut() {
658                         if output_info.status.confirmation_hash() == Some(block_hash) {
659                                 debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
660                                 output_info.status.unconfirmed();
661                         }
662                 }
663
664                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
665                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
666                 });
667         }
668 }
669
670 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
671         for OutputSweeper<B, D, E, F, K, L, O>
672 where
673         B::Target: BroadcasterInterface,
674         D::Target: ChangeDestinationSource,
675         E::Target: FeeEstimator,
676         F::Target: Filter + Sync + Send,
677         K::Target: KVStore,
678         L::Target: Logger,
679         O::Target: OutputSpender,
680 {
681         fn transactions_confirmed(
682                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
683         ) {
684                 let mut state_lock = self.sweeper_state.lock().unwrap();
685                 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
686                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
687                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
688                 });
689         }
690
691         fn transaction_unconfirmed(&self, txid: &Txid) {
692                 let mut state_lock = self.sweeper_state.lock().unwrap();
693
694                 // Get what height was unconfirmed.
695                 let unconf_height = state_lock
696                         .outputs
697                         .iter()
698                         .find(|o| o.status.latest_spending_tx().map(|tx| tx.txid()) == Some(*txid))
699                         .and_then(|o| o.status.confirmation_height());
700
701                 if let Some(unconf_height) = unconf_height {
702                         // Unconfirm all >= this height.
703                         state_lock
704                                 .outputs
705                                 .iter_mut()
706                                 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
707                                 .for_each(|o| o.status.unconfirmed());
708
709                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
710                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
711                         });
712                 }
713         }
714
715         fn best_block_updated(&self, header: &Header, height: u32) {
716                 let mut spending_tx_opt;
717                 {
718                         let mut state_lock = self.sweeper_state.lock().unwrap();
719                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
720                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
721                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
722                                 // Skip broadcasting if the persist failed.
723                                 spending_tx_opt = None;
724                         });
725                 }
726
727                 if let Some(spending_tx) = spending_tx_opt {
728                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
729                 }
730         }
731
732         fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
733                 let state_lock = self.sweeper_state.lock().unwrap();
734                 state_lock
735                         .outputs
736                         .iter()
737                         .filter_map(|o| match o.status {
738                                 OutputSpendStatus::PendingThresholdConfirmations {
739                                         ref latest_spending_tx,
740                                         confirmation_height,
741                                         confirmation_hash,
742                                         ..
743                                 } => Some((latest_spending_tx.txid(), confirmation_height, Some(confirmation_hash))),
744                                 _ => None,
745                         })
746                         .collect::<Vec<_>>()
747         }
748 }
749
/// The state an [`OutputSweeper`] keeps behind its `sweeper_state` mutex: the outputs being
/// tracked and the best block the sweeper has been told about.
#[derive(Debug, Clone)]
struct SweeperState {
        /// The outputs currently tracked, each with its own spend status.
        outputs: Vec<TrackedSpendableOutput>,
        /// The best block most recently set by the chain-notification handlers.
        best_block: BestBlock,
}
755
// Serialization of `SweeperState` through the crate's TLV-based macro: `outputs` is written under
// type 0 as a required vector and `best_block` under type 2 as a required field.
impl_writeable_tlv_based!(SweeperState, {
        (0, outputs, required_vec),
        (2, best_block, required),
});
760
/// An `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until a
/// future block height is reached.
///
/// Values can be compared for equality, matching the derives on the other public types in this
/// module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SpendingDelay {
        /// A relative delay indicating we shouldn't spend the output before `cur_height + num_blocks`
        /// is reached.
        Relative {
                /// The number of blocks until we'll generate and broadcast the spending transaction.
                num_blocks: u32,
        },
        /// An absolute delay indicating we shouldn't spend the output before `height` is reached.
        Absolute {
                /// The height at which we'll generate and broadcast the spending transaction.
                height: u32,
        },
}
777
778 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
779         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeper<B, D, E, F, K, L, O>
780 where
781         B::Target: BroadcasterInterface,
782         D::Target: ChangeDestinationSource,
783         E::Target: FeeEstimator,
784         F::Target: Filter + Sync + Send,
785         K::Target: KVStore,
786         L::Target: Logger,
787         O::Target: OutputSpender,
788 {
789         #[inline]
790         fn read<R: io::Read>(
791                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
792         ) -> Result<Self, DecodeError> {
793                 let (
794                         broadcaster,
795                         fee_estimator,
796                         chain_data_source,
797                         output_spender,
798                         change_destination_source,
799                         kv_store,
800                         logger,
801                 ) = args;
802                 let state = SweeperState::read(reader)?;
803                 let best_block = state.best_block;
804
805                 if let Some(filter) = chain_data_source.as_ref() {
806                         for output_info in &state.outputs {
807                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
808                                 filter.register_output(watched_output);
809                         }
810                 }
811
812                 let sweeper_state = Mutex::new(state);
813                 Ok(Self {
814                         sweeper_state,
815                         broadcaster,
816                         fee_estimator,
817                         chain_data_source,
818                         output_spender,
819                         change_destination_source,
820                         kv_store,
821                         logger,
822                 })
823         }
824 }
825
826 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
827         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
828 where
829         B::Target: BroadcasterInterface,
830         D::Target: ChangeDestinationSource,
831         E::Target: FeeEstimator,
832         F::Target: Filter + Sync + Send,
833         K::Target: KVStore,
834         L::Target: Logger,
835         O::Target: OutputSpender,
836 {
837         #[inline]
838         fn read<R: io::Read>(
839                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
840         ) -> Result<Self, DecodeError> {
841                 let (
842                         broadcaster,
843                         fee_estimator,
844                         chain_data_source,
845                         output_spender,
846                         change_destination_source,
847                         kv_store,
848                         logger,
849                 ) = args;
850                 let state = SweeperState::read(reader)?;
851                 let best_block = state.best_block;
852
853                 if let Some(filter) = chain_data_source.as_ref() {
854                         for output_info in &state.outputs {
855                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
856                                 filter.register_output(watched_output);
857                         }
858                 }
859
860                 let sweeper_state = Mutex::new(state);
861                 Ok((
862                         best_block,
863                         OutputSweeper {
864                                 sweeper_state,
865                                 broadcaster,
866                                 fee_estimator,
867                                 chain_data_source,
868                                 output_spender,
869                                 change_destination_source,
870                                 kv_store,
871                                 logger,
872                         },
873                 ))
874         }
875 }