Merge pull request #1096 from valentinewallace/2021-09-mpp-retries
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 5055305d3c5d52bd9e01cc02d0934c79104f0b37..0d3f87645ce4c4c1250d2d42a3a085f419ac13a6 100644 (file)
@@ -30,16 +30,17 @@ use chain;
 use chain::{Filter, WatchedOutput};
 use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use chain::channelmonitor;
-use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, MonitorEvent, Persist, TransactionOutputs};
+use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr, Balance, MonitorEvent, Persist, TransactionOutputs};
 use chain::transaction::{OutPoint, TransactionData};
 use chain::keysinterface::Sign;
 use util::logger::Logger;
 use util::events;
-use util::events::Event;
+use util::events::EventHandler;
+use ln::channelmanager::ChannelDetails;
 
-use std::collections::{HashMap, hash_map};
-use std::sync::RwLock;
-use std::ops::Deref;
+use prelude::*;
+use sync::RwLock;
+use core::ops::Deref;
 
 /// An implementation of [`chain::Watch`] for monitoring channels.
 ///
@@ -139,6 +140,40 @@ where C::Target: chain::Filter,
                        persister,
                }
        }
+
+       /// Gets the balances in the contained [`ChannelMonitor`]s which are claimable on-chain or
+       /// claims which are awaiting confirmation.
+       ///
+       /// Includes the balances from each [`ChannelMonitor`] *except* those included in
+       /// `ignored_channels`, allowing you to filter out balances from channels which are still open
+       /// (and whose balance should likely be pulled from the [`ChannelDetails`]).
+       ///
+       /// See [`ChannelMonitor::get_claimable_balances`] for more details on the exact criteria for
+       /// inclusion in the return value.
+       pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec<Balance> {
+               let mut ret = Vec::new();
+               let monitors = self.monitors.read().unwrap();
+               for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| {
+                       for chan in ignored_channels {
+                               if chan.funding_txo.as_ref() == Some(funding_outpoint) {
+                                       return false;
+                               }
+                       }
+                       true
+               }) {
+                       ret.append(&mut monitor.get_claimable_balances());
+               }
+               ret
+       }
+
+       #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))]
+       pub fn get_and_clear_pending_events(&self) -> Vec<events::Event> {
+               use util::events::EventsProvider;
+               let events = core::cell::RefCell::new(Vec::new());
+               let event_handler = |event: &events::Event| events.borrow_mut().push(event.clone());
+               self.process_pending_events(&event_handler);
+               events.into_inner()
+       }
 }
 
 impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref>
@@ -153,6 +188,7 @@ where
        fn block_connected(&self, block: &Block, height: u32) {
                let header = &block.header;
                let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
+               log_debug!(self.logger, "New best block {} at height {} provided via block_connected", header.block_hash(), height);
                self.process_chain_data(header, &txdata, |monitor, txdata| {
                        monitor.block_connected(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
@@ -161,6 +197,7 @@ where
 
        fn block_disconnected(&self, header: &BlockHeader, height: u32) {
                let monitors = self.monitors.read().unwrap();
+               log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height);
                for monitor in monitors.values() {
                        monitor.block_disconnected(
                                header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
@@ -178,6 +215,7 @@ where
        P::Target: channelmonitor::Persist<ChannelSigner>,
 {
        fn transactions_confirmed(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) {
+               log_debug!(self.logger, "{} provided transactions confirmed at height {} in block {}", txdata.len(), height, header.block_hash());
                self.process_chain_data(header, txdata, |monitor, txdata| {
                        monitor.transactions_confirmed(
                                header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger)
@@ -185,6 +223,7 @@ where
        }
 
        fn transaction_unconfirmed(&self, txid: &Txid) {
+               log_debug!(self.logger, "Transaction {} reorganized out of chain", txid);
                let monitors = self.monitors.read().unwrap();
                for monitor in monitors.values() {
                        monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger);
@@ -192,6 +231,7 @@ where
        }
 
        fn best_block_updated(&self, header: &BlockHeader, height: u32) {
+               log_debug!(self.logger, "New best block {} at height {} provided via best_block_updated", header.block_hash(), height);
                self.process_chain_data(header, &[], |monitor, txdata| {
                        // While in practice there shouldn't be any recursive calls when given empty txdata,
                        // it's still possible if a chain::Filter implementation returns a transaction.
@@ -214,7 +254,7 @@ where
        }
 }
 
-impl<ChannelSigner: Sign, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, P: Deref + Sync + Send>
+impl<ChannelSigner: Sign, C: Deref , T: Deref , F: Deref , L: Deref , P: Deref >
 chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
 where C::Target: chain::Filter,
            T::Target: BroadcasterInterface,
@@ -306,12 +346,20 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
              L::Target: Logger,
              P::Target: channelmonitor::Persist<ChannelSigner>,
 {
-       fn get_and_clear_pending_events(&self) -> Vec<Event> {
+       /// Processes [`SpendableOutputs`] events produced from each [`ChannelMonitor`] upon maturity.
+       ///
+       /// An [`EventHandler`] may safely call back to the provider, though this shouldn't be needed in
+       /// order to handle these events.
+       ///
+       /// [`SpendableOutputs`]: events::Event::SpendableOutputs
+       fn process_pending_events<H: Deref>(&self, handler: H) where H::Target: EventHandler {
                let mut pending_events = Vec::new();
                for monitor in self.monitors.read().unwrap().values() {
                        pending_events.append(&mut monitor.get_and_clear_pending_events());
                }
-               pending_events
+               for event in pending_events.drain(..) {
+                       handler.handle_event(&event);
+               }
        }
 }
 
@@ -320,7 +368,6 @@ mod tests {
        use ::{check_added_monitors, get_local_commitment_txn};
        use ln::features::InitFeatures;
        use ln::functional_test_utils::*;
-       use util::events::EventsProvider;
        use util::events::MessageSendEventsProvider;
        use util::test_utils::{OnRegisterOutput, TxOutReference};