a8393f70e96c6f6be38bfabea02a69b80a82c291
[rust-lightning] / lightning / src / util / sweep.rs
1 // This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
2 // or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
3 // <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
4 // You may not use this file except in accordance with one or both of these
5 // licenses.
6
7 //! This module contains an [`OutputSweeper`] utility that keeps track of
8 //! [`SpendableOutputDescriptor`]s, i.e., persists them in a given [`KVStore`] and regularly retries
9 //! sweeping them.
10
11 use crate::chain::chaininterface::{BroadcasterInterface, ConfirmationTarget, FeeEstimator};
12 use crate::chain::channelmonitor::ANTI_REORG_DELAY;
13 use crate::chain::{self, BestBlock, Confirm, Filter, Listen, WatchedOutput};
14 use crate::io;
15 use crate::ln::msgs::DecodeError;
16 use crate::ln::ChannelId;
17 use crate::prelude::Vec;
18 use crate::sign::{ChangeDestinationSource, OutputSpender, SpendableOutputDescriptor};
19 use crate::sync::Mutex;
20 use crate::util::logger::Logger;
21 use crate::util::persist::{
22         KVStore, OUTPUT_SWEEPER_PERSISTENCE_KEY, OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
23         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
24 };
25 use crate::util::ser::{Readable, ReadableArgs, Writeable};
26 use crate::{impl_writeable_tlv_based, log_debug, log_error};
27
28 use bitcoin::blockdata::block::Header;
29 use bitcoin::blockdata::locktime::absolute::LockTime;
30 use bitcoin::secp256k1::Secp256k1;
31 use bitcoin::{BlockHash, Transaction, Txid};
32
33 use core::ops::Deref;
34
35 /// The state of a spendable output currently tracked by an [`OutputSweeper`].
36 #[derive(Clone, Debug, PartialEq, Eq)]
37 pub struct TrackedSpendableOutput {
38         /// The tracked output descriptor.
39         pub descriptor: SpendableOutputDescriptor,
40         /// The channel this output belongs to.
41         ///
42         /// Will be `None` if no `channel_id` was given to [`OutputSweeper::track_spendable_outputs`]
43         pub channel_id: Option<ChannelId>,
44         /// The current status of the output spend.
45         pub status: OutputSpendStatus,
46 }
47
48 impl TrackedSpendableOutput {
49         fn to_watched_output(&self, cur_hash: BlockHash) -> WatchedOutput {
50                 let block_hash = self.status.first_broadcast_hash().or(Some(cur_hash));
51                 match &self.descriptor {
52                         SpendableOutputDescriptor::StaticOutput { outpoint, output, channel_keys_id: _ } => {
53                                 WatchedOutput {
54                                         block_hash,
55                                         outpoint: *outpoint,
56                                         script_pubkey: output.script_pubkey.clone(),
57                                 }
58                         },
59                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => WatchedOutput {
60                                 block_hash,
61                                 outpoint: output.outpoint,
62                                 script_pubkey: output.output.script_pubkey.clone(),
63                         },
64                         SpendableOutputDescriptor::StaticPaymentOutput(output) => WatchedOutput {
65                                 block_hash,
66                                 outpoint: output.outpoint,
67                                 script_pubkey: output.output.script_pubkey.clone(),
68                         },
69                 }
70         }
71
72         fn is_spent_in(&self, tx: &Transaction) -> bool {
73                 let prev_outpoint = match &self.descriptor {
74                         SpendableOutputDescriptor::StaticOutput { outpoint, .. } => *outpoint,
75                         SpendableOutputDescriptor::DelayedPaymentOutput(output) => output.outpoint,
76                         SpendableOutputDescriptor::StaticPaymentOutput(output) => output.outpoint,
77                 }
78                 .into_bitcoin_outpoint();
79
80                 tx.input.iter().any(|input| input.previous_output == prev_outpoint)
81         }
82 }
83
84 impl_writeable_tlv_based!(TrackedSpendableOutput, {
85         (0, descriptor, required),
86         (2, channel_id, option),
87         (4, status, required),
88 });
89
90 /// The current status of the output spend.
91 #[derive(Debug, Clone, PartialEq, Eq)]
92 pub enum OutputSpendStatus {
93         /// The output is tracked but an initial spending transaction hasn't been generated and
94         /// broadcasted yet.
95         PendingInitialBroadcast,
96         /// A transaction spending the output has been broadcasted but is pending its first confirmation on-chain.
97         PendingFirstConfirmation {
98                 /// The hash of the chain tip when we first broadcast a transaction spending this output.
99                 first_broadcast_hash: BlockHash,
100                 /// The best height when we last broadcast a transaction spending this output.
101                 latest_broadcast_height: u32,
102                 /// The transaction spending this output we last broadcasted.
103                 latest_spending_tx: Transaction,
104         },
105         /// A transaction spending the output has been confirmed on-chain but will be tracked until it
106         /// reaches [`ANTI_REORG_DELAY`] confirmations.
107         PendingThresholdConfirmations {
108                 /// The hash of the chain tip when we first broadcast a transaction spending this output.
109                 first_broadcast_hash: BlockHash,
110                 /// The best height when we last broadcast a transaction spending this output.
111                 latest_broadcast_height: u32,
112                 /// The transaction spending this output we saw confirmed on-chain.
113                 latest_spending_tx: Transaction,
114                 /// The height at which the spending transaction was confirmed.
115                 confirmation_height: u32,
116                 /// The hash of the block in which the spending transaction was confirmed.
117                 confirmation_hash: BlockHash,
118         },
119 }
120
121 impl OutputSpendStatus {
122         fn broadcast(&mut self, cur_hash: BlockHash, cur_height: u32, latest_spending_tx: Transaction) {
123                 match self {
124                         Self::PendingInitialBroadcast => {
125                                 *self = Self::PendingFirstConfirmation {
126                                         first_broadcast_hash: cur_hash,
127                                         latest_broadcast_height: cur_height,
128                                         latest_spending_tx,
129                                 };
130                         },
131                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
132                                 *self = Self::PendingFirstConfirmation {
133                                         first_broadcast_hash: *first_broadcast_hash,
134                                         latest_broadcast_height: cur_height,
135                                         latest_spending_tx,
136                                 };
137                         },
138                         Self::PendingThresholdConfirmations { .. } => {
139                                 debug_assert!(false, "We should never rebroadcast confirmed transactions.");
140                         },
141                 }
142         }
143
144         fn confirmed(
145                 &mut self, confirmation_hash: BlockHash, confirmation_height: u32,
146                 latest_spending_tx: Transaction,
147         ) {
148                 match self {
149                         Self::PendingInitialBroadcast => {
150                                 // Generally we can't see any of our transactions confirmed if they haven't been
151                                 // broadcasted yet, so this should never be reachable via `transactions_confirmed`.
152                                 debug_assert!(false, "We should never confirm when we haven't broadcasted. This a bug and should never happen, please report.");
153                                 *self = Self::PendingThresholdConfirmations {
154                                         first_broadcast_hash: confirmation_hash,
155                                         latest_broadcast_height: confirmation_height,
156                                         latest_spending_tx,
157                                         confirmation_height,
158                                         confirmation_hash,
159                                 };
160                         },
161                         Self::PendingFirstConfirmation {
162                                 first_broadcast_hash,
163                                 latest_broadcast_height,
164                                 ..
165                         } => {
166                                 debug_assert!(confirmation_height >= *latest_broadcast_height);
167                                 *self = Self::PendingThresholdConfirmations {
168                                         first_broadcast_hash: *first_broadcast_hash,
169                                         latest_broadcast_height: *latest_broadcast_height,
170                                         latest_spending_tx,
171                                         confirmation_height,
172                                         confirmation_hash,
173                                 };
174                         },
175                         Self::PendingThresholdConfirmations {
176                                 first_broadcast_hash,
177                                 latest_broadcast_height,
178                                 ..
179                         } => {
180                                 *self = Self::PendingThresholdConfirmations {
181                                         first_broadcast_hash: *first_broadcast_hash,
182                                         latest_broadcast_height: *latest_broadcast_height,
183                                         latest_spending_tx,
184                                         confirmation_height,
185                                         confirmation_hash,
186                                 };
187                         },
188                 }
189         }
190
191         fn unconfirmed(&mut self) {
192                 match self {
193                         Self::PendingInitialBroadcast => {
194                                 debug_assert!(
195                                         false,
196                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
197                                 );
198                         },
199                         Self::PendingFirstConfirmation { .. } => {
200                                 debug_assert!(
201                                         false,
202                                         "We should only mark a spend as unconfirmed if it used to be confirmed."
203                                 );
204                         },
205                         Self::PendingThresholdConfirmations {
206                                 first_broadcast_hash,
207                                 latest_broadcast_height,
208                                 latest_spending_tx,
209                                 ..
210                         } => {
211                                 *self = Self::PendingFirstConfirmation {
212                                         first_broadcast_hash: *first_broadcast_hash,
213                                         latest_broadcast_height: *latest_broadcast_height,
214                                         latest_spending_tx: latest_spending_tx.clone(),
215                                 };
216                         },
217                 }
218         }
219
220         fn first_broadcast_hash(&self) -> Option<BlockHash> {
221                 match self {
222                         Self::PendingInitialBroadcast => None,
223                         Self::PendingFirstConfirmation { first_broadcast_hash, .. } => {
224                                 Some(*first_broadcast_hash)
225                         },
226                         Self::PendingThresholdConfirmations { first_broadcast_hash, .. } => {
227                                 Some(*first_broadcast_hash)
228                         },
229                 }
230         }
231
232         fn latest_broadcast_height(&self) -> Option<u32> {
233                 match self {
234                         Self::PendingInitialBroadcast => None,
235                         Self::PendingFirstConfirmation { latest_broadcast_height, .. } => {
236                                 Some(*latest_broadcast_height)
237                         },
238                         Self::PendingThresholdConfirmations { latest_broadcast_height, .. } => {
239                                 Some(*latest_broadcast_height)
240                         },
241                 }
242         }
243
244         fn confirmation_height(&self) -> Option<u32> {
245                 match self {
246                         Self::PendingInitialBroadcast => None,
247                         Self::PendingFirstConfirmation { .. } => None,
248                         Self::PendingThresholdConfirmations { confirmation_height, .. } => {
249                                 Some(*confirmation_height)
250                         },
251                 }
252         }
253
254         fn confirmation_hash(&self) -> Option<BlockHash> {
255                 match self {
256                         Self::PendingInitialBroadcast => None,
257                         Self::PendingFirstConfirmation { .. } => None,
258                         Self::PendingThresholdConfirmations { confirmation_hash, .. } => {
259                                 Some(*confirmation_hash)
260                         },
261                 }
262         }
263
264         fn latest_spending_tx(&self) -> Option<&Transaction> {
265                 match self {
266                         Self::PendingInitialBroadcast => None,
267                         Self::PendingFirstConfirmation { latest_spending_tx, .. } => Some(latest_spending_tx),
268                         Self::PendingThresholdConfirmations { latest_spending_tx, .. } => {
269                                 Some(latest_spending_tx)
270                         },
271                 }
272         }
273
274         fn is_confirmed(&self) -> bool {
275                 match self {
276                         Self::PendingInitialBroadcast => false,
277                         Self::PendingFirstConfirmation { .. } => false,
278                         Self::PendingThresholdConfirmations { .. } => true,
279                 }
280         }
281 }
282
283 impl_writeable_tlv_based_enum!(OutputSpendStatus,
284         (0, PendingInitialBroadcast) => {},
285         (2, PendingFirstConfirmation) => {
286                 (0, first_broadcast_hash, required),
287                 (2, latest_broadcast_height, required),
288                 (4, latest_spending_tx, required),
289         },
290         (4, PendingThresholdConfirmations) => {
291                 (0, first_broadcast_hash, required),
292                 (2, latest_broadcast_height, required),
293                 (4, latest_spending_tx, required),
294                 (6, confirmation_height, required),
295                 (8, confirmation_hash, required),
296         };
297 );
298
299 /// A utility that keeps track of [`SpendableOutputDescriptor`]s, persists them in a given
300 /// [`KVStore`] and regularly retries sweeping them based on a callback given to the constructor
301 /// methods.
302 ///
303 /// Users should call [`Self::track_spendable_outputs`] for any [`SpendableOutputDescriptor`]s received via [`Event::SpendableOutputs`].
304 ///
305 /// This needs to be notified of chain state changes either via its [`Listen`] or [`Confirm`]
306 /// implementation and hence has to be connected with the utilized chain data sources.
307 ///
308 /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users are
309 /// required to give their chain data sources (i.e., [`Filter`] implementation) to the respective
310 /// constructor.
311 ///
312 /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
313 pub struct OutputSweeper<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
314 where
315         B::Target: BroadcasterInterface,
316         D::Target: ChangeDestinationSource,
317         E::Target: FeeEstimator,
318         F::Target: Filter + Sync + Send,
319         K::Target: KVStore,
320         L::Target: Logger,
321         O::Target: OutputSpender,
322 {
323         sweeper_state: Mutex<SweeperState>,
324         broadcaster: B,
325         fee_estimator: E,
326         chain_data_source: Option<F>,
327         output_spender: O,
328         change_destination_source: D,
329         kv_store: K,
330         logger: L,
331 }
332
333 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
334         OutputSweeper<B, D, E, F, K, L, O>
335 where
336         B::Target: BroadcasterInterface,
337         D::Target: ChangeDestinationSource,
338         E::Target: FeeEstimator,
339         F::Target: Filter + Sync + Send,
340         K::Target: KVStore,
341         L::Target: Logger,
342         O::Target: OutputSpender,
343 {
344         /// Constructs a new [`OutputSweeper`].
345         ///
346         /// If chain data is provided via the [`Confirm`] interface or via filtered blocks, users also
347         /// need to register their [`Filter`] implementation via the given `chain_data_source`.
348         pub fn new(
349                 best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option<F>,
350                 output_spender: O, change_destination_source: D, kv_store: K, logger: L,
351         ) -> Self {
352                 let outputs = Vec::new();
353                 let sweeper_state = Mutex::new(SweeperState { outputs, best_block });
354                 Self {
355                         sweeper_state,
356                         broadcaster,
357                         fee_estimator,
358                         chain_data_source,
359                         output_spender,
360                         change_destination_source,
361                         kv_store,
362                         logger,
363                 }
364         }
365
366         /// Tells the sweeper to track the given outputs descriptors.
367         ///
368         /// Usually, this should be called based on the values emitted by the
369         /// [`Event::SpendableOutputs`].
370         ///
371         /// The given `exclude_static_ouputs` flag controls whether the sweeper will filter out
372         /// [`SpendableOutputDescriptor::StaticOutput`]s, which may be handled directly by the on-chain
373         /// wallet implementation.
374         ///
375         /// [`Event::SpendableOutputs`]: crate::events::Event::SpendableOutputs
376         pub fn track_spendable_outputs(
377                 &self, output_descriptors: Vec<SpendableOutputDescriptor>, channel_id: Option<ChannelId>,
378                 exclude_static_ouputs: bool,
379         ) {
380                 let mut relevant_descriptors = output_descriptors
381                         .into_iter()
382                         .filter(|desc| {
383                                 !(exclude_static_ouputs
384                                         && matches!(desc, SpendableOutputDescriptor::StaticOutput { .. }))
385                         })
386                         .peekable();
387
388                 if relevant_descriptors.peek().is_none() {
389                         return;
390                 }
391
392                 let mut spending_tx_opt;
393                 {
394                         let mut state_lock = self.sweeper_state.lock().unwrap();
395                         for descriptor in relevant_descriptors {
396                                 let output_info = TrackedSpendableOutput {
397                                         descriptor,
398                                         channel_id,
399                                         status: OutputSpendStatus::PendingInitialBroadcast,
400                                 };
401
402                                 if state_lock
403                                         .outputs
404                                         .iter()
405                                         .find(|o| o.descriptor == output_info.descriptor)
406                                         .is_some()
407                                 {
408                                         continue;
409                                 }
410
411                                 state_lock.outputs.push(output_info);
412                         }
413                         spending_tx_opt = self.regenerate_spend_if_necessary(&mut *state_lock);
414                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
415                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
416                                 // Skip broadcasting if the persist failed.
417                                 spending_tx_opt = None;
418                         });
419                 }
420
421                 if let Some(spending_tx) = spending_tx_opt {
422                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
423                 }
424         }
425
426         /// Returns a list of the currently tracked spendable outputs.
427         pub fn tracked_spendable_outputs(&self) -> Vec<TrackedSpendableOutput> {
428                 self.sweeper_state.lock().unwrap().outputs.clone()
429         }
430
431         /// Gets the latest best block which was connected either via the [`Listen`] or
432         /// [`Confirm`] interfaces.
433         pub fn current_best_block(&self) -> BestBlock {
434                 self.sweeper_state.lock().unwrap().best_block
435         }
436
437         fn regenerate_spend_if_necessary(
438                 &self, sweeper_state: &mut SweeperState,
439         ) -> Option<Transaction> {
440                 let cur_height = sweeper_state.best_block.height;
441                 let cur_hash = sweeper_state.best_block.block_hash;
442                 let filter_fn = |o: &TrackedSpendableOutput| {
443                         if o.status.is_confirmed() {
444                                 // Don't rebroadcast confirmed txs.
445                                 return false;
446                         }
447
448                         if o.status.latest_broadcast_height() >= Some(cur_height) {
449                                 // Only broadcast once per block height.
450                                 return false;
451                         }
452
453                         true
454                 };
455
456                 let respend_descriptors: Vec<&SpendableOutputDescriptor> =
457                         sweeper_state.outputs.iter().filter(|o| filter_fn(*o)).map(|o| &o.descriptor).collect();
458
459                 if respend_descriptors.is_empty() {
460                         // Nothing to do.
461                         return None;
462                 }
463
464                 let spending_tx = match self.spend_outputs(&*sweeper_state, respend_descriptors) {
465                         Ok(spending_tx) => {
466                                 log_debug!(
467                                         self.logger,
468                                         "Generating and broadcasting sweeping transaction {}",
469                                         spending_tx.txid()
470                                 );
471                                 spending_tx
472                         },
473                         Err(e) => {
474                                 log_error!(self.logger, "Error spending outputs: {:?}", e);
475                                 return None;
476                         },
477                 };
478
479                 // As we didn't modify the state so far, the same filter_fn yields the same elements as
480                 // above.
481                 let respend_outputs = sweeper_state.outputs.iter_mut().filter(|o| filter_fn(&**o));
482                 for output_info in respend_outputs {
483                         if let Some(filter) = self.chain_data_source.as_ref() {
484                                 let watched_output = output_info.to_watched_output(cur_hash);
485                                 filter.register_output(watched_output);
486                         }
487
488                         output_info.status.broadcast(cur_hash, cur_height, spending_tx.clone());
489                 }
490
491                 Some(spending_tx)
492         }
493
494         fn prune_confirmed_outputs(&self, sweeper_state: &mut SweeperState) {
495                 let cur_height = sweeper_state.best_block.height;
496
497                 // Prune all outputs that have sufficient depth by now.
498                 sweeper_state.outputs.retain(|o| {
499                         if let Some(confirmation_height) = o.status.confirmation_height() {
500                                 if cur_height >= confirmation_height + ANTI_REORG_DELAY - 1 {
501                                         log_debug!(self.logger,
502                                                 "Pruning swept output as sufficiently confirmed via spend in transaction {:?}. Pruned descriptor: {:?}",
503                                                 o.status.latest_spending_tx().map(|t| t.txid()), o.descriptor
504                                         );
505                                         return false;
506                                 }
507                         }
508                         true
509                 });
510         }
511
512         fn persist_state(&self, sweeper_state: &SweeperState) -> Result<(), io::Error> {
513                 self.kv_store
514                         .write(
515                                 OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
516                                 OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
517                                 OUTPUT_SWEEPER_PERSISTENCE_KEY,
518                                 &sweeper_state.encode(),
519                         )
520                         .map_err(|e| {
521                                 log_error!(
522                                         self.logger,
523                                         "Write for key {}/{}/{} failed due to: {}",
524                                         OUTPUT_SWEEPER_PERSISTENCE_PRIMARY_NAMESPACE,
525                                         OUTPUT_SWEEPER_PERSISTENCE_SECONDARY_NAMESPACE,
526                                         OUTPUT_SWEEPER_PERSISTENCE_KEY,
527                                         e
528                                 );
529                                 e
530                         })
531         }
532
533         fn spend_outputs(
534                 &self, sweeper_state: &SweeperState, descriptors: Vec<&SpendableOutputDescriptor>,
535         ) -> Result<Transaction, ()> {
536                 let tx_feerate =
537                         self.fee_estimator.get_est_sat_per_1000_weight(ConfirmationTarget::OutputSpendingFee);
538                 let change_destination_script =
539                         self.change_destination_source.get_change_destination_script()?;
540                 let cur_height = sweeper_state.best_block.height;
541                 let locktime = Some(LockTime::from_height(cur_height).unwrap_or(LockTime::ZERO));
542                 self.output_spender.spend_spendable_outputs(
543                         &descriptors,
544                         Vec::new(),
545                         change_destination_script,
546                         tx_feerate,
547                         locktime,
548                         &Secp256k1::new(),
549                 )
550         }
551
552         fn transactions_confirmed_internal(
553                 &self, sweeper_state: &mut SweeperState, header: &Header,
554                 txdata: &chain::transaction::TransactionData, height: u32,
555         ) {
556                 let confirmation_hash = header.block_hash();
557                 for (_, tx) in txdata {
558                         for output_info in sweeper_state.outputs.iter_mut() {
559                                 if output_info.is_spent_in(*tx) {
560                                         output_info.status.confirmed(confirmation_hash, height, (*tx).clone())
561                                 }
562                         }
563                 }
564         }
565
566         fn best_block_updated_internal(
567                 &self, sweeper_state: &mut SweeperState, header: &Header, height: u32,
568         ) -> Option<Transaction> {
569                 sweeper_state.best_block = BestBlock::new(header.block_hash(), height);
570                 self.prune_confirmed_outputs(sweeper_state);
571                 let spending_tx_opt = self.regenerate_spend_if_necessary(sweeper_state);
572                 spending_tx_opt
573         }
574 }
575
576 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Listen
577         for OutputSweeper<B, D, E, F, K, L, O>
578 where
579         B::Target: BroadcasterInterface,
580         D::Target: ChangeDestinationSource,
581         E::Target: FeeEstimator,
582         F::Target: Filter + Sync + Send,
583         K::Target: KVStore,
584         L::Target: Logger,
585         O::Target: OutputSpender,
586 {
587         fn filtered_block_connected(
588                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
589         ) {
590                 let mut spending_tx_opt;
591                 {
592                         let mut state_lock = self.sweeper_state.lock().unwrap();
593                         assert_eq!(state_lock.best_block.block_hash, header.prev_blockhash,
594                                 "Blocks must be connected in chain-order - the connected header must build on the last connected header");
595                         assert_eq!(state_lock.best_block.height, height - 1,
596                                 "Blocks must be connected in chain-order - the connected block height must be one greater than the previous height");
597
598                         self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
599                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
600
601                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
602                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
603                                 // Skip broadcasting if the persist failed.
604                                 spending_tx_opt = None;
605                         });
606                 }
607
608                 if let Some(spending_tx) = spending_tx_opt {
609                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
610                 }
611         }
612
613         fn block_disconnected(&self, header: &Header, height: u32) {
614                 let mut state_lock = self.sweeper_state.lock().unwrap();
615
616                 let new_height = height - 1;
617                 let block_hash = header.block_hash();
618
619                 assert_eq!(state_lock.best_block.block_hash, block_hash,
620                 "Blocks must be disconnected in chain-order - the disconnected header must be the last connected header");
621                 assert_eq!(state_lock.best_block.height, height,
622                         "Blocks must be disconnected in chain-order - the disconnected block must have the correct height");
623                 state_lock.best_block = BestBlock::new(header.prev_blockhash, new_height);
624
625                 for output_info in state_lock.outputs.iter_mut() {
626                         if output_info.status.confirmation_hash() == Some(block_hash) {
627                                 debug_assert_eq!(output_info.status.confirmation_height(), Some(height));
628                                 output_info.status.unconfirmed();
629                         }
630                 }
631
632                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
633                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
634                 });
635         }
636 }
637
638 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref> Confirm
639         for OutputSweeper<B, D, E, F, K, L, O>
640 where
641         B::Target: BroadcasterInterface,
642         D::Target: ChangeDestinationSource,
643         E::Target: FeeEstimator,
644         F::Target: Filter + Sync + Send,
645         K::Target: KVStore,
646         L::Target: Logger,
647         O::Target: OutputSpender,
648 {
649         fn transactions_confirmed(
650                 &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
651         ) {
652                 let mut state_lock = self.sweeper_state.lock().unwrap();
653                 self.transactions_confirmed_internal(&mut *state_lock, header, txdata, height);
654                 self.persist_state(&*state_lock).unwrap_or_else(|e| {
655                         log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
656                 });
657         }
658
659         fn transaction_unconfirmed(&self, txid: &Txid) {
660                 let mut state_lock = self.sweeper_state.lock().unwrap();
661
662                 // Get what height was unconfirmed.
663                 let unconf_height = state_lock
664                         .outputs
665                         .iter()
666                         .find(|o| o.status.latest_spending_tx().map(|tx| tx.txid()) == Some(*txid))
667                         .and_then(|o| o.status.confirmation_height());
668
669                 if let Some(unconf_height) = unconf_height {
670                         // Unconfirm all >= this height.
671                         state_lock
672                                 .outputs
673                                 .iter_mut()
674                                 .filter(|o| o.status.confirmation_height() >= Some(unconf_height))
675                                 .for_each(|o| o.status.unconfirmed());
676
677                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
678                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
679                         });
680                 }
681         }
682
683         fn best_block_updated(&self, header: &Header, height: u32) {
684                 let mut spending_tx_opt;
685                 {
686                         let mut state_lock = self.sweeper_state.lock().unwrap();
687                         spending_tx_opt = self.best_block_updated_internal(&mut *state_lock, header, height);
688                         self.persist_state(&*state_lock).unwrap_or_else(|e| {
689                                 log_error!(self.logger, "Error persisting OutputSweeper: {:?}", e);
690                                 // Skip broadcasting if the persist failed.
691                                 spending_tx_opt = None;
692                         });
693                 }
694
695                 if let Some(spending_tx) = spending_tx_opt {
696                         self.broadcaster.broadcast_transactions(&[&spending_tx]);
697                 }
698         }
699
700         fn get_relevant_txids(&self) -> Vec<(Txid, u32, Option<BlockHash>)> {
701                 let state_lock = self.sweeper_state.lock().unwrap();
702                 state_lock
703                         .outputs
704                         .iter()
705                         .filter_map(|o| match o.status {
706                                 OutputSpendStatus::PendingThresholdConfirmations {
707                                         ref latest_spending_tx,
708                                         confirmation_height,
709                                         confirmation_hash,
710                                         ..
711                                 } => Some((latest_spending_tx.txid(), confirmation_height, Some(confirmation_hash))),
712                                 _ => None,
713                         })
714                         .collect::<Vec<_>>()
715         }
716 }
717
/// The mutable state of an `OutputSweeper`, kept behind its `sweeper_state` mutex and handed
/// to `persist_state` whenever it changes.
#[derive(Debug, Clone)]
struct SweeperState {
        /// The spendable outputs currently tracked, together with their spend status.
        outputs: Vec<TrackedSpendableOutput>,
        /// The block the sweeper currently considers the chain tip; it is rolled back to the
        /// parent block when a block is disconnected.
        best_block: BestBlock,
}
723
// Serialization of `SweeperState` as a TLV stream: `outputs` is written under type 0 and
// `best_block` under type 2.
// NOTE(review): the type numbers presumably define the persisted format and must stay stable
// across releases - confirm against the macro's documentation before changing them.
impl_writeable_tlv_based!(SweeperState, {
        (0, outputs, required_vec),
        (2, best_block, required),
});
728
729 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
730         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for OutputSweeper<B, D, E, F, K, L, O>
731 where
732         B::Target: BroadcasterInterface,
733         D::Target: ChangeDestinationSource,
734         E::Target: FeeEstimator,
735         F::Target: Filter + Sync + Send,
736         K::Target: KVStore,
737         L::Target: Logger,
738         O::Target: OutputSpender,
739 {
740         #[inline]
741         fn read<R: io::Read>(
742                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
743         ) -> Result<Self, DecodeError> {
744                 let (
745                         broadcaster,
746                         fee_estimator,
747                         chain_data_source,
748                         output_spender,
749                         change_destination_source,
750                         kv_store,
751                         logger,
752                 ) = args;
753                 let state = SweeperState::read(reader)?;
754                 let best_block = state.best_block;
755
756                 if let Some(filter) = chain_data_source.as_ref() {
757                         for output_info in &state.outputs {
758                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
759                                 filter.register_output(watched_output);
760                         }
761                 }
762
763                 let sweeper_state = Mutex::new(state);
764                 Ok(Self {
765                         sweeper_state,
766                         broadcaster,
767                         fee_estimator,
768                         chain_data_source,
769                         output_spender,
770                         change_destination_source,
771                         kv_store,
772                         logger,
773                 })
774         }
775 }
776
777 impl<B: Deref, D: Deref, E: Deref, F: Deref, K: Deref, L: Deref, O: Deref>
778         ReadableArgs<(B, E, Option<F>, O, D, K, L)> for (BestBlock, OutputSweeper<B, D, E, F, K, L, O>)
779 where
780         B::Target: BroadcasterInterface,
781         D::Target: ChangeDestinationSource,
782         E::Target: FeeEstimator,
783         F::Target: Filter + Sync + Send,
784         K::Target: KVStore,
785         L::Target: Logger,
786         O::Target: OutputSpender,
787 {
788         #[inline]
789         fn read<R: io::Read>(
790                 reader: &mut R, args: (B, E, Option<F>, O, D, K, L),
791         ) -> Result<Self, DecodeError> {
792                 let (
793                         broadcaster,
794                         fee_estimator,
795                         chain_data_source,
796                         output_spender,
797                         change_destination_source,
798                         kv_store,
799                         logger,
800                 ) = args;
801                 let state = SweeperState::read(reader)?;
802                 let best_block = state.best_block;
803
804                 if let Some(filter) = chain_data_source.as_ref() {
805                         for output_info in &state.outputs {
806                                 let watched_output = output_info.to_watched_output(best_block.block_hash);
807                                 filter.register_output(watched_output);
808                         }
809                 }
810
811                 let sweeper_state = Mutex::new(state);
812                 Ok((
813                         best_block,
814                         OutputSweeper {
815                                 sweeper_state,
816                                 broadcaster,
817                                 fee_estimator,
818                                 chain_data_source,
819                                 output_spender,
820                                 change_destination_source,
821                                 kv_store,
822                                 logger,
823                         },
824                 ))
825         }
826 }