From: Matt Corallo Date: Fri, 8 Oct 2021 19:07:00 +0000 (+0000) Subject: Make `ChainMonitor::monitors` private and expose monitor via getter X-Git-Tag: v0.0.102~4^2~4 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=79541b11e8b6e62de0fc613f416e30bf1de5f3d9;p=rust-lightning Make `ChainMonitor::monitors` private and expose monitor via getter Exposing a `RwLock>` directly was always a bit strange, and in upcoming changes we'd like to change the internal datastructure in `ChainMonitor`. Further, the use of `RwLock` and `HashMap` meant we weren't able to expose the ChannelMonitors themselves to users in bindings, leaving a bindings/rust API gap. Thus, we take this opportunity go expose ChannelMonitors directly via a wrapper, hiding the internals of `ChainMonitor` behind getters. We also update tests to use the new API. --- diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 8126e4d52..2de554005 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -38,7 +38,7 @@ use util::events::EventHandler; use ln::channelmanager::ChannelDetails; use prelude::*; -use sync::RwLock; +use sync::{RwLock, RwLockReadGuard}; use core::ops::Deref; /// `Persist` defines behavior for persisting channel monitors: this could mean @@ -92,6 +92,26 @@ pub trait Persist { fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; } +struct MonitorHolder { + monitor: ChannelMonitor, +} + +/// A read-only reference to a current ChannelMonitor. +/// +/// Note that this holds a mutex in [`ChainMonitor`] and may block other events until it is +/// released. +pub struct LockedChannelMonitor<'a, ChannelSigner: Sign> { + lock: RwLockReadGuard<'a, HashMap>>, + funding_txo: OutPoint, +} + +impl Deref for LockedChannelMonitor<'_, ChannelSigner> { + type Target = ChannelMonitor; + fn deref(&self) -> &ChannelMonitor { + &self.lock.get(&self.funding_txo).expect("Checked at construction").monitor + } +} + /// An implementation of [`chain::Watch`] for monitoring channels. /// /// Connected and disconnected blocks must be provided to `ChainMonitor` as documented by @@ -108,8 +128,7 @@ pub struct ChainMonitor, { - /// The monitors - pub monitors: RwLock>>, + monitors: RwLock>>, chain_source: Option, broadcaster: T, logger: L, @@ -138,9 +157,9 @@ where C::Target: chain::Filter, FN: Fn(&ChannelMonitor, &TransactionData) -> Vec { let mut dependent_txdata = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - let mut txn_outputs = process(monitor, txdata); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + let mut txn_outputs = process(&monitor_state.monitor, txdata); // Register any new outputs with the chain source for filtering, storing any dependent // transactions from within the block that previously had not been included in txdata. @@ -202,8 +221,8 @@ where C::Target: chain::Filter, /// inclusion in the return value. pub fn get_claimable_balances(&self, ignored_channels: &[&ChannelDetails]) -> Vec { let mut ret = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for (_, monitor) in monitors.iter().filter(|(funding_outpoint, _)| { + let monitor_states = self.monitors.read().unwrap(); + for (_, monitor_state) in monitor_states.iter().filter(|(funding_outpoint, _)| { for chan in ignored_channels { if chan.funding_txo.as_ref() == Some(funding_outpoint) { return false; @@ -211,11 +230,38 @@ where C::Target: chain::Filter, } true }) { - ret.append(&mut monitor.get_claimable_balances()); + ret.append(&mut monitor_state.monitor.get_claimable_balances()); } ret } + /// Gets the [`LockedChannelMonitor`] for a given funding outpoint, returning an `Err` if no + /// such [`ChannelMonitor`] is currently being monitored for. + /// + /// Note that the result holds a mutex over our monitor set, and should not be held + /// indefinitely. + pub fn get_monitor(&self, funding_txo: OutPoint) -> Result, ()> { + let lock = self.monitors.read().unwrap(); + if lock.get(&funding_txo).is_some() { + Ok(LockedChannelMonitor { lock, funding_txo }) + } else { + Err(()) + } + } + + /// Lists the funding outpoint of each [`ChannelMonitor`] being monitored. + /// + /// Note that [`ChannelMonitor`]s are not removed when a channel is closed as they are always + /// monitoring for on-chain state resolutions. + pub fn list_monitors(&self) -> Vec { + self.monitors.read().unwrap().keys().map(|outpoint| *outpoint).collect() + } + + #[cfg(test)] + pub fn remove_monitor(&self, funding_txo: &OutPoint) -> ChannelMonitor { + self.monitors.write().unwrap().remove(funding_txo).unwrap().monitor + } + #[cfg(any(test, feature = "fuzztarget", feature = "_test_utils"))] pub fn get_and_clear_pending_events(&self) -> Vec { use util::events::EventsProvider; @@ -246,10 +292,10 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let monitors = self.monitors.read().unwrap(); + let monitor_states = self.monitors.read().unwrap(); log_debug!(self.logger, "Latest block {} at height {} removed via block_disconnected", header.block_hash(), height); - for monitor in monitors.values() { - monitor.block_disconnected( + for monitor_state in monitor_states.values() { + monitor_state.monitor.block_disconnected( header, height, &*self.broadcaster, &*self.fee_estimator, &*self.logger); } } @@ -274,9 +320,9 @@ where fn transaction_unconfirmed(&self, txid: &Txid) { log_debug!(self.logger, "Transaction {} reorganized out of chain", txid); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + monitor_state.monitor.transaction_unconfirmed(txid, &*self.broadcaster, &*self.fee_estimator, &*self.logger); } } @@ -293,9 +339,9 @@ where fn get_relevant_txids(&self) -> Vec { let mut txids = Vec::new(); - let monitors = self.monitors.read().unwrap(); - for monitor in monitors.values() { - txids.append(&mut monitor.get_relevant_txids()); + let monitor_states = self.monitors.read().unwrap(); + for monitor_state in monitor_states.values() { + txids.append(&mut monitor_state.monitor.get_relevant_txids()); } txids.sort_unstable(); @@ -338,7 +384,7 @@ where C::Target: chain::Filter, monitor.load_outputs_to_watch(chain_source); } } - entry.insert(monitor); + entry.insert(MonitorHolder { monitor }); Ok(()) } @@ -359,7 +405,8 @@ where C::Target: chain::Filter, #[cfg(not(any(test, feature = "fuzztarget")))] Err(ChannelMonitorUpdateErr::PermanentFailure) }, - Some(monitor) => { + Some(monitor_state) => { + let monitor = &monitor_state.monitor; log_trace!(self.logger, "Updating Channel Monitor for channel {}", log_funding_info!(monitor)); let update_res = monitor.update_monitor(&update, &self.broadcaster, &self.fee_estimator, &self.logger); if let Err(e) = &update_res { @@ -382,8 +429,8 @@ where C::Target: chain::Filter, fn release_pending_monitor_events(&self) -> Vec { let mut pending_monitor_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_monitor_events.append(&mut monitor.get_and_clear_pending_monitor_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_monitor_events.append(&mut monitor_state.monitor.get_and_clear_pending_monitor_events()); } pending_monitor_events } @@ -404,8 +451,8 @@ impl even /// [`SpendableOutputs`]: events::Event::SpendableOutputs fn process_pending_events(&self, handler: H) where H::Target: EventHandler { let mut pending_events = Vec::new(); - for monitor in self.monitors.read().unwrap().values() { - pending_events.append(&mut monitor.get_and_clear_pending_events()); + for monitor_state in self.monitors.read().unwrap().values() { + pending_events.append(&mut monitor_state.monitor.get_and_clear_pending_events()); } for event in pending_events.drain(..) { handler.handle_event(&event); diff --git a/lightning/src/ln/chanmon_update_fail_tests.rs b/lightning/src/ln/chanmon_update_fail_tests.rs index 543461d04..c8ed47831 100644 --- a/lightning/src/ln/chanmon_update_fail_tests.rs +++ b/lightning/src/ln/chanmon_update_fail_tests.rs @@ -114,8 +114,7 @@ fn test_monitor_and_persister_update_fail() { blocks: Arc::new(Mutex::new(vec![(genesis_block(Network::Testnet).header, 200); 200])), }; let chain_mon = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); - let monitor = monitors.get(&outpoint).unwrap(); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, ChannelMonitor)>::read( @@ -2256,7 +2255,7 @@ fn do_channel_holding_cell_serialize(disconnect: bool, reload_a: bool) { if reload_a { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); persister = test_utils::TestPersister::new(); let keys_manager = &chanmon_cfgs[0].keys_manager; diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index 63dfae514..137214f0c 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -274,10 +274,9 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { let feeest = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; let mut deserialized_monitors = Vec::new(); { - let old_monitors = self.chain_monitor.chain_monitor.monitors.read().unwrap(); - for (_, old_monitor) in old_monitors.iter() { + for outpoint in self.chain_monitor.chain_monitor.list_monitors() { let mut w = test_utils::TestVecWriter(Vec::new()); - old_monitor.write(&mut w).unwrap(); + self.chain_monitor.chain_monitor.get_monitor(outpoint).unwrap().write(&mut w).unwrap(); let (_, deserialized_monitor) = <(BlockHash, ChannelMonitor)>::read( &mut io::Cursor::new(&w.0), self.keys_manager).unwrap(); deserialized_monitors.push(deserialized_monitor); @@ -437,20 +436,35 @@ macro_rules! get_feerate { } } -/// Returns any local commitment transactions for the channel. +/// Returns a channel monitor given a channel id, making some naive assumptions #[macro_export] -macro_rules! get_local_commitment_txn { +macro_rules! get_monitor { ($node: expr, $channel_id: expr) => { { - let monitors = $node.chain_monitor.chain_monitor.monitors.read().unwrap(); - let mut commitment_txn = None; - for (funding_txo, monitor) in monitors.iter() { - if funding_txo.to_channel_id() == $channel_id { - commitment_txn = Some(monitor.unsafe_get_latest_holder_commitment_txn(&$node.logger)); + use bitcoin::hashes::Hash; + let mut monitor = None; + // Assume funding vout is either 0 or 1 blindly + for index in 0..2 { + if let Ok(mon) = $node.chain_monitor.chain_monitor.get_monitor( + $crate::chain::transaction::OutPoint { + txid: bitcoin::Txid::from_slice(&$channel_id[..]).unwrap(), index + }) + { + monitor = Some(mon); break; } } - commitment_txn.unwrap() + monitor.unwrap() + } + } +} + +/// Returns any local commitment transactions for the channel. +#[macro_export] +macro_rules! get_local_commitment_txn { + ($node: expr, $channel_id: expr) => { + { + $crate::get_monitor!($node, $channel_id).unsafe_get_latest_holder_commitment_txn(&$node.logger) } } } diff --git a/lightning/src/ln/functional_tests.rs b/lightning/src/ln/functional_tests.rs index 9eb635345..ca4fa3b35 100644 --- a/lightning/src/ln/functional_tests.rs +++ b/lightning/src/ln/functional_tests.rs @@ -35,7 +35,7 @@ use util::errors::APIError; use util::ser::{Writeable, ReadableArgs}; use util::config::UserConfig; -use bitcoin::hash_types::{Txid, BlockHash}; +use bitcoin::hash_types::BlockHash; use bitcoin::blockdata::block::{Block, BlockHeader}; use bitcoin::blockdata::script::Builder; use bitcoin::blockdata::opcodes; @@ -2104,7 +2104,7 @@ fn channel_monitor_network_test() { // Drop the ChannelMonitor for the previous channel to avoid it broadcasting transactions and // confusing us in the following tests. - let chan_3_mon = nodes[3].chain_monitor.chain_monitor.monitors.write().unwrap().remove(&OutPoint { txid: chan_3.3.txid(), index: 0 }).unwrap(); + let chan_3_mon = nodes[3].chain_monitor.chain_monitor.remove_monitor(&OutPoint { txid: chan_3.3.txid(), index: 0 }); // One pending HTLC to time out: let payment_preimage_2 = route_payment(&nodes[3], &vec!(&nodes[4])[..], 3000000).0; @@ -2171,7 +2171,7 @@ fn channel_monitor_network_test() { assert_eq!(nodes[3].node.list_channels().len(), 0); assert_eq!(nodes[4].node.list_channels().len(), 0); - nodes[3].chain_monitor.chain_monitor.monitors.write().unwrap().insert(OutPoint { txid: chan_3.3.txid(), index: 0 }, chan_3_mon); + nodes[3].chain_monitor.chain_monitor.watch_channel(OutPoint { txid: chan_3.3.txid(), index: 0 }, chan_3_mon).unwrap(); check_closed_event!(nodes[3], 1, ClosureReason::CommitmentTxConfirmed); check_closed_event!(nodes[4], 1, ClosureReason::CommitmentTxConfirmed); } @@ -3268,8 +3268,7 @@ fn test_force_close_fail_back() { // Now check that if we add the preimage to ChannelMonitor it broadcasts our HTLC-Success.. { - let mut monitors = nodes[2].chain_monitor.chain_monitor.monitors.read().unwrap(); - monitors.get(&OutPoint{ txid: Txid::from_slice(&payment_event.commitment_msg.channel_id[..]).unwrap(), index: 0 }).unwrap() + get_monitor!(nodes[2], payment_event.commitment_msg.channel_id) .provide_payment_preimage(&our_payment_hash, &our_payment_preimage, &node_cfgs[2].tx_broadcaster, &node_cfgs[2].fee_estimator, &node_cfgs[2].logger); } mine_transaction(&nodes[2], &tx); @@ -3621,10 +3620,12 @@ fn test_funding_peer_disconnect() { confirm_transaction(&nodes[0], &tx); let events_1 = nodes[0].node.get_and_clear_pending_msg_events(); + let chan_id; assert_eq!(events_1.len(), 1); match events_1[0] { - MessageSendEvent::SendFundingLocked { ref node_id, msg: _ } => { + MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { assert_eq!(*node_id, nodes[1].node.get_our_node_id()); + chan_id = msg.channel_id; }, _ => panic!("Unexpected event"), } @@ -3696,7 +3697,7 @@ fn test_funding_peer_disconnect() { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); persister = test_utils::TestPersister::new(); let keys_manager = &chanmon_cfgs[0].keys_manager; @@ -4040,7 +4041,8 @@ fn test_no_txn_manager_serialize_deserialize() { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], OutPoint { txid: tx.txid(), index: 0 }.to_channel_id()) + .write(&mut chan_0_monitor_serialized).unwrap(); logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -4120,7 +4122,7 @@ fn test_dup_htlc_onchain_fails_on_reload() { let nodes_0_deserialized: ChannelManager; let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2; // Route a payment, but force-close the channel before the HTLC fulfill message arrives at // nodes[0]. @@ -4156,7 +4158,7 @@ fn test_dup_htlc_onchain_fails_on_reload() { // fairly normal behavior as ChannelMonitor(s) are often not re-serialized when on-chain events // happen, unlike ChannelManager which tends to be re-serialized after any relevant event(s). let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); header.prev_blockhash = nodes[0].best_block_hash(); let claim_block = Block { header, txdata: claim_txn}; @@ -4243,7 +4245,8 @@ fn test_manager_serialize_deserialize_events() { added_monitors.clear(); } - node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id())); + let bs_funding_signed = get_event_msg!(node_b, MessageSendEvent::SendFundingSigned, node_a.node.get_our_node_id()); + node_a.node.handle_funding_signed(&node_b.node.get_our_node_id(), &bs_funding_signed); { let mut added_monitors = node_a.chain_monitor.added_monitors.lock().unwrap(); assert_eq!(added_monitors.len(), 1); @@ -4258,7 +4261,7 @@ fn test_manager_serialize_deserialize_events() { // Start the de/seriailization process mid-channel creation to check that the channel manager will hold onto events that are serialized let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], bs_funding_signed.channel_id).write(&mut chan_0_monitor_serialized).unwrap(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; logger = test_utils::TestLogger::new(); @@ -4336,7 +4339,7 @@ fn test_simple_manager_serialize_deserialize() { let new_chain_monitor: test_utils::TestChainMonitor; let nodes_0_deserialized: ChannelManager; let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); + let chan_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2; let (our_payment_preimage, _, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); let (_, our_payment_hash, _) = route_payment(&nodes[0], &[&nodes[1]], 1000000); @@ -4345,7 +4348,7 @@ fn test_simple_manager_serialize_deserialize() { let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], chan_id).write(&mut chan_0_monitor_serialized).unwrap(); logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) }; @@ -4397,14 +4400,14 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let new_chain_monitor: test_utils::TestChainMonitor; let nodes_0_deserialized: ChannelManager; let mut nodes = create_network(4, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); - create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::known(), InitFeatures::known()); + let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2; + let chan_id_2 = create_announced_chan_between_nodes(&nodes, 2, 0, InitFeatures::known(), InitFeatures::known()).2; let (_, _, channel_id, funding_tx) = create_announced_chan_between_nodes(&nodes, 0, 3, InitFeatures::known(), InitFeatures::known()); let mut node_0_stale_monitors_serialized = Vec::new(); - for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() { + for chan_id_iter in &[chan_id_1, chan_id_2, channel_id] { let mut writer = test_utils::TestVecWriter(Vec::new()); - monitor.1.write(&mut writer).unwrap(); + get_monitor!(nodes[0], chan_id_iter).write(&mut writer).unwrap(); node_0_stale_monitors_serialized.push(writer.0); } @@ -4421,9 +4424,9 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { // Now the ChannelMonitor (which is now out-of-sync with ChannelManager for channel w/ // nodes[3]) let mut node_0_monitors_serialized = Vec::new(); - for monitor in nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter() { + for chan_id_iter in &[chan_id_1, chan_id_2, channel_id] { let mut writer = test_utils::TestVecWriter(Vec::new()); - monitor.1.write(&mut writer).unwrap(); + get_monitor!(nodes[0], chan_id_iter).write(&mut writer).unwrap(); node_0_monitors_serialized.push(writer.0); } @@ -7163,7 +7166,7 @@ fn test_data_loss_protect() { // Cache node A state before any channel update let previous_node_state = nodes[0].node.encode(); let mut previous_chain_monitor_state = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut previous_chain_monitor_state).unwrap(); + get_monitor!(nodes[0], chan.2).write(&mut previous_chain_monitor_state).unwrap(); send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000); send_payment(&nodes[0], &vec!(&nodes[1])[..], 8000000); @@ -7404,7 +7407,7 @@ fn test_priv_forwarding_rejection() { let nodes_1_deserialized: ChannelManager; let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known()); + let chan_id_1 = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known()).2; // Note that the create_*_chan functions in utils requires announcement_signatures, which we do // not send for private channels. @@ -7419,7 +7422,8 @@ fn test_priv_forwarding_rejection() { nodes[2].node.handle_funding_created(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingCreated, nodes[2].node.get_our_node_id())); check_added_monitors!(nodes[2], 1); - nodes[1].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[1].node.get_our_node_id())); + let cs_funding_signed = get_event_msg!(nodes[2], MessageSendEvent::SendFundingSigned, nodes[1].node.get_our_node_id()); + nodes[1].node.handle_funding_signed(&nodes[2].node.get_our_node_id(), &cs_funding_signed); check_added_monitors!(nodes[1], 1); let conf_height = core::cmp::max(nodes[1].best_block_info().1 + 1, nodes[2].best_block_info().1 + 1); @@ -7479,12 +7483,8 @@ fn test_priv_forwarding_rejection() { let nodes_1_serialized = nodes[1].node.encode(); let mut monitor_a_serialized = test_utils::TestVecWriter(Vec::new()); let mut monitor_b_serialized = test_utils::TestVecWriter(Vec::new()); - { - let mons = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap(); - let mut mon_iter = mons.iter(); - mon_iter.next().unwrap().1.write(&mut monitor_a_serialized).unwrap(); - mon_iter.next().unwrap().1.write(&mut monitor_b_serialized).unwrap(); - } + get_monitor!(nodes[1], chan_id_1).write(&mut monitor_a_serialized).unwrap(); + get_monitor!(nodes[1], cs_funding_signed.channel_id).write(&mut monitor_b_serialized).unwrap(); persister = test_utils::TestPersister::new(); let keys_manager = &chanmon_cfgs[1].keys_manager; @@ -8004,11 +8004,9 @@ fn test_bump_txn_sanitize_tracking_maps() { connect_block(&nodes[0], &Block { header: header_130, txdata: penalty_txn }); connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); - if let Some(monitor) = monitors.get(&OutPoint { txid: chan.3.txid(), index: 0 }) { - assert!(monitor.inner.lock().unwrap().onchain_tx_handler.pending_claim_requests.is_empty()); - assert!(monitor.inner.lock().unwrap().onchain_tx_handler.claimable_outpoints.is_empty()); - } + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(OutPoint { txid: chan.3.txid(), index: 0 }).unwrap(); + assert!(monitor.inner.lock().unwrap().onchain_tx_handler.pending_claim_requests.is_empty()); + assert!(monitor.inner.lock().unwrap().onchain_tx_handler.claimable_outpoints.is_empty()); } } @@ -8273,8 +8271,7 @@ fn test_update_err_monitor_lockdown() { let logger = test_utils::TestLogger::with_id(format!("node {}", 0)); let persister = test_utils::TestPersister::new(); let watchtower = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); - let monitor = monitors.get(&outpoint).unwrap(); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( @@ -8335,8 +8332,7 @@ fn test_concurrent_monitor_claim() { let logger = test_utils::TestLogger::with_id(format!("node {}", "Alice")); let persister = test_utils::TestPersister::new(); let watchtower_alice = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); - let monitor = monitors.get(&outpoint).unwrap(); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( @@ -8364,8 +8360,7 @@ fn test_concurrent_monitor_claim() { let logger = test_utils::TestLogger::with_id(format!("node {}", "Bob")); let persister = test_utils::TestPersister::new(); let watchtower_bob = { - let monitors = nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap(); - let monitor = monitors.get(&outpoint).unwrap(); + let monitor = nodes[0].chain_monitor.chain_monitor.get_monitor(outpoint).unwrap(); let mut w = test_utils::TestVecWriter(Vec::new()); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( @@ -9050,8 +9045,8 @@ fn test_forwardable_regen() { let new_chain_monitor: test_utils::TestChainMonitor; let nodes_1_deserialized: ChannelManager; let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs); - create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()); - create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()); + let chan_id_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2; + let chan_id_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known()).2; // First send a payment to nodes[1] let (route, payment_hash, payment_preimage, payment_secret) = get_route_and_payment_hash!(nodes[0], nodes[1], 100_000); @@ -9088,12 +9083,8 @@ fn test_forwardable_regen() { let nodes_1_serialized = nodes[1].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); let mut chan_1_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - { - let monitors = nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap(); - let mut monitor_iter = monitors.iter(); - monitor_iter.next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); - monitor_iter.next().unwrap().1.write(&mut chan_1_monitor_serialized).unwrap(); - } + get_monitor!(nodes[1], chan_id_1).write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[1], chan_id_2).write(&mut chan_1_monitor_serialized).unwrap(); persister = test_utils::TestPersister::new(); let keys_manager = &chanmon_cfgs[1].keys_manager; diff --git a/lightning/src/ln/monitor_tests.rs b/lightning/src/ln/monitor_tests.rs index d80afa389..0e4733ea5 100644 --- a/lightning/src/ln/monitor_tests.rs +++ b/lightning/src/ln/monitor_tests.rs @@ -107,9 +107,9 @@ fn chanmon_claim_value_coop_close() { assert_eq!(vec![Balance::ClaimableOnChannelClose { claimable_amount_satoshis: 1_000_000 - 1_000 - chan_feerate * channel::COMMITMENT_TX_BASE_WEIGHT / 1000 }], - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); assert_eq!(vec![Balance::ClaimableOnChannelClose { claimable_amount_satoshis: 1_000, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); nodes[0].node.close_channel(&chan_id).unwrap(); let node_0_shutdown = get_event_msg!(nodes[0], MessageSendEvent::SendShutdown, nodes[1].node.get_our_node_id()); @@ -143,20 +143,20 @@ fn chanmon_claim_value_coop_close() { claimable_amount_satoshis: 1_000_000 - 1_000 - chan_feerate * channel::COMMITMENT_TX_BASE_WEIGHT / 1000, confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1, }], - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); assert_eq!(vec![Balance::ClaimableAwaitingConfirmations { claimable_amount_satoshis: 1000, confirmation_height: nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1); assert_eq!(Vec::::new(), - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); assert_eq!(Vec::::new(), - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); assert_eq!(node_a_spendable.len(), 1); @@ -230,11 +230,11 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, claimable_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); assert_eq!(vec![Balance::ClaimableOnChannelClose { claimable_amount_satoshis: 1_000, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); nodes[1].node.claim_funds(payment_preimage); check_added_monitors!(nodes[1], 1); @@ -284,11 +284,11 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { }); } assert_eq!(sorted_vec(a_expected_balances), - sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); assert_eq!(vec![Balance::ClaimableOnChannelClose { claimable_amount_satoshis: 1_000 + 3_000 + 4_000, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); // Broadcast the closing transaction (which has both pending HTLCs in it) and get B's // broadcasted HTLC claim transaction with preimage. @@ -342,7 +342,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, claimable_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); // The main non-HTLC balance is just awaiting confirmations, but the claimable height is the // CSV delay, not ANTI_REORG_DELAY. assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations { @@ -358,7 +358,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); expect_payment_failed!(nodes[0], dust_payment_hash, true); @@ -373,7 +373,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, claimable_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations { claimable_amount_satoshis: 1_000, confirmation_height: node_b_commitment_claimable, @@ -384,7 +384,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); assert_eq!(node_a_spendable.len(), 1); @@ -410,13 +410,13 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, claimable_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); assert_eq!(vec![Balance::MaybeClaimableHTLCAwaitingTimeout { claimable_amount_satoshis: 4_000, claimable_height: htlc_cltv_timeout, }], - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); // When the HTLC timeout output is spendable in the next block, A should broadcast it connect_blocks(&nodes[0], htlc_cltv_timeout - nodes[0].best_block_info().1 - 1); @@ -441,12 +441,12 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1, }], - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); // After ANTI_REORG_DELAY, A will generate a SpendableOutputs event and drop the claimable // balance entry. connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1); assert_eq!(Vec::::new(), - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); expect_payment_failed!(nodes[0], timeout_payment_hash, true); let mut node_a_spendable = nodes[0].chain_monitor.chain_monitor.get_and_clear_pending_events(); @@ -474,7 +474,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); // After reaching the commitment output CSV, we'll get a SpendableOutputs event for it and have // only the HTLCs claimable on node B. @@ -496,7 +496,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }]), - sorted_vec(nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances())); + sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances())); // After reaching the claimed HTLC output CSV, we'll get a SpendableOutptus event for it and // have only one HTLC output left spendable. @@ -515,7 +515,7 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); // Finally, mine the HTLC timeout transaction that A broadcasted (even though B should be able // to claim this HTLC with the preimage it knows!). It will remain listed as a claimable HTLC @@ -525,10 +525,10 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) { claimable_amount_satoshis: 4_000, timeout_height: htlc_cltv_timeout, }], - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1); assert_eq!(Vec::::new(), - nodes[1].chain_monitor.chain_monitor.monitors.read().unwrap().get(&funding_outpoint).unwrap().get_claimable_balances()); + nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()); } #[test] diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index 0409396db..a9b341b34 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -238,7 +238,7 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ // it when we go to deserialize, and then use the ChannelManager. let nodes_0_serialized = nodes[0].node.encode(); let mut chan_0_monitor_serialized = test_utils::TestVecWriter(Vec::new()); - nodes[0].chain_monitor.chain_monitor.monitors.read().unwrap().iter().next().unwrap().1.write(&mut chan_0_monitor_serialized).unwrap(); + get_monitor!(nodes[0], chan.2).write(&mut chan_0_monitor_serialized).unwrap(); persister = test_utils::TestPersister::new(); let keys_manager = &chanmon_cfgs[0].keys_manager; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 942342042..836e638fd 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -156,8 +156,7 @@ impl<'a> chain::Watch for TestChainMonitor<'a> { let update_res = self.chain_monitor.update_channel(funding_txo, update); // At every point where we get a monitor update, we should be able to send a useful monitor // to a watchtower and disk... - let monitors = self.chain_monitor.monitors.read().unwrap(); - let monitor = monitors.get(&funding_txo).unwrap(); + let monitor = self.chain_monitor.get_monitor(funding_txo).unwrap(); w.0.clear(); monitor.write(&mut w).unwrap(); let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read(