Merge pull request #2528 from arik-so/arik/2023-08-2470-shorter-term-monitor-locks
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 28 Aug 2023 17:07:03 +0000 (17:07 +0000)
committer GitHub <noreply@github.com>
Mon, 28 Aug 2023 17:07:03 +0000 (17:07 +0000)
Release monitor write lock in between update iterations

lightning/src/chain/chainmonitor.rs
lightning/src/ln/monitor_tests.rs
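
This change stops process_chain_data from holding the monitors write lock while every ChannelMonitor is synced against new chain data. Instead, the current funding outpoints are snapshotted up front, each monitor is processed under a freshly acquired read lock that is released between iterations, and a final pass under the write lock picks up any monitors added in the meantime. Below is a minimal sketch of that locking pattern, using simplified stand-in types (Key, Monitor, process_one) rather than the real ChainMonitor internals:

// Sketch of the shorter-lived lock pattern; types and names are illustrative only.
use std::collections::{HashMap, HashSet};
use std::sync::RwLock;

type Key = u64;

struct Monitor {
    data: String,
}

struct Monitors {
    monitors: RwLock<HashMap<Key, Monitor>>,
}

impl Monitors {
    fn process_one(&self, key: Key, monitor: &Monitor) {
        // Stand-in for update_monitor_with_chain_data: this may take a while
        // (e.g. a synchronous persistence call), so we avoid holding the map
        // lock across all monitors while it runs.
        println!("processing {key}: {}", monitor.data);
    }

    fn process_all(&self) {
        // Snapshot the current keys under a short-lived read lock.
        let keys: HashSet<Key> = self.monitors.read().unwrap().keys().cloned().collect();

        // Re-acquire the read lock for each entry, releasing it between
        // iterations so writers adding new monitors are not blocked for the
        // duration of the whole pass.
        for key in &keys {
            let guard = self.monitors.read().unwrap();
            if let Some(monitor) = guard.get(key) {
                self.process_one(*key, monitor);
            }
        }

        // Follow-up pass: handle any entries added while the lock was
        // released, this time under a single write lock so no further
        // additions race with this cleanup.
        let guard = self.monitors.write().unwrap();
        for (key, monitor) in guard.iter() {
            if !keys.contains(key) {
                self.process_one(*key, monitor);
            }
        }
    }
}

fn main() {
    let m = Monitors { monitors: RwLock::new(HashMap::new()) };
    m.monitors.write().unwrap().insert(1, Monitor { data: "chan A".into() });
    m.monitors.write().unwrap().insert(2, Monitor { data: "chan B".into() });
    m.process_all();
}

The trade-off matches what the diff below does: a monitor added mid-pass is handled by the follow-up loop rather than the main one, while writers no longer have to wait behind an entire chain-sync pass.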

index 7d698a220676b8dfd7c9a0df41b0b09f0799e2e9..6051f00b90a8327026ff89b2a6a758fd5c6fadc4 100644 (file)
@@ -42,6 +42,7 @@ use crate::ln::channelmanager::ChannelDetails;
 
 use crate::prelude::*;
 use crate::sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
+use core::iter::FromIterator;
 use core::ops::Deref;
 use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use bitcoin::secp256k1::PublicKey;
@@ -285,7 +286,22 @@ where C::Target: chain::Filter,
        where
                FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs>
        {
+               let funding_outpoints: HashSet<OutPoint> = HashSet::from_iter(self.monitors.read().unwrap().keys().cloned());
+               for funding_outpoint in funding_outpoints.iter() {
+                       let monitor_lock = self.monitors.read().unwrap();
+                       if let Some(monitor_state) = monitor_lock.get(funding_outpoint) {
+                               self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+                       }
+               }
+
+               // do some followup cleanup if any funding outpoints were added in between iterations
                let monitor_states = self.monitors.write().unwrap();
+               for (funding_outpoint, monitor_state) in monitor_states.iter() {
+                       if !funding_outpoints.contains(funding_outpoint) {
+                               self.update_monitor_with_chain_data(header, best_height, txdata, &process, funding_outpoint, &monitor_state);
+                       }
+               }
+
                if let Some(height) = best_height {
                        // If the best block height is being updated, update highest_chain_height under the
                        // monitors write lock.
@@ -295,55 +311,55 @@ where C::Target: chain::Filter,
                                self.highest_chain_height.store(new_height, Ordering::Release);
                        }
                }
+       }
 
-               for (funding_outpoint, monitor_state) in monitor_states.iter() {
-                       let monitor = &monitor_state.monitor;
-                       let mut txn_outputs;
-                       {
-                               txn_outputs = process(monitor, txdata);
-                               let update_id = MonitorUpdateId {
-                                       contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
-                               };
-                               let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
-                               if let Some(height) = best_height {
-                                       if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
-                                               // If there are not ChainSync persists awaiting completion, go ahead and
-                                               // set last_chain_persist_height here - we wouldn't want the first
-                                               // InProgress to always immediately be considered "overly delayed".
-                                               monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
-                                       }
+       fn update_monitor_with_chain_data<FN>(&self, header: &BlockHeader, best_height: Option<u32>, txdata: &TransactionData, process: FN, funding_outpoint: &OutPoint, monitor_state: &MonitorHolder<ChannelSigner>) where FN: Fn(&ChannelMonitor<ChannelSigner>, &TransactionData) -> Vec<TransactionOutputs> {
+               let monitor = &monitor_state.monitor;
+               let mut txn_outputs;
+               {
+                       txn_outputs = process(monitor, txdata);
+                       let update_id = MonitorUpdateId {
+                               contents: UpdateOrigin::ChainSync(self.sync_persistence_id.get_increment()),
+                       };
+                       let mut pending_monitor_updates = monitor_state.pending_monitor_updates.lock().unwrap();
+                       if let Some(height) = best_height {
+                               if !monitor_state.has_pending_chainsync_updates(&pending_monitor_updates) {
+                                       // If there are not ChainSync persists awaiting completion, go ahead and
+                                       // set last_chain_persist_height here - we wouldn't want the first
+                                       // InProgress to always immediately be considered "overly delayed".
+                                       monitor_state.last_chain_persist_height.store(height as usize, Ordering::Release);
                                }
+                       }
 
-                               log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
-                               match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
-                                       ChannelMonitorUpdateStatus::Completed =>
-                                               log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
-                                       ChannelMonitorUpdateStatus::PermanentFailure => {
-                                               monitor_state.channel_perm_failed.store(true, Ordering::Release);
-                                               self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
-                                               self.event_notifier.notify();
-                                       },
-                                       ChannelMonitorUpdateStatus::InProgress => {
-                                               log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
-                                               pending_monitor_updates.push(update_id);
-                                       },
+                       log_trace!(self.logger, "Syncing Channel Monitor for channel {}", log_funding_info!(monitor));
+                       match self.persister.update_persisted_channel(*funding_outpoint, None, monitor, update_id) {
+                               ChannelMonitorUpdateStatus::Completed =>
+                                       log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
+                               ChannelMonitorUpdateStatus::PermanentFailure => {
+                                       monitor_state.channel_perm_failed.store(true, Ordering::Release);
+                                       self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
+                                       self.event_notifier.notify();
+                               }
+                               ChannelMonitorUpdateStatus::InProgress => {
+                                       log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
+                                       pending_monitor_updates.push(update_id);
                                }
                        }
+               }
 
-                       // Register any new outputs with the chain source for filtering, storing any dependent
-                       // transactions from within the block that previously had not been included in txdata.
-                       if let Some(ref chain_source) = self.chain_source {
-                               let block_hash = header.block_hash();
-                               for (txid, mut outputs) in txn_outputs.drain(..) {
-                                       for (idx, output) in outputs.drain(..) {
-                                               // Register any new outputs with the chain source for filtering
-                                               let output = WatchedOutput {
-                                                       block_hash: Some(block_hash),
-                                                       outpoint: OutPoint { txid, index: idx as u16 },
-                                                       script_pubkey: output.script_pubkey,
-                                               };
-                                               chain_source.register_output(output)
-                                       }
+               // Register any new outputs with the chain source for filtering, storing any dependent
+               // transactions from within the block that previously had not been included in txdata.
+               if let Some(ref chain_source) = self.chain_source {
+                       let block_hash = header.block_hash();
+                       for (txid, mut outputs) in txn_outputs.drain(..) {
+                               for (idx, output) in outputs.drain(..) {
+                                       // Register any new outputs with the chain source for filtering
+                                       let output = WatchedOutput {
+                                               block_hash: Some(block_hash),
+                                               outpoint: OutPoint { txid, index: idx as u16 },
+                                               script_pubkey: output.script_pubkey,
+                                       };
+                                       chain_source.register_output(output)
                                }
                        }
                }
@@ -976,7 +992,7 @@ mod tests {
                        assert!(err.contains("ChannelMonitor storage failure")));
                check_added_monitors!(nodes[0], 2); // After the failure we generate a close-channel monitor update
                check_closed_broadcast!(nodes[0], true);
-               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() }, 
+               check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: "ChannelMonitor storage failure".to_string() },
                        [nodes[1].node.get_our_node_id()], 100000);
 
                // However, as the ChainMonitor is still waiting for the original persistence to complete,
index 728752f97c310bc478abc213d165fc347a2db407..58878a9998c8a3b5aa2d87b8e14bc294c4574335 100644 (file)
@@ -2191,7 +2191,7 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
 
        // Alice should see that Bob is trying to claim to HTLCs, so she should now try to claim them at
        // the second level instead.
-       let revoked_claims = {
+       let revoked_claim_transactions = {
                let txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
                assert_eq!(txn.len(), 2);
 
@@ -2205,10 +2205,14 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
                        check_spends!(revoked_htlc_claim, htlc_tx);
                }
 
-               txn
+               let mut revoked_claim_transaction_map = HashMap::new();
+               for current_tx in txn.into_iter() {
+                       revoked_claim_transaction_map.insert(current_tx.txid(), current_tx);
+               }
+               revoked_claim_transaction_map
        };
        for node in &nodes {
-               mine_transactions(node, &revoked_claims.iter().collect::<Vec<_>>());
+               mine_transactions(node, &revoked_claim_transactions.values().collect::<Vec<_>>());
        }
 
 
@@ -2234,7 +2238,8 @@ fn test_anchors_aggregated_revoked_htlc_tx() {
                        let spend_tx = nodes[0].keys_manager.backing.spend_spendable_outputs(
                                &[&outputs[0]], Vec::new(), Script::new_op_return(&[]), 253, None, &Secp256k1::new(),
                        ).unwrap();
-                       check_spends!(spend_tx, revoked_claims[idx]);
+
+                       check_spends!(spend_tx, revoked_claim_transactions.get(&spend_tx.input[0].previous_output.txid).unwrap());
                } else {
                        panic!("unexpected event");
                }
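
In the monitor_tests.rs change, the revoked claim transactions are now mined from HashMap::values(), so their order is no longer tied to broadcast order; the final check therefore looks up each claim by the txid referenced in the spend's input rather than by position. Below is a minimal sketch of that order-independent lookup, with simplified stand-in types (Tx, spends) in place of the real bitcoin::Transaction:

// Sketch of keying transactions by txid and resolving a child's parent via
// the txid its input references; types and values are illustrative only.
use std::collections::HashMap;

struct Tx {
    txid: u64,
    // txid of the output this transaction spends (stand-in for
    // input[0].previous_output.txid)
    spends: u64,
}

fn main() {
    let parents = vec![Tx { txid: 10, spends: 1 }, Tx { txid: 20, spends: 2 }];

    // Key the parent transactions by txid instead of keeping them in a Vec,
    // so children can be matched regardless of the order they were broadcast.
    let parents_by_txid: HashMap<u64, Tx> =
        parents.into_iter().map(|tx| (tx.txid, tx)).collect();

    let child = Tx { txid: 30, spends: 20 };
    // Look the parent up by the txid the child's input references.
    let parent = parents_by_txid.get(&child.spends).expect("parent must exist");
    assert_eq!(parent.txid, 20);
    println!("child {} spends parent {}", child.txid, parent.txid);
}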