Merge pull request #1657 from TheBlueMatt/2022-08-async-man-update
[rust-lightning] / lightning / src / chain / chainmonitor.rs
index 503e6bdee0669551d1853932447a5db08fc92c17..3d84fdf93a52f391200fde732b0954e77e01d724 100644
@@ -43,6 +43,7 @@ use prelude::*;
 use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
 use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use bitcoin::secp256k1::PublicKey;
 
 #[derive(Clone, Copy, Hash, PartialEq, Eq)]
 /// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
@@ -235,7 +236,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
        persister: P,
        /// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
        /// from the user and not from a [`ChannelMonitor`].
-       pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
+       pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
        /// The best block height seen, used as a proxy for the passage of time.
        highest_chain_height: AtomicUsize,
 }
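
Annotation: each queued entry now carries an `Option<PublicKey>` so that whoever drains the queue (the `ChannelManager`, via `release_pending_monitor_events` below) can attribute the events to a peer without a funding-outpoint lookup; it is an `Option` presumably because monitors written out by older versions never recorded their counterparty. A minimal, self-contained sketch of how a consumer might branch on the new field (all types here are stand-ins, not the LDK API):

```rust
// Stand-ins for bitcoin::secp256k1::PublicKey, chain::transaction::OutPoint
// and chain::channelmonitor::MonitorEvent -- not the real LDK types.
#[derive(Debug, Clone, Copy)]
struct PublicKey([u8; 33]);
#[derive(Debug, Clone, Copy)]
struct OutPoint { txid: [u8; 32], index: u16 }
#[derive(Debug)]
enum MonitorEvent { UpdateCompleted { monitor_update_id: u64 }, UpdateFailed(OutPoint) }

fn drain_events(events: Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>) {
    for (funding_outpoint, monitor_events, counterparty_node_id) in events {
        for event in monitor_events {
            match counterparty_node_id {
                // Monitors that know their counterparty let the consumer
                // route the event to the right peer directly.
                Some(node_id) => println!("deliver {:?} to {:?}", event, node_id),
                // Monitors deserialized from before the field was tracked do
                // not; fall back to identifying the channel by its outpoint.
                None => println!("deliver {:?} for {:?}", event, funding_outpoint),
            }
        }
    }
}
```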
@@ -262,82 +263,67 @@ where C::Target: chain::Filter,
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
-               let mut dependent_txdata = Vec::new();
-               {
-                       let monitor_states = self.monitors.write().unwrap();
-                       if let Some(height) = best_height {
-                               // If the best block height is being updated, update highest_chain_height under the
-                               // monitors write lock.
-                               let old_height = self.highest_chain_height.load(Ordering::Acquire);
-                               let new_height = height as usize;
-                               if new_height > old_height {
-                                       self.highest_chain_height.store(new_height, Ordering::Release);
-                               }
+               let monitor_states = self.monitors.write().unwrap();
+               if let Some(height) = best_height {
+                       // If the best block height is being updated, update highest_chain_height under the
+                       // monitors write lock.
+                       let old_height = self.highest_chain_height.load(Ordering::Acquire);
+                       let new_height = height as usize;
+                       if new_height > old_height {
+                               self.highest_chain_height.store(new_height, Ordering::Release);
                        }
+               }
 
-                       for (funding_outpoint, monitor_state) in monitor_states.iter() {
-                               let monitor = &monitor_state.monitor;
-                               let mut txn_outputs;
-                               {
-                                       txn_outputs = process(monitor, txdata);
-                                       let update_id = MonitorUpdateId {
-                                               contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-                                       };
-                                       let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                                       if let Some(height) = best_height {
-                                               if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-                                                       // If there are not ChainSync persists awaiting completion, go ahead and
-                                                       // set last_chain_persist_height here - we wouldn't want the first
-                                                       // TemporaryFailure to always immediately be considered "overly delayed".
-                                                       monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-                                               }
+               for (funding_outpoint, monitor_state) in monitor_states.iter() {
+                       let monitor = &monitor_state.monitor;
+                       let mut txn_outputs;
+                       {
+                               txn_outputs = process(monitor, txdata);
+                               let update_id = MonitorUpdateId {
+                                       contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+                               };
+                               let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+                               if let Some(height) = best_height {
+                                       if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                               // If there are no ChainSync persists awaiting completion, go ahead and
+                                               // set last_chain_persist_height here - we wouldn't want the first
+                                               // TemporaryFailure to always immediately be considered "overly delayed".
+                                               monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
                                        }
+                               }
 
-                                       log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-                                       match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
-                                               Ok(()) =>
-                                                       log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-                                               Err(ChannelMonitorUpdateErr::PermanentFailure) => {
-                                                       monitor_state.channel_perm_failed.store(true, Ordering::Release);
-                                                       self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
-                                               },
-                                               Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
-                                                       log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-                                                       pending_monitor_updates.push(update_id);
-                                               },
-                                       }
+                               log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                               match self.persister.update_persisted_channel(*funding_outpoint, &None, monitor, update_id) {
+                                       Ok(()) =>
+                                               log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                                       Err(ChannelMonitorUpdateErr::PermanentFailure) => {
+                                               monitor_state.channel_perm_failed.store(true, Ordering::Release);
+                                               self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+                                       },
+                                       Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
+                                               log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                               pending_monitor_updates.push(update_id);
+                                       },
                                }
+                       }
 
-                               // Register any new outputs with the chain source for filtering, storing any dependent
-                               // transactions from within the block that previously had not been included in txdata.
-                               if let Some(ref chain_source) = self.chain_source {
-                                       let block_hash = header.block_hash();
-                                       for (txid, mut outputs) in txn_outputs.drain(..) {
-                                               for (idx, output) in outputs.drain(..) {
-                                                       // Register any new outputs with the chain source for filtering and recurse
-                                                       // if it indicates that there are dependent transactions within the block
-                                                       // that had not been previously included in txdata.
-                                                       let output = WatchedOutput {
-                                                               block_hash: Some(block_hash),
-                                                               outpoint: OutPoint { txid, index: idx as u16 },
-                                                               script_pubkey: output.script_pubkey,
-                                                       };
-                                                       if let Some(tx) = chain_source.register_output(output) {
-                                                               dependent_txdata.push(tx);
-                                                       }
-                                               }
+                       // Register any new outputs with the chain source for filtering.
+                       if let Some(ref chain_source) = self.chain_source {
+                               let block_hash = header.block_hash();
+                               for (txid, mut outputs) in txn_outputs.drain(..) {
+                                       for (idx, output) in outputs.drain(..) {
+                                               // Register any new outputs with the chain source for filtering
+                                               let output = WatchedOutput {
+                                                       block_hash: Some(block_hash),
+                                                       outpoint: OutPoint { txid, index: idx as u16 },
+                                                       script_pubkey: output.script_pubkey,
+                                               };
+                                               chain_source.register_output(output);
                                        }
                                }
                        }
                }
-
-               // Recursively call for any dependent transactions that were identified by the chain source.
-               if !dependent_txdata.is_empty() {
-                       dependent_txdata.sort_unstable_by_key(|(index, _tx)| *index);
-                       dependent_txdata.dedup_by_key(|(index, _tx)| *index);
-                       let txdata: Vec<_> = dependent_txdata.iter().map(|(index, tx)| (*index, tx)).collect();
-                       self.process_chain_data(header, None, &txdata, process); // We skip the best height the second go-around
-               }
        }
 
        /// Creates a new `ChainMonitor` used to watch on-chain activity pertaining to channels.
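
Annotation: most of the hunk above is a re-indent from deleting the `dependent_txdata` collection and the recursive re-run at the bottom. The underlying interface change is visible at the `register_output` call site, which no longer returns anything. A sketch of the contract change, with signatures reconstructed from the call sites here rather than quoted from the patch:

```rust
// Placeholder types, not the real chain::WatchedOutput / bitcoin::Transaction.
struct WatchedOutput;
struct Transaction;

// Before: the filter (e.g. an Electrum-backed implementation) could hand back
// an in-block dependent transaction, keyed by its index within the block, and
// process_chain_data recursively re-processed those transactions.
trait FilterBefore {
    fn register_output(&self, output: WatchedOutput) -> Option<(usize, Transaction)>;
}

// After: registration is fire-and-forget; an implementation that discovers a
// dependent transaction is expected to replay it through the normal
// confirmation path on its own.
trait FilterAfter {
    fn register_output(&self, output: WatchedOutput);
}
```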
@@ -458,7 +444,7 @@ where C::Target: chain::Filter,
                                self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
                                        funding_txo,
                                        monitor_update_id: monitor_data.monitor.get_latest_update_id(),
-                               }]));
+                               }], monitor_data.monitor.get_counterparty_node_id()));
                        },
                        MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
                                if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
@@ -476,10 +462,12 @@ where C::Target: chain::Filter,
        /// channel_monitor_updated once with the highest ID.
        #[cfg(any(test, fuzzing))]
        pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
+               let monitors = self.monitors.read().unwrap();
+               let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
                self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
                        funding_txo,
                        monitor_update_id,
-               }]));
+               }], counterparty_node_id));
        }
 
        #[cfg(any(test, fuzzing, feature = "_test_utils"))]
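
Annotation: `force_channel_monitor_updated` now resolves the counterparty under the `monitors` read lock before queueing the event, chaining through `Option` since the outpoint may be unknown. The same shape in miniature (self-contained stand-ins, not LDK types):

```rust
use std::collections::HashMap;
use std::sync::RwLock;

struct Monitor { counterparty: Option<u8> } // u8 stands in for PublicKey
struct MonitorHolder { monitor: Monitor }

fn counterparty_for(monitors: &RwLock<HashMap<u32, MonitorHolder>>, funding_txo: u32) -> Option<u8> {
    // get() yields Option<&MonitorHolder>; and_then flattens the monitor's own
    // Option, so an unknown outpoint and a monitor without a recorded
    // counterparty both come out as None.
    monitors.read().unwrap().get(&funding_txo).and_then(|m| m.monitor.counterparty)
}
```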
@@ -636,7 +624,7 @@ where C::Target: chain::Filter,
                        Some(monitor_state) => {
                                let monitor = &monitor_state.monitor;
                                log_trace!(self.logger, "Updating ChannelMonitor for channel {}", log_funding_info!(monitor));
-                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger);
+                               let update_res = monitor.update_monitor(&update, &self.broadcaster, &*self.fee_estimator, &self.logger);
                                if update_res.is_err() {
                                        log_error!(self.logger, "Failed to update ChannelMonitor for channel {}.", log_funding_info!(monitor));
                                }
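
Annotation: the `&self.fee_estimator` to `&*self.fee_estimator` change matters because `fee_estimator` is a generic `F: Deref`: the former is a `&F`, the latter reborrows through `Deref` to a `&F::Target`. One plausible reading of why the reborrow is required, assuming (as the call site suggests) that `update_monitor` takes the estimator as a generic parameter:

```rust
use core::ops::Deref;

trait FeeEstimator { fn est_sat_per_1000_weight(&self) -> u32; }
struct ConstEstimator;
impl FeeEstimator for ConstEstimator { fn est_sat_per_1000_weight(&self) -> u32 { 253 } }

// A generic callee in the update_monitor style: it wants some E: FeeEstimator.
fn update<E: FeeEstimator + ?Sized>(e: &E) -> u32 { e.est_sat_per_1000_weight() }

struct Holder<F: Deref> where F::Target: FeeEstimator { fee_estimator: F }

impl<F: Deref> Holder<F> where F::Target: FeeEstimator {
    fn call(&self) -> u32 {
        // update(&self.fee_estimator) would pick E = F, and F itself does not
        // implement FeeEstimator -- only F::Target does. Deref coercion cannot
        // kick in for a generic parameter, hence the explicit reborrow:
        update(&*self.fee_estimator)
    }
}

fn main() {
    let holder = Holder { fee_estimator: Box::new(ConstEstimator) };
    assert_eq!(holder.call(), 253);
}
```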
@@ -666,7 +654,7 @@ where C::Target: chain::Filter,
                }
        }
 
-       fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
+       fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
                let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
                for monitor_state in self.monitors.read().unwrap().values() {
                        let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
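
Annotation: the `split_off(0)` on the first line of the body is the standard idiom for draining a whole `Vec` out from under a `Mutex`:

```rust
use std::sync::Mutex;

fn main() {
    let queue = Mutex::new(vec![1, 2, 3]);
    // split_off(0) moves out the entire contents without cloning the
    // elements, leaving the Vec behind the lock empty for the next batch.
    let drained = queue.lock().unwrap().split_off(0);
    assert_eq!(drained, vec![1, 2, 3]);
    assert!(queue.lock().unwrap().is_empty());
}
```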
@@ -695,7 +683,8 @@ where C::Target: chain::Filter,
                                let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
                                if monitor_events.len() > 0 {
                                        let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
-                                       pending_monitor_events.push((monitor_outpoint, monitor_events));
+                                       let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
+                                       pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
                                }
                        }
                }
@@ -729,9 +718,10 @@ impl<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> even
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::BlockHeader;
+       use bitcoin::{BlockHeader, TxMerkleNode};
+       use bitcoin::hashes::Hash;
        use ::{check_added_monitors, check_closed_broadcast, check_closed_event};
-       use ::{expect_payment_sent, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
+       use ::{expect_payment_sent, expect_payment_claimed, expect_payment_sent_without_paths, expect_payment_path_successful, get_event_msg};
        use ::{get_htlc_update_msgs, get_local_commitment_txn, get_revoke_commit_msgs, get_route_and_payment_hash, unwrap_send_err};
        use chain::{ChannelMonitorUpdateErr, Confirm, Watch};
        use chain::channelmonitor::LATENCY_GRACE_PERIOD_BLOCKS;
@@ -741,50 +731,6 @@ mod tests {
        use ln::msgs::ChannelMessageHandler;
        use util::errors::APIError;
        use util::events::{ClosureReason, MessageSendEvent, MessageSendEventsProvider};
-       use util::test_utils::{OnRegisterOutput, TxOutReference};
-
-       /// Tests that in-block dependent transactions are processed by `block_connected` when not
-       /// included in `txdata` but returned by [`chain::Filter::register_output`]. For instance,
-       /// a (non-anchor) commitment transaction's HTLC output may be spent in the same block as the
-       /// commitment transaction itself. An Electrum client may filter the commitment transaction but
-       /// needs to return the HTLC transaction so it can be processed.
-       #[test]
-       fn connect_block_checks_dependent_transactions() {
-               let chanmon_cfgs = create_chanmon_cfgs(2);
-               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
-               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
-               let channel = create_announced_chan_between_nodes(
-                       &nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
-
-               // Send a payment, saving nodes[0]'s revoked commitment and HTLC-Timeout transactions.
-               let (commitment_tx, htlc_tx) = {
-                       let payment_preimage = route_payment(&nodes[0], &vec!(&nodes[1])[..], 5_000_000).0;
-                       let mut txn = get_local_commitment_txn!(nodes[0], channel.2);
-                       claim_payment(&nodes[0], &vec!(&nodes[1])[..], payment_preimage);
-
-                       assert_eq!(txn.len(), 2);
-                       (txn.remove(0), txn.remove(0))
-               };
-
-               // Set expectations on nodes[1]'s chain source to return dependent transactions.
-               let htlc_output = TxOutReference(commitment_tx.clone(), 0);
-               let to_local_output = TxOutReference(commitment_tx.clone(), 1);
-               let htlc_timeout_output = TxOutReference(htlc_tx.clone(), 0);
-               nodes[1].chain_source
-                       .expect(OnRegisterOutput { with: htlc_output, returns: Some((1, htlc_tx)) })
-                       .expect(OnRegisterOutput { with: to_local_output, returns: None })
-                       .expect(OnRegisterOutput { with: htlc_timeout_output, returns: None });
-
-               // Notify nodes[1] that nodes[0]'s revoked commitment transaction was mined. The chain
-               // source should return the dependent HTLC transaction when the HTLC output is registered.
-               mine_transaction(&nodes[1], &commitment_tx);
-
-               // Clean up so uninteresting assertions don't fail.
-               check_added_monitors!(nodes[1], 1);
-               nodes[1].node.get_and_clear_pending_msg_events();
-               nodes[1].node.get_and_clear_pending_events();
-       }
 
        #[test]
        fn test_async_ooo_offchain_updates() {
@@ -798,16 +744,18 @@ mod tests {
                create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
 
                // Route two payments to be claimed at the same time.
-               let payment_preimage_1 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
-               let payment_preimage_2 = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+               let (payment_preimage_1, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
+               let (payment_preimage_2, payment_hash_2, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
 
                chanmon_cfgs[1].persister.offchain_monitor_updates.lock().unwrap().clear();
                chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
 
                nodes[1].node.claim_funds(payment_preimage_1);
                check_added_monitors!(nodes[1], 1);
+               expect_payment_claimed!(nodes[1], payment_hash_1, 1_000_000);
                nodes[1].node.claim_funds(payment_preimage_2);
                check_added_monitors!(nodes[1], 1);
+               expect_payment_claimed!(nodes[1], payment_hash_2, 1_000_000);
 
                chanmon_cfgs[1].persister.set_update_ret(Ok(()));
 
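Annotation: the new `expect_payment_claimed!` calls reflect that `claim_funds` now queues a `PaymentClaimed` event as soon as the preimage is applied, even while monitor persistence is still pending (the persister is returning `TemporaryFailure` at this point in the test). Roughly what the macro asserts, paraphrased over stand-in types rather than the LDK test harness:

```rust
#[derive(Debug)]
enum Event {
    PaymentClaimed { payment_hash: [u8; 32], amount_msat: u64 },
}

// Hypothetical free-function version of the macro's core assertion.
fn assert_payment_claimed(mut events: Vec<Event>, expected_hash: [u8; 32], expected_msat: u64) {
    assert_eq!(events.len(), 1);
    match events.pop() {
        Some(Event::PaymentClaimed { payment_hash, amount_msat }) => {
            assert_eq!(payment_hash, expected_hash);
            assert_eq!(amount_msat, expected_msat);
        }
        _ => panic!("expected a PaymentClaimed event"),
    }
}
```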
@@ -877,8 +825,9 @@ mod tests {
                let (route, second_payment_hash, _, second_payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000);
 
                // First route a payment that we will claim on chain and give the recipient the preimage.
-               let payment_preimage = route_payment(&nodes[0], &[&nodes[1]], 1_000_000).0;
+               let (payment_preimage, payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1_000_000);
                nodes[1].node.claim_funds(payment_preimage);
+               expect_payment_claimed!(nodes[1], payment_hash, 1_000_000);
                nodes[1].node.get_and_clear_pending_msg_events();
                check_added_monitors!(nodes[1], 1);
                let remote_txn = get_local_commitment_txn!(nodes[1], channel.2);
@@ -893,7 +842,7 @@ mod tests {
                let new_header = BlockHeader {
                        version: 2, time: 0, bits: 0, nonce: 0,
                        prev_blockhash: nodes[0].best_block_info().0,
-                       merkle_root: Default::default() };
+                       merkle_root: TxMerkleNode::all_zeros() };
                nodes[0].chain_monitor.chain_monitor.transactions_confirmed(&new_header,
                        &[(0, &remote_txn[0]), (1, &remote_txn[1])], nodes[0].best_block_info().1 + 1);
                assert!(nodes[0].chain_monitor.release_pending_monitor_events().is_empty());
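
Annotation: the `merkle_root: TxMerkleNode::all_zeros()` edits (here and in the next hunk) track rust-bitcoin dropping `Default` for its hash newtypes; the all-zero dummy value is now constructed through the `Hash` trait, which is why `bitcoin::hashes::Hash` joined the test imports above. In isolation:

```rust
use bitcoin::hashes::Hash;
use bitcoin::TxMerkleNode;

// The old spelling was `merkle_root: Default::default()`; the hash newtypes
// now expose an explicit all-zero constructor via the Hash trait instead.
fn dummy_merkle_root() -> TxMerkleNode {
    TxMerkleNode::all_zeros()
}
```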
@@ -919,7 +868,7 @@ mod tests {
                        let latest_header = BlockHeader {
                                version: 2, time: 0, bits: 0, nonce: 0,
                                prev_blockhash: nodes[0].best_block_info().0,
-                               merkle_root: Default::default() };
+                               merkle_root: TxMerkleNode::all_zeros() };
                        nodes[0].chain_monitor.chain_monitor.best_block_updated(&latest_header, nodes[0].best_block_info().1 + LATENCY_GRACE_PERIOD_BLOCKS);
                } else {
                        let persistences = chanmon_cfgs[0].persister.chain_sync_monitor_persistences.lock().unwrap().clone();