398216c0cef08449045d66170106abc116ca3784
[rust-lightning] / lightning / src / util / sweep.rs
1 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4 // You may not use this file except in accordance with one or both of these
5 // licenses.
6
7 //! This module contains an [`OutputSweeper`] utility that keeps track of
8 //! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries
9 //! sweeping them.
10
11 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12 use crate::chain::channelmonitor::ANTI_REORG_DELAY;
13 use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14 use crate::io;
15 use crate::ln::msgs::DecodeError;
16 use crate::ln::ChannelId;
17 use crate::prelude::Vec;
18 use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor};
19 use crate::sync::Mutex;
20 use crate::util::logger::Logger;
21 use crate::util::persist::{
22         KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
23         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
24 };
25 use crate::util::ser::{Readable, ReadableArgs, Writeable};
26 use crate::{impl_writeable_tlv_based, log_debug, log_error};
27
28 use bitcoin::blockdata::block::Header;
29 use bitcoin::blockdata::locktime::absolute::LockTime;
30 use bitcoin::secp256k1::Secp256k1;
31 use bitcoin::{BlockHash, Transaction, Txid};
32
33 use core::ops::Deref;
34
/// The state of a spendable output currently tracked by an [`OutputSweeper`].
///
/// Bundles the descriptor of the output with the channel it belongs to (if known) and the
/// progress made so far towards spending it.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TrackedSpendableOutput {
        /// The tracked output descriptor.
        pub descriptor: SpendableOutputDescriptor,
        /// The channel this output belongs to.
        ///
        /// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`].
        pub channel_id: Option<ChannelId>,
        /// The current status of the output spend.
        pub status: OutputSpendStatus,
}
47
48 impl TrackedSpendableOutput {
49         fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
50                 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
51                 match &self.descriptor {
52                         SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
53                                 WatchedOutput {
54                                         block_hash,
55                                         outpoint: *outpoint,
56                                         script_pubkey: output.script_pubkey.clone(),
57                                 }
58                         },
59                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
60                                 block_hash,
61                                 outpoint: output.outpoint,
62                                 script_pubkey: output.output.script_pubkey.clone(),
63                         },
64                         SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
65                                 block_hash,
66                                 outpoint: output.outpoint,
67                                 script_pubkey: output.output.script_pubkey.clone(),
68                         },
69                 }
70         }
71
72         /// Returns whether the output is spent in the given transaction.
73         pub fn is_spent_in(&self, tx: &Transaction) -> bool {
74                 let prev_outpoint = match &self.descriptor {
75                         SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint,
76                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint,
77                         SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint,
78                 }
79                 .into_bitcoin_outpoint();
80
81                 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
82         }
83 }
84
// Serialization for `TrackedSpendableOutput`: `descriptor` and `status` are declared as required
// fields while `channel_id` is declared optional. The leading numbers are presumably the TLV type
// ids of the respective fields and should therefore stay stable -- TODO confirm against the
// macro definition.
impl_writeable_tlv_based!(TrackedSpendableOutput, {
        (0, descriptor, required),
        (2, channel_id, option),
        (4, status, required),
});
90
/// The current status of the output spend.
///
/// Newly tracked outputs start out as `PendingInitialBroadcast`. Broadcasting a spending
/// transaction moves them to `PendingFirstConfirmation`, and seeing a spend confirmed moves them
/// to `PendingThresholdConfirmations`. If a confirmed spend is marked unconfirmed again, the
/// status falls back to `PendingFirstConfirmation`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutputSpendStatus {
        /// The output is tracked but an initial spending transaction hasn't been generated and
        /// broadcasted yet.
        PendingInitialBroadcast {
                /// The height at which we will first generate and broadcast a spending transaction.
                ///
                /// If `None`, the spend is not delayed.
                delayed_until_height: Option<u32>,
        },
        /// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain.
        PendingFirstConfirmation {
                /// The hash of the chain tip when we first broadcast a transaction spending this output.
                first_broadcast_hash: BlockHash,
                /// The best height when we last broadcast a transaction spending this output.
                latest_broadcast_height: u32,
                /// The transaction spending this output we last broadcasted.
                latest_spending_tx: Transaction,
        },
        /// A transaction spending the output has been confirmed on-chain but will be tracked until it
        /// reaches [`ANTI_REORG_DELAY`] confirmations.
        PendingThresholdConfirmations {
                /// The hash of the chain tip when we first broadcast a transaction spending this output.
                first_broadcast_hash: BlockHash,
                /// The best height when we last broadcast a transaction spending this output.
                latest_broadcast_height: u32,
                /// The transaction spending this output we saw confirmed on-chain.
                latest_spending_tx: Transaction,
                /// The height at which the spending transaction was confirmed.
                confirmation_height: u32,
                /// The hash of the block in which the spending transaction was confirmed.
                confirmation_hash: BlockHash,
        },
}
124
125 impl OutputSpendStatus {
126         fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
127                 match self {
128                         Self::PendingInitialBroadcast { delayed_until_height } => {
129                                 if let Some(delayed_until_height) = delayed_until_height {
130                                         debug_assert!(
131                                                 cur_height >= *delayed_until_height,
132                                                 "We should never broadcast before the required height is reached."
133                                         );
134                                 }
135                                 *self = Self::PendingFirstConfirmation {
136                                         first_broadcast_hash: cur_hash,
137                                         latest_broadcast_height: cur_height,
138                                         latest_spending_tx,
139                                 };
140                         },
141                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
142                                 *self = Self::PendingFirstConfirmation {
143                                         first_broadcast_hash: *first_broadcast_hash,
144                                         latest_broadcast_height: cur_height,
145                                         latest_spending_tx,
146                                 };
147                         },
148                         Self::PendingThresholdConfirmations { .. } => {
149                                 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
150                         },
151                 }
152         }
153
154         fn confirmed(
155                 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
156                 latest_spending_tx: Transaction,
157         ) {
158                 match self {
159                         Self::PendingInitialBroadcast { .. } => {
160                                 // Generally we can't see any of our transactions confirmed if they haven't been
161                                 // broadcasted yet, so this should never be reachable via `transactions_confirmed`.
162                                 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
163                                 *self = Self::PendingThresholdConfirmations {
164                                         first_broadcast_hash: confirmation_hash,
165                                         latest_broadcast_height: confirmation_height,
166                                         latest_spending_tx,
167                                         confirmation_height,
168                                         confirmation_hash,
169                                 };
170                         },
171                         Self::PendingFirstConfirmation {
172                                 first_broadcast_hash,
173                                 latest_broadcast_height,
174                                 ..
175                         } => {
176                                 debug_assert!(confirmation_height >= *latest_broadcast_height);
177                                 *self = Self::PendingThresholdConfirmations {
178                                         first_broadcast_hash: *first_broadcast_hash,
179                                         latest_broadcast_height: *latest_broadcast_height,
180                                         latest_spending_tx,
181                                         confirmation_height,
182                                         confirmation_hash,
183                                 };
184                         },
185                         Self::PendingThresholdConfirmations {
186                                 first_broadcast_hash,
187                                 latest_broadcast_height,
188                                 ..
189                         } => {
190                                 *self = Self::PendingThresholdConfirmations {
191                                         first_broadcast_hash: *first_broadcast_hash,
192                                         latest_broadcast_height: *latest_broadcast_height,
193                                         latest_spending_tx,
194                                         confirmation_height,
195                                         confirmation_hash,
196                                 };
197                         },
198                 }
199         }
200
201         fn unconfirmed(&mut self) {
202                 match self {
203                         Self::PendingInitialBroadcast { .. } => {
204                                 debug_assert!(
205                                         false,
206                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
207                                 );
208                         },
209                         Self::PendingFirstConfirmation { .. } => {
210                                 debug_assert!(
211                                         false,
212                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
213                                 );
214                         },
215                         Self::PendingThresholdConfirmations {
216                                 first_broadcast_hash,
217                                 latest_broadcast_height,
218                                 latest_spending_tx,
219                                 ..
220                         } => {
221                                 *self = Self::PendingFirstConfirmation {
222                                         first_broadcast_hash: *first_broadcast_hash,
223                                         latest_broadcast_height: *latest_broadcast_height,
224                                         latest_spending_tx: latest_spending_tx.clone(),
225                                 };
226                         },
227                 }
228         }
229
230         fn is_delayed(&self, cur_height: u32) -> bool {
231                 match self {
232                         Self::PendingInitialBroadcast { delayed_until_height } => {
233                                 delayed_until_height.map_or(false, |req_height| cur_height < req_height)
234                         },
235                         Self::PendingFirstConfirmation { .. } => false,
236                         Self::PendingThresholdConfirmations { .. } => false,
237                 }
238         }
239
240         fn first_broadcast_hash(&self) -> Option<BlockHash> {
241                 match self {
242                         Self::PendingInitialBroadcast { .. } => None,
243                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
244                                 Some(*first_broadcast_hash)
245                         },
246                         Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
247                                 Some(*first_broadcast_hash)
248                         },
249                 }
250         }
251
252         fn latest_broadcast_height(&self) -> Option<u32> {
253                 match self {
254                         Self::PendingInitialBroadcast { .. } => None,
255                         Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
256                                 Some(*latest_broadcast_height)
257                         },
258                         Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
259                                 Some(*latest_broadcast_height)
260                         },
261                 }
262         }
263
264         fn confirmation_height(&self) -> Option<u32> {
265                 match self {
266                         Self::PendingInitialBroadcast { .. } => None,
267                         Self::PendingFirstConfirmation { .. } => None,
268                         Self::PendingThresholdConfirmations { confirmation_height, .. } => {
269                                 Some(*confirmation_height)
270                         },
271                 }
272         }
273
274         fn confirmation_hash(&self) -> Option<BlockHash> {
275                 match self {
276                         Self::PendingInitialBroadcast { .. } => None,
277                         Self::PendingFirstConfirmation { .. } => None,
278                         Self::PendingThresholdConfirmations { confirmation_hash, .. } => {
279                                 Some(*confirmation_hash)
280                         },
281                 }
282         }
283
284         fn latest_spending_tx(&self) -> Option<&Transaction> {
285                 match self {
286                         Self::PendingInitialBroadcast { .. } => None,
287                         Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
288                         Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
289                                 Some(latest_spending_tx)
290                         },
291                 }
292         }
293
294         fn is_confirmed(&self) -> bool {
295                 match self {
296                         Self::PendingInitialBroadcast { .. } => false,
297                         Self::PendingFirstConfirmation { .. } => false,
298                         Self::PendingThresholdConfirmations { .. } => true,
299                 }
300         }
301 }
302
// Serialization for `OutputSpendStatus`. Each variant is listed with a numeric id followed by its
// fields; only `delayed_until_height` is declared optional, all other fields are required.
// The numbers are presumably the on-disk variant/TLV type ids and hence should not be changed
// for already-persisted data -- TODO confirm against the macro definition.
impl_writeable_tlv_based_enum!(OutputSpendStatus,
        (0, PendingInitialBroadcast) => {
                (0, delayed_until_height, option),
        },
        (2, PendingFirstConfirmation) => {
                (0, first_broadcast_hash, required),
                (2, latest_broadcast_height, required),
                (4, latest_spending_tx, required),
        },
        (4, PendingThresholdConfirmations) => {
                (0, first_broadcast_hash, required),
                (2, latest_broadcast_height, required),
                (4, latest_spending_tx, required),
                (6, confirmation_height, required),
                (8, confirmation_hash, required),
        };
);
320
/// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
/// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor
/// methods.
///
/// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`].
///
/// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
/// implementation and hence has to be connected with the utilized chain data sources.
///
/// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
/// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
/// constructor.
///
/// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
where
        B::Target: BroadcasterInterface,
        D::Target: ChangeDestinationSource,
        E::Target: FeeEstimator,
        F::Target: Filter + Sync + Send,
        K::Target: KVStore,
        L::Target: Logger,
        O::Target: OutputSpender,
{
        /// The tracked outputs and the current best block, guarded by a mutex.
        sweeper_state: Mutex<SweeperState>,
        /// Used to broadcast the generated sweeping transactions.
        broadcaster: B,
        /// Queried for the feerate to use when building a sweeping transaction.
        fee_estimator: E,
        /// The optional chain data source with which outputs are registered when a spend is
        /// broadcast.
        chain_data_source: Option<F>,
        /// Builds the transactions spending the tracked outputs.
        output_spender: O,
        /// Provides the script that swept funds are sent to.
        change_destination_source: D,
        /// The store the sweeper state is persisted to.
        kv_store: K,
        /// The logger used for debug and error messages.
        logger: L,
}
354
355 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
356         OutputSweeper<B, D, E, F, K, L, O>
357 where
358         B::Target: BroadcasterInterface,
359         D::Target: ChangeDestinationSource,
360         E::Target: FeeEstimator,
361         F::Target: Filter + Sync + Send,
362         K::Target: KVStore,
363         L::Target: Logger,
364         O::Target: OutputSpender,
365 {
366         /// Constructs a new [`OutputSweeper`].
367         ///
368         /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
369         /// need to register their [`Filter`] implementation via the given `chain_data_source`.
370         pub fn new(
371                 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
372                 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
373         ) -> Self {
374                 let outputs = Vec::new();
375                 let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
376                 Self {
377                         sweeper_state,
378                         broadcaster,
379                         fee_estimator,
380                         chain_data_source,
381                         output_spender,
382                         change_destination_source,
383                         kv_store,
384                         logger,
385                 }
386         }
387
388         /// Tells the sweeper to track the given outputs descriptors.
389         ///
390         /// Usually, this should be called based on the values emitted by the
391         /// [`Event::SpendableOutputs`].
392         ///
393         /// The given `exclude_static_outputs` flag controls whether the sweeper will filter out
394         /// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
395         /// wallet implementation.
396         ///
397         /// If `delay_until_height` is set, we will delay the spending until the respective block
398         /// height is reached. This can be used to batch spends, e.g., to reduce on-chain fees.
399         ///
400         /// Returns `Err` on persistence failure, in which case the call may be safely retried.
401         ///
402         /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
403         pub fn track_spendable_outputs(
404                 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
405                 exclude_static_outputs: bool, delay_until_height: Option<u32>,
406         ) -> Result<(), ()> {
407                 let mut relevant_descriptors = output_descriptors
408                         .into_iter()
409                         .filter(|desc| {
410                                 !(exclude_static_outputs
411                                         && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
412                         })
413                         .peekable();
414
415                 if relevant_descriptors.peek().is_none() {
416                         return Ok(());
417                 }
418
419                 let spending_tx_opt;
420                 {
421                         let mut state_lock = self.sweeper_state.lock().unwrap();
422                         for descriptor in relevant_descriptors {
423                                 let output_info = TrackedSpendableOutput {
424                                         descriptor,
425                                         channel_id,
426                                         status: OutputSpendStatus::PendingInitialBroadcast {
427                                                 delayed_until_height: delay_until_height,
428                                         },
429                                 };
430
431                                 if state_lock
432                                         .outputs
433                                         .iter()
434                                         .find(|o| o.descriptor == output_info.descriptor)
435                                         .is_some()
436                                 {
437                                         continue;
438                                 }
439
440                                 state_lock.outputs.push(output_info);
441                         }
442                         spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
443                         self.persist_state(&*state_lock).map_err(|e| {
444                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
445                         })?;
446                 }
447
448                 if let Some(spending_tx) = spending_tx_opt {
449                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
450                 }
451
452                 Ok(())
453         }
454
455         /// Returns a list of the currently tracked spendable outputs.
456         pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
457                 self.sweeper_state.lock().unwrap().outputs.clone()
458         }
459
460         /// Gets the latest best block which was connected either via the [`Listen`] or
461         /// [`Confirm`] interfaces.
462         pub fn current_best_block(&self) -> BestBlock {
463                 self.sweeper_state.lock().unwrap().best_block
464         }
465
466         fn regenerate_spend_if_necessary(
467                 &self, sweeper_state: &mut SweeperState,
468         ) -> Option<Transaction> {
469                 let cur_height = sweeper_state.best_block.height;
470                 let cur_hash = sweeper_state.best_block.block_hash;
471                 let filter_fn = |o: &TrackedSpendableOutput| {
472                         if o.status.is_confirmed() {
473                                 // Don't rebroadcast confirmed txs.
474                                 return false;
475                         }
476
477                         if o.status.is_delayed(cur_height) {
478                                 // Don't generate and broadcast if still delayed
479                                 return false;
480                         }
481
482                         if o.status.latest_broadcast_height() >= Some(cur_height) {
483                                 // Only broadcast once per block height.
484                                 return false;
485                         }
486
487                         true
488                 };
489
490                 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
491                         sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
492
493                 if respend_descriptors.is_empty() {
494                         // Nothing to do.
495                         return None;
496                 }
497
498                 let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) {
499                         Ok(spending_tx) => {
500                                 log_debug!(
501                                         self.logger,
502                                         "Generating and broadcasting sweeping transaction {}",
503                                         spending_tx.txid()
504                                 );
505                                 spending_tx
506                         },
507                         Err(e) => {
508                                 log_error!(self.logger, "Error spending outputs: {:?}", e);
509                                 return None;
510                         },
511                 };
512
513                 // As we didn't modify the state so far, the same filter_fn yields the same elements as
514                 // above.
515                 let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
516                 for output_info in respend_outputs {
517                         if let Some(filter) = self.chain_data_source.as_ref() {
518                                 let watched_output = output_info.to_watched_output(cur_hash);
519                                 filter.register_output(watched_output);
520                         }
521
522                         output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
523                 }
524
525                 Some(spending_tx)
526         }
527
528         fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
529                 let cur_height = sweeper_state.best_block.height;
530
531                 // Prune all outputs that have sufficient depth by now.
532                 sweeper_state.outputs.retain(|o| {
533                         if let Some(confirmation_height) = o.status.confirmation_height() {
534                                 if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 {
535                                         log_debug!(self.logger,
536                                                 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
537                                                 o.status.latest_spending_tx().map(|t| t.txid()), o.descriptor
538                                         );
539                                         return false;
540                                 }
541                         }
542                         true
543                 });
544         }
545
546         fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
547                 self.kv_store
548                         .write(
549                                 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
550                                 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
551                                 OUTPUT_SWEEPER_PERSISTENCE_KEY,
552                                 &sweeper_state.encode(),
553                         )
554                         .map_err(|e| {
555                                 log_error!(
556                                         self.logger,
557                                         "Write for key {}/{}/{} failed due to: {}",
558                                         OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
559                                         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
560                                         OUTPUT_SWEEPER_PERSISTENCE_KEY,
561                                         e
562                                 );
563                                 e
564                         })
565         }
566
567         fn spend_outputs(
568                 &self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
569         ) -> Result<Transaction, ()> {
570                 let tx_feerate =
571                         self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
572                 let change_destination_script =
573                         self.change_destination_source.get_change_destination_script()?;
574                 let cur_height = sweeper_state.best_block.height;
575                 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
576                 self.output_spender.spend_spendable_outputs(
577                         &descriptors,
578                         Vec::new(),
579                         change_destination_script,
580                         tx_feerate,
581                         locktime,
582                         &Secp256k1::new(),
583                 )
584         }
585
586         fn transactions_confirmed_internal(
587                 &self, sweeper_state: &mut SweeperState, header: &Header,
588                 txdata: &chain::transaction::TransactionData, height: u32,
589         ) {
590                 let confirmation_hash = header.block_hash();
591                 for (_, tx) in txdata {
592                         for output_info in sweeper_state.outputs.iter_mut() {
593                                 if output_info.is_spent_in(*tx) {
594                                         output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
595                                 }
596                         }
597                 }
598         }
599
600         fn best_block_updated_internal(
601                 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
602         ) -> Option<Transaction> {
603                 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
604                 self.prune_confirmed_outputs(sweeper_state);
605                 let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
606                 spending_tx_opt
607         }
608 }
609
610 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
611         for OutputSweeper<B, D, E, F, K, L, O>
612 where
613         B::Target: BroadcasterInterface,
614         D::Target: ChangeDestinationSource,
615         E::Target: FeeEstimator,
616         F::Target: Filter + Sync + Send,
617         K::Target: KVStore,
618         L::Target: Logger,
619         O::Target: OutputSpender,
620 {
621         fn filtered_block_connected(
622                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
623         ) {
624                 let mut spending_tx_opt;
625                 {
626                         let mut state_lock = self.sweeper_state.lock().unwrap();
627                         assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
628                                 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
629                         assert_eq!(state_lock.best_block.height, height - 1,
630                                 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
631
632                         self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
633                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
634
635                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
636                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
637                                 // Skip broadcasting if the persist failed.
638                                 spending_tx_opt = None;
639                         });
640                 }
641
642                 if let Some(spending_tx) = spending_tx_opt {
643                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
644                 }
645         }
646
647         fn block_disconnected(&self, header: &Header, height: u32) {
648                 let mut state_lock = self.sweeper_state.lock().unwrap();
649
650                 let new_height = height - 1;
651                 let block_hash = header.block_hash();
652
653                 assert_eq!(state_lock.best_block.block_hash, block_hash,
654                 "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
655                 assert_eq!(state_lock.best_block.height, height,
656                         "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
657                 state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
658
659                 for output_info in state_lock.outputs.iter_mut() {
660                         if output_info.status.confirmation_hash() == Some(block_hash) {
661                                 debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
662                                 output_info.status.unconfirmed();
663                         }
664                 }
665
666                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
667                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
668                 });
669         }
670 }
671
672 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
673         for OutputSweeper<B, D, E, F, K, L, O>
674 where
675         B::Target: BroadcasterInterface,
676         D::Target: ChangeDestinationSource,
677         E::Target: FeeEstimator,
678         F::Target: Filter + Sync + Send,
679         K::Target: KVStore,
680         L::Target: Logger,
681         O::Target: OutputSpender,
682 {
683         fn transactions_confirmed(
684                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
685         ) {
686                 let mut state_lock = self.sweeper_state.lock().unwrap();
687                 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
688                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
689                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
690                 });
691         }
692
693         fn transaction_unconfirmed(&self, txid: &Txid) {
694                 let mut state_lock = self.sweeper_state.lock().unwrap();
695
696                 // Get what height was unconfirmed.
697                 let unconf_height = state_lock
698                         .outputs
699                         .iter()
700                         .find(|o| o.status.latest_spending_tx().map(|tx| tx.txid()) == Some(*txid))
701                         .and_then(|o| o.status.confirmation_height());
702
703                 if let Some(unconf_height) = unconf_height {
704                         // Unconfirm all >= this height.
705                         state_lock
706                                 .outputs
707                                 .iter_mut()
708                                 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
709                                 .for_each(|o| o.status.unconfirmed());
710
711                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
712                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
713                         });
714                 }
715         }
716
717         fn best_block_updated(&self, header: &Header, height: u32) {
718                 let mut spending_tx_opt;
719                 {
720                         let mut state_lock = self.sweeper_state.lock().unwrap();
721                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
722                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
723                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
724                                 // Skip broadcasting if the persist failed.
725                                 spending_tx_opt = None;
726                         });
727                 }
728
729                 if let Some(spending_tx) = spending_tx_opt {
730                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
731                 }
732         }
733
734         fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
735                 let state_lock = self.sweeper_state.lock().unwrap();
736                 state_lock
737                         .outputs
738                         .iter()
739                         .filter_map(|o| match o.status {
740                                 OutputSpendStatus::PendingThresholdConfirmations {
741                                         ref latest_spending_tx,
742                                         confirmation_height,
743                                         confirmation_hash,
744                                         ..
745                                 } => Some((latest_spending_tx.txid(), confirmation_height, Some(confirmation_hash))),
746                                 _ => None,
747                         })
748                         .collect::<Vec<_>>()
749         }
750 }
751
752 #[derive(Debug, Clone)]
753 struct SweeperState {
754         outputs: Vec<TrackedSpendableOutput>,
755         best_block: BestBlock,
756 }
757
758 impl_writeable_tlv_based!(SweeperState, {
759         (0, outputs, required_vec),
760         (2, best_block, required),
761 });
762
/// An `enum` signalling to the [`OutputSweeper`] that it should delay spending an output until
/// a future block height is reached.
#[derive(Clone, Debug)]
pub enum SpendingDelay {
        /// A relative delay: the output shouldn't be spent before `cur_height + num_blocks` is
        /// reached.
        Relative {
                /// The number of blocks to wait before the spending transaction is generated and
                /// broadcast.
                num_blocks: u32,
        },
        /// An absolute delay: the output shouldn't be spent before `height` is reached.
        Absolute {
                /// The block height at which the spending transaction is generated and broadcast.
                height: u32,
        },
}
779
780 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
781         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeper<B, D, E, F, K, L, O>
782 where
783         B::Target: BroadcasterInterface,
784         D::Target: ChangeDestinationSource,
785         E::Target: FeeEstimator,
786         F::Target: Filter + Sync + Send,
787         K::Target: KVStore,
788         L::Target: Logger,
789         O::Target: OutputSpender,
790 {
791         #[inline]
792         fn read<R: io::Read>(
793                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
794         ) -> Result<Self, DecodeError> {
795                 let (
796                         broadcaster,
797                         fee_estimator,
798                         chain_data_source,
799                         output_spender,
800                         change_destination_source,
801                         kv_store,
802                         logger,
803                 ) = args;
804                 let state = SweeperState::read(reader)?;
805                 let best_block = state.best_block;
806
807                 if let Some(filter) = chain_data_source.as_ref() {
808                         for output_info in &state.outputs {
809                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
810                                 filter.register_output(watched_output);
811                         }
812                 }
813
814                 let sweeper_state = Mutex::new(state);
815                 Ok(Self {
816                         sweeper_state,
817                         broadcaster,
818                         fee_estimator,
819                         chain_data_source,
820                         output_spender,
821                         change_destination_source,
822                         kv_store,
823                         logger,
824                 })
825         }
826 }
827
828 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
829         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
830 where
831         B::Target: BroadcasterInterface,
832         D::Target: ChangeDestinationSource,
833         E::Target: FeeEstimator,
834         F::Target: Filter + Sync + Send,
835         K::Target: KVStore,
836         L::Target: Logger,
837         O::Target: OutputSpender,
838 {
839         #[inline]
840         fn read<R: io::Read>(
841                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
842         ) -> Result<Self, DecodeError> {
843                 let (
844                         broadcaster,
845                         fee_estimator,
846                         chain_data_source,
847                         output_spender,
848                         change_destination_source,
849                         kv_store,
850                         logger,
851                 ) = args;
852                 let state = SweeperState::read(reader)?;
853                 let best_block = state.best_block;
854
855                 if let Some(filter) = chain_data_source.as_ref() {
856                         for output_info in &state.outputs {
857                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
858                                 filter.register_output(watched_output);
859                         }
860                 }
861
862                 let sweeper_state = Mutex::new(state);
863                 Ok((
864                         best_block,
865                         OutputSweeper {
866                                 sweeper_state,
867                                 broadcaster,
868                                 fee_estimator,
869                                 chain_data_source,
870                                 output_spender,
871                                 change_destination_source,
872                                 kv_store,
873                                 logger,
874                         },
875                 ))
876         }
877 }