From 389c4ad6fa21caf62c6616f879536c7b693a9d1f Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Sun, 28 Feb 2021 22:26:41 -0800 Subject: [PATCH] Change Mutex to RwLock in ChainMonitor Now that ChannelMonitor uses an internal Mutex to support interior mutability, ChainMonitor can use a RwLock to manage its ChannelMonitor map. This allows parallelization of update_channel operations since an exclusive lock only needs to be held when adding to the map in watch_channel. --- lightning/src/chain/chainmonitor.rs | 24 +++++++++---------- lightning/src/ln/chanmon_update_fail_tests.rs | 2 +- lightning/src/ln/functional_test_utils.rs | 6 ++--- lightning/src/ln/functional_tests.rs | 24 +++++++++---------- lightning/src/util/test_utils.rs | 2 +- 5 files changed, 29 insertions(+), 29 deletions(-) diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index e9cf5e8d8..de826d054 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -43,7 +43,7 @@ use util::events; use util::events::Event; use std::collections::{HashMap, hash_map}; -use std::sync::Mutex; +use std::sync::RwLock; use std::ops::Deref; /// An implementation of [`chain::Watch`] for monitoring channels. @@ -64,7 +64,7 @@ pub struct ChainMonitor, { /// The monitors - pub monitors: Mutex>>, + pub monitors: RwLock>>, chain_source: Option, broadcaster: T, logger: L, @@ -93,8 +93,8 @@ where C::Target: chain::Filter, /// [`chain::Watch::release_pending_monitor_events`]: ../trait.Watch.html#tymethod.release_pending_monitor_events /// [`chain::Filter`]: ../trait.Filter.html pub fn block_connected(&self, header: &BlockHeader, txdata: &TransactionData, height: u32) { - let mut monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values_mut() { + let monitors = self.monitors.read().unwrap(); + for monitor in monitors.values() { let mut txn_outputs = monitor.block_connected(header, txdata, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); if let Some(ref chain_source) = self.chain_source { @@ -113,8 +113,8 @@ where C::Target: chain::Filter, /// /// [`ChannelMonitor::block_disconnected`]: ../channelmonitor/struct.ChannelMonitor.html#method.block_disconnected pub fn block_disconnected(&self, header: &BlockHeader, disconnected_height: u32) { - let mut monitors = self.monitors.lock().unwrap(); - for monitor in monitors.values_mut() { + let monitors = self.monitors.read().unwrap(); + for monitor in monitors.values() { monitor.block_disconnected(header, disconnected_height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); } } @@ -130,7 +130,7 @@ where C::Target: chain::Filter, /// [`chain::Filter`]: ../trait.Filter.html pub fn new(chain_source: Option, broadcaster: T, logger: L, feeest: F, persister: P) -> Self { Self { - monitors: Mutex::new(HashMap::new()), + monitors: RwLock::new(HashMap::new()), chain_source, broadcaster, logger, @@ -177,7 +177,7 @@ where C::Target: chain::Filter, /// /// [`chain::Filter`]: ../trait.Filter.html fn watch_channel(&self, funding_outpoint: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { - let mut monitors = self.monitors.lock().unwrap(); + let mut monitors = self.monitors.write().unwrap(); let entry = match monitors.entry(funding_outpoint) { hash_map::Entry::Occupied(_) => { log_error!(self.logger, "Failed to add new channel data: channel monitor for given outpoint is already present"); @@ -209,8 +209,8 @@ where C::Target: chain::Filter, /// `ChainMonitor` monitors lock. fn update_channel(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> { // Update the monitor that watches the channel referred to by the given outpoint. - let mut monitors = self.monitors.lock().unwrap(); - match monitors.get_mut(&funding_txo) { + let monitors = self.monitors.read().unwrap(); + match monitors.get(&funding_txo) { None => { log_error!(self.logger, "Failed to update channel monitor: no such monitor registered"); @@ -245,7 +245,7 @@ where C::Target: chain::Filter, fn release_pending_monitor_events(&self) -> Vec { let mut pending_monitor_events = Vec::new(); - for monitor in self.monitors.lock().unwrap().values_mut() { + for monitor in self.monitors.read().unwrap().values() { pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events()); } pending_monitor_events @@ -261,7 +261,7 @@ impl even { fn get_and_clear_pending_events(&self) -> Vec { let mut pending_events = Vec::new(); - for monitor in self.monitors.lock().unwrap().values_mut() { + for monitor in self.monitors.read().unwrap().values() { pending_events.append(&mut monitor.get_and_clear_pending_events()); } pending_events diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 8fc773f68..d764bc782 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -102,7 +102,7 @@ fn test_monitor_and_persister_update_fail() { let logger = test_utils::TestLogger::with_id(format!("node {}", 0)); let persister = test_utils::TestPersister::new(); let chain_mon = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 219417d56..f0937817b 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -168,7 +168,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let feeest = test_utils::TestFeeEstimator { sat_per_kw: 253 }; let mut deserialized_monitors = Vec::new(); { - let old_monitors = self.chain_monitor.chain_monitor.monitors.lock().unwrap(); + let old_monitors = self.chain_monitor.chain_monitor.monitors.read().unwrap(); for (_, old_monitor) in old_monitors.iter() { let mut w = test_utils::TestVecWriter(Vec::new()); old_monitor.write(&mut w).unwrap(); @@ -305,9 +305,9 @@ macro_rules! get_feerate { macro_rules! get_local_commitment_txn { ($node: expr, $channel_id: expr) => { { - let mut monitors = $node.chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = $node.chain_monitor.chain_monitor.monitors.read().unwrap(); let mut commitment_txn = None; - for (funding_txo, monitor) in monitors.iter_mut() { + for (funding_txo, monitor) in monitors.iter() { if funding_txo.to_channel_id() == $channel_id { commitment_txn = Some(monitor.unsafe_get_latest_holder_commitment_txn(&$node.logger)); break; diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 04afca261..fdf5d2006 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -3517,8 +3517,8 @@ fn test_force_close_fail_back() { // Now check that if we add the preimage to ChannelMonitor it broadcasts our HTLC-Success.. { - let mut monitors = nodes[2].chain_monitor.chain_monitor.monitors.lock().unwrap(); - monitors.get_mut(&OutPoint{ txid: Txid::from_slice(&payment_event.commitment_msg.channel_id[..]).unwrap(), index: 0 }).unwrap() + let mut monitors = nodes[2].chain_monitor.chain_monitor.monitors.read().unwrap(); + monitors.get(&OutPoint{ txid: Txid::from_slice(&payment_event.commitment_msg.channel_id[..]).unwrap(), index: 0 }).unwrap() .provide_payment_preimage(&our_payment_hash, &our_payment_preimage, &node_cfgs[2].tx_broadcaster, &node_cfgs[2].fee_estimator, &&logger); } connect_block(&nodes[2], &block, 1); @@ -4314,7 +4314,7 @@ fn test_no_txn_manager_serialize_deserialize() { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; @@ -4423,7 +4423,7 @@ fn test_manager_serialize_deserialize_events() { // Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; logger = test_utils::TestLogger::new(); @@ -4515,7 +4515,7 @@ fn test_simple_manager_serialize_deserialize() { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; @@ -4572,7 +4572,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3, InitFeatures::known(), InitFeatures::known()); let mut node_0_stale_monitors_serialized = Vec::new(); - for monitor in nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter() { + for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() { let mut writer = test_utils::TestVecWriter(Vec::new()); monitor.1.write(&mut writer).unwrap(); node_0_stale_monitors_serialized.push(writer.0); @@ -4591,7 +4591,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { // Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/ // nodes[3]) let mut node_0_monitors_serialized = Vec::new(); - for monitor in nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter() { + for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() { let mut writer = test_utils::TestVecWriter(Vec::new()); monitor.1.write(&mut writer).unwrap(); node_0_monitors_serialized.push(writer.0); @@ -7479,7 +7479,7 @@ fn test_data_loss_protect() { // Cache node A state before any channel update let previous_node_state = nodes[0].node.encode(); let mut previous_chain_monitor_state = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap().iter().next().unwrap().1.write(&mut previous_chain_monitor_state).unwrap(); + nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut previous_chain_monitor_state).unwrap(); send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000, 8_000_000); send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000, 8_000_000); @@ -8226,7 +8226,7 @@ fn test_bump_txn_sanitize_tracking_maps() { connect_block(&nodes[0], &Block { header: header_130, txdata: penalty_txn }, 130); connect_blocks(&nodes[0], 5, 130, false, header_130.block_hash()); { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); if let Some(monitor) = monitors.get(&OutPoint { txid: chan.3.txid(), index: 0 }) { assert!(monitor.inner.lock().unwrap().onchain_tx_handler.pending_claim_requests.is_empty()); assert!(monitor.inner.lock().unwrap().onchain_tx_handler.claimable_outpoints.is_empty()); @@ -8360,7 +8360,7 @@ fn test_update_err_monitor_lockdown() { let logger = test_utils::TestLogger::with_id(format!("node {}", 0)); let persister = test_utils::TestPersister::new(); let watchtower = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); @@ -8419,7 +8419,7 @@ fn test_concurrent_monitor_claim() { let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice")); let persister = test_utils::TestPersister::new(); let watchtower_alice = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); @@ -8445,7 +8445,7 @@ fn test_concurrent_monitor_claim() { let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob")); let persister = test_utils::TestPersister::new(); let watchtower_bob = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.lock().unwrap(); + let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); let monitor = monitors.get(&outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index c2d22a4a0..9f87795e3 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -133,7 +133,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let update_res = self.chain_monitor.update_channel(funding_txo, update); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... - let monitors = self.chain_monitor.monitors.lock().unwrap(); + let monitors = self.chain_monitor.monitors.read().unwrap(); let monitor = monitors.get(&funding_txo).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); -- 2.39.5