Merge pull request #1519 from tnull/2022-06-require-htlc-max
author Matt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 25 Jul 2022 21:04:54 +0000 (21:04 +0000)
committer GitHub <noreply@github.com>
Mon, 25 Jul 2022 21:04:54 +0000 (21:04 +0000)
Make `htlc_maximum_msat` a required field.

lightning-block-sync/src/poll.rs
lightning-net-tokio/src/lib.rs
lightning/src/chain/chaininterface.rs
lightning/src/chain/channelmonitor.rs
lightning/src/debug_sync.rs
lightning/src/ln/functional_test_utils.rs
lightning/src/ln/functional_tests.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/reorg_tests.rs

index 6e30d2e86d20d268529bd37d942492db810d84b6..4c6cb0c0600725e9cf3552045865e8b5be472199 100644 (file)
@@ -59,12 +59,11 @@ impl Validate for BlockHeaderData {
        type T = ValidatedBlockHeader;
 
        fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
-               self.header
+               let pow_valid_block_hash = self.header
                        .validate_pow(&self.header.target())
                        .or_else(|e| Err(BlockSourceError::persistent(e)))?;
 
-               // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream.
-               if self.header.block_hash() != block_hash {
+               if pow_valid_block_hash != block_hash {
                        return Err(BlockSourceError::persistent("invalid block hash"));
                }
 
@@ -76,12 +75,11 @@ impl Validate for Block {
        type T = ValidatedBlock;
 
        fn validate(self, block_hash: BlockHash) -> BlockSourceResult<Self::T> {
-               self.header
+               let pow_valid_block_hash = self.header
                        .validate_pow(&self.header.target())
                        .or_else(|e| Err(BlockSourceError::persistent(e)))?;
 
-               // TODO: Use the result of validate_pow instead of recomputing the block hash once upstream.
-               if self.block_hash() != block_hash {
+               if pow_valid_block_hash != block_hash {
                        return Err(BlockSourceError::persistent("invalid block hash"));
                }
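
Both hunks above apply the same fix: `validate_pow` now returns the block hash it computed while
checking proof-of-work (the upstream TODO having been resolved), so the code binds that result and
compares it directly rather than recomputing the hash. A minimal sketch of the pattern, assuming a
rust-bitcoin `BlockHeader` whose `validate_pow` returns the hash on success:

    use bitcoin::blockdata::block::BlockHeader;
    use bitcoin::hash_types::BlockHash;

    // Validate PoW once and reuse the returned hash rather than paying for
    // a second `header.block_hash()` computation.
    fn check_header(header: &BlockHeader, expected: BlockHash) -> Result<(), &'static str> {
        let pow_valid_block_hash = header
            .validate_pow(&header.target())
            .map_err(|_| "invalid proof of work")?;
        if pow_valid_block_hash != expected {
            return Err("invalid block hash");
        }
        Ok(())
    }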
 
index 2ac10762b04eaf23f602bc3d8dcc987438542cae..645a7434e4536534138c5dd6150fe52bad55ff0c 100644 (file)
@@ -84,6 +84,7 @@ use lightning::ln::peer_handler::CustomMessageHandler;
 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
 use lightning::util::logger::Logger;
 
+use std::ops::Deref;
 use std::task;
 use std::net::SocketAddr;
 use std::net::TcpStream as StdTcpStream;
@@ -120,11 +121,16 @@ struct Connection {
        id: u64,
 }
 impl Connection {
-       async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, mut event_receiver: mpsc::Receiver<()>) where
-                       CMH: ChannelMessageHandler + 'static + Send + Sync,
-                       RMH: RoutingMessageHandler + 'static + Send + Sync,
-                       L: Logger + 'static + ?Sized + Send + Sync,
-                       UMH: CustomMessageHandler + 'static + Send + Sync {
+       async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+                       CMH: Deref + 'static + Send + Sync,
+                       RMH: Deref + 'static + Send + Sync,
+                       L: Deref + 'static + Send + Sync,
+                       UMH: Deref + 'static + Send + Sync,
+                       CMH::Target: ChannelMessageHandler + Send + Sync,
+                       RMH::Target: RoutingMessageHandler + Send + Sync,
+                       L::Target: Logger + Send + Sync,
+                       UMH::Target: CustomMessageHandler + Send + Sync,
+       {
                loop {
                        if event_receiver.recv().await.is_none() {
                                return;
@@ -133,11 +139,16 @@ impl Connection {
                }
        }
 
-       async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
-                       CMH: ChannelMessageHandler + 'static + Send + Sync,
-                       RMH: RoutingMessageHandler + 'static + Send + Sync,
-                       L: Logger + 'static + ?Sized + Send + Sync,
-                       UMH: CustomMessageHandler + 'static + Send + Sync {
+       async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+                       CMH: Deref + 'static + Send + Sync,
+                       RMH: Deref + 'static + Send + Sync,
+                       L: Deref + 'static + Send + Sync,
+                       UMH: Deref + 'static + Send + Sync,
+                       CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
+                       RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+                       L::Target: Logger + 'static + Send + Sync,
+                       UMH::Target: CustomMessageHandler + 'static + Send + Sync,
+       {
                // Create a waker to wake up poll_event_process, above
                let (event_waker, event_receiver) = mpsc::channel(1);
                tokio::spawn(Self::poll_event_process(Arc::clone(&peer_manager), event_receiver));
@@ -255,11 +266,16 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
 /// do not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-               CMH: ChannelMessageHandler + 'static + Send + Sync,
-               RMH: RoutingMessageHandler + 'static + Send + Sync,
-               L: Logger + 'static + ?Sized + Send + Sync,
-               UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+               CMH: Deref + 'static + Send + Sync,
+               RMH: Deref + 'static + Send + Sync,
+               L: Deref + 'static + Send + Sync,
+               UMH: Deref + 'static + Send + Sync,
+               CMH::Target: ChannelMessageHandler + Send + Sync,
+               RMH::Target: RoutingMessageHandler + Send + Sync,
+               L::Target: Logger + Send + Sync,
+               UMH::Target: CustomMessageHandler + Send + Sync,
+{
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, write_receiver, read_receiver, us) = Connection::new(stream);
        #[cfg(debug_assertions)]
@@ -297,11 +313,16 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed. However, because all processing futures are spawned with tokio::spawn, you
 /// do not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
-               CMH: ChannelMessageHandler + 'static + Send + Sync,
-               RMH: RoutingMessageHandler + 'static + Send + Sync,
-               L: Logger + 'static + ?Sized + Send + Sync,
-               UMH: CustomMessageHandler + 'static + Send + Sync {
+pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+               CMH: Deref + 'static + Send + Sync,
+               RMH: Deref + 'static + Send + Sync,
+               L: Deref + 'static + Send + Sync,
+               UMH: Deref + 'static + Send + Sync,
+               CMH::Target: ChannelMessageHandler + Send + Sync,
+               RMH::Target: RoutingMessageHandler + Send + Sync,
+               L::Target: Logger + Send + Sync,
+               UMH::Target: CustomMessageHandler + Send + Sync,
+{
        let remote_addr = get_addr_from_stream(&stream);
        let (reader, mut write_receiver, read_receiver, us) = Connection::new(stream);
        #[cfg(debug_assertions)]
@@ -368,11 +389,16 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
 /// disconnected and associated handling futures are freed. However, because all processing in
 /// said futures is spawned with tokio::spawn, you do not need to poll the second future in order
 /// to make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>, Arc<UMH>>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
-               CMH: ChannelMessageHandler + 'static + Send + Sync,
-               RMH: RoutingMessageHandler + 'static + Send + Sync,
-               L: Logger + 'static + ?Sized + Send + Sync,
-               UMH: CustomMessageHandler + 'static + Send + Sync {
+pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+               CMH: Deref + 'static + Send + Sync,
+               RMH: Deref + 'static + Send + Sync,
+               L: Deref + 'static + Send + Sync,
+               UMH: Deref + 'static + Send + Sync,
+               CMH::Target: ChannelMessageHandler + Send + Sync,
+               RMH::Target: RoutingMessageHandler + Send + Sync,
+               L::Target: Logger + Send + Sync,
+               UMH::Target: CustomMessageHandler + Send + Sync,
+{
        if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
                Some(setup_outbound(peer_manager, their_node_id, stream))
        } else { None }
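
The signature changes above generalize every entry point from concrete `Arc<..>` parameters to the
pointer-generic form used throughout LDK: each type parameter is any `Deref` whose `Target`
implements the relevant handler trait, so `Arc`s, plain references, and custom smart pointers all
qualify. A self-contained sketch of the idiom, with a stand-in `Logger` trait (not LDK's):

    use std::ops::Deref;
    use std::sync::Arc;

    trait Logger { fn log(&self, msg: &str); }

    struct PrintLogger;
    impl Logger for PrintLogger { fn log(&self, msg: &str) { println!("{}", msg); } }

    // Generic over any pointer-like type dereferencing to a `Logger`.
    fn log_via<L: Deref>(logger: L, msg: &str) where L::Target: Logger {
        logger.log(msg);
    }

    fn main() {
        log_via(Arc::new(PrintLogger), "via Arc");  // smart pointer
        log_via(&PrintLogger, "via reference");     // plain borrow
    }
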
index 2ec7f318ff57bb5a3e798a0ecd651cc1fb5f4e1d..546e1b2df53e915a6fc04aca800127117aef2a98 100644 (file)
@@ -64,7 +64,7 @@ impl<D: Deref> FeeEstimator for D where D::Target: FeeEstimator {
 pub const MIN_RELAY_FEE_SAT_PER_1000_WEIGHT: u64 = 4000;
 /// Minimum feerate that takes a sane approach to bitcoind weight-to-vbytes rounding.
 /// See the following Core Lightning commit for an explanation:
-/// https://github.com/ElementsProject/lightning/commit/2e687b9b352c9092b5e8bd4a688916ac50b44af0
+/// <https://github.com/ElementsProject/lightning/commit/2e687b9b352c9092b5e8bd4a688916ac50b44af0>
 pub const FEERATE_FLOOR_SATS_PER_KW: u32 = 253;
 
 /// Wraps a `Deref` to a `FeeEstimator` so that any fee estimations provided by it
index 80cd9cb9d45958629e1789513190a4de63b87e7b..8dd3d4b43b680755581d9bc94ac2c2881035e763 100644 (file)
@@ -965,6 +965,13 @@ impl<Signer: Sign> Writeable for ChannelMonitorImpl<Signer> {
 }
 
 impl<Signer: Sign> ChannelMonitor<Signer> {
+       /// For lockorder enforcement purposes, we need a single site which constructs the `inner`
+       /// mutex; otherwise, in cases where we lock two monitors at the same time (e.g. in our
+       /// PartialEq implementation), we may falsely decide a lockorder violation has occurred.
+       fn from_impl(imp: ChannelMonitorImpl<Signer>) -> Self {
+               ChannelMonitor { inner: Mutex::new(imp) }
+       }
+
        pub(crate) fn new(secp_ctx: Secp256k1<secp256k1::All>, keys: Signer, shutdown_script: Option<Script>,
                          on_counterparty_tx_csv: u16, destination_script: &Script, funding_info: (OutPoint, Script),
                          channel_parameters: &ChannelTransactionParameters,
@@ -1012,60 +1019,58 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                let mut outputs_to_watch = HashMap::new();
                outputs_to_watch.insert(funding_info.0.txid, vec![(funding_info.0.index as u32, funding_info.1.clone())]);
 
-               ChannelMonitor {
-                       inner: Mutex::new(ChannelMonitorImpl {
-                               latest_update_id: 0,
-                               commitment_transaction_number_obscure_factor,
+               Self::from_impl(ChannelMonitorImpl {
+                       latest_update_id: 0,
+                       commitment_transaction_number_obscure_factor,
 
-                               destination_script: destination_script.clone(),
-                               broadcasted_holder_revokable_script: None,
-                               counterparty_payment_script,
-                               shutdown_script,
+                       destination_script: destination_script.clone(),
+                       broadcasted_holder_revokable_script: None,
+                       counterparty_payment_script,
+                       shutdown_script,
 
-                               channel_keys_id,
-                               holder_revocation_basepoint,
-                               funding_info,
-                               current_counterparty_commitment_txid: None,
-                               prev_counterparty_commitment_txid: None,
+                       channel_keys_id,
+                       holder_revocation_basepoint,
+                       funding_info,
+                       current_counterparty_commitment_txid: None,
+                       prev_counterparty_commitment_txid: None,
 
-                               counterparty_commitment_params,
-                               funding_redeemscript,
-                               channel_value_satoshis,
-                               their_cur_per_commitment_points: None,
+                       counterparty_commitment_params,
+                       funding_redeemscript,
+                       channel_value_satoshis,
+                       their_cur_per_commitment_points: None,
 
-                               on_holder_tx_csv: counterparty_channel_parameters.selected_contest_delay,
+                       on_holder_tx_csv: counterparty_channel_parameters.selected_contest_delay,
 
-                               commitment_secrets: CounterpartyCommitmentSecrets::new(),
-                               counterparty_claimable_outpoints: HashMap::new(),
-                               counterparty_commitment_txn_on_chain: HashMap::new(),
-                               counterparty_hash_commitment_number: HashMap::new(),
+                       commitment_secrets: CounterpartyCommitmentSecrets::new(),
+                       counterparty_claimable_outpoints: HashMap::new(),
+                       counterparty_commitment_txn_on_chain: HashMap::new(),
+                       counterparty_hash_commitment_number: HashMap::new(),
 
-                               prev_holder_signed_commitment_tx: None,
-                               current_holder_commitment_tx: holder_commitment_tx,
-                               current_counterparty_commitment_number: 1 << 48,
-                               current_holder_commitment_number,
+                       prev_holder_signed_commitment_tx: None,
+                       current_holder_commitment_tx: holder_commitment_tx,
+                       current_counterparty_commitment_number: 1 << 48,
+                       current_holder_commitment_number,
 
-                               payment_preimages: HashMap::new(),
-                               pending_monitor_events: Vec::new(),
-                               pending_events: Vec::new(),
+                       payment_preimages: HashMap::new(),
+                       pending_monitor_events: Vec::new(),
+                       pending_events: Vec::new(),
 
-                               onchain_events_awaiting_threshold_conf: Vec::new(),
-                               outputs_to_watch,
+                       onchain_events_awaiting_threshold_conf: Vec::new(),
+                       outputs_to_watch,
 
-                               onchain_tx_handler,
+                       onchain_tx_handler,
 
-                               lockdown_from_offchain: false,
-                               holder_tx_signed: false,
-                               funding_spend_seen: false,
-                               funding_spend_confirmed: None,
-                               htlcs_resolved_on_chain: Vec::new(),
+                       lockdown_from_offchain: false,
+                       holder_tx_signed: false,
+                       funding_spend_seen: false,
+                       funding_spend_confirmed: None,
+                       htlcs_resolved_on_chain: Vec::new(),
 
-                               best_block,
-                               counterparty_node_id: Some(counterparty_node_id),
+                       best_block,
+                       counterparty_node_id: Some(counterparty_node_id),
 
-                               secp_ctx,
-                       }),
-               }
+                       secp_ctx,
+               })
        }
 
        #[cfg(test)]
@@ -3361,60 +3366,58 @@ impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes());
 
-               Ok((best_block.block_hash(), ChannelMonitor {
-                       inner: Mutex::new(ChannelMonitorImpl {
-                               latest_update_id,
-                               commitment_transaction_number_obscure_factor,
+               Ok((best_block.block_hash(), ChannelMonitor::from_impl(ChannelMonitorImpl {
+                       latest_update_id,
+                       commitment_transaction_number_obscure_factor,
 
-                               destination_script,
-                               broadcasted_holder_revokable_script,
-                               counterparty_payment_script,
-                               shutdown_script,
+                       destination_script,
+                       broadcasted_holder_revokable_script,
+                       counterparty_payment_script,
+                       shutdown_script,
 
-                               channel_keys_id,
-                               holder_revocation_basepoint,
-                               funding_info,
-                               current_counterparty_commitment_txid,
-                               prev_counterparty_commitment_txid,
+                       channel_keys_id,
+                       holder_revocation_basepoint,
+                       funding_info,
+                       current_counterparty_commitment_txid,
+                       prev_counterparty_commitment_txid,
 
-                               counterparty_commitment_params,
-                               funding_redeemscript,
-                               channel_value_satoshis,
-                               their_cur_per_commitment_points,
+                       counterparty_commitment_params,
+                       funding_redeemscript,
+                       channel_value_satoshis,
+                       their_cur_per_commitment_points,
 
-                               on_holder_tx_csv,
+                       on_holder_tx_csv,
 
-                               commitment_secrets,
-                               counterparty_claimable_outpoints,
-                               counterparty_commitment_txn_on_chain,
-                               counterparty_hash_commitment_number,
+                       commitment_secrets,
+                       counterparty_claimable_outpoints,
+                       counterparty_commitment_txn_on_chain,
+                       counterparty_hash_commitment_number,
 
-                               prev_holder_signed_commitment_tx,
-                               current_holder_commitment_tx,
-                               current_counterparty_commitment_number,
-                               current_holder_commitment_number,
+                       prev_holder_signed_commitment_tx,
+                       current_holder_commitment_tx,
+                       current_counterparty_commitment_number,
+                       current_holder_commitment_number,
 
-                               payment_preimages,
-                               pending_monitor_events: pending_monitor_events.unwrap(),
-                               pending_events,
+                       payment_preimages,
+                       pending_monitor_events: pending_monitor_events.unwrap(),
+                       pending_events,
 
-                               onchain_events_awaiting_threshold_conf,
-                               outputs_to_watch,
+                       onchain_events_awaiting_threshold_conf,
+                       outputs_to_watch,
 
-                               onchain_tx_handler,
+                       onchain_tx_handler,
 
-                               lockdown_from_offchain,
-                               holder_tx_signed,
-                               funding_spend_seen: funding_spend_seen.unwrap(),
-                               funding_spend_confirmed,
-                               htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
+                       lockdown_from_offchain,
+                       holder_tx_signed,
+                       funding_spend_seen: funding_spend_seen.unwrap(),
+                       funding_spend_confirmed,
+                       htlcs_resolved_on_chain: htlcs_resolved_on_chain.unwrap(),
 
-                               best_block,
-                               counterparty_node_id,
+                       best_block,
+                       counterparty_node_id,
 
-                               secp_ctx,
-                       }),
-               }))
+                       secp_ctx,
+               })))
        }
 }
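
Routing both `new` and the deserialization path through `from_impl` guarantees the `inner` mutex is
always constructed at a single source location. This matters because the `debug_sync` lockorder
checker (next file) can identify locks by construction site: monitors built at different call sites
would carry distinct lock identities, and code that locks two monitors at once (such as the
`PartialEq` implementation) could be misreported as a lockorder inversion. A minimal sketch of the
single-constructor pattern, using a simplified stand-in type:

    use std::sync::Mutex;

    struct MonitorImpl { latest_update_id: u64 }

    struct Monitor { inner: Mutex<MonitorImpl> }

    impl Monitor {
        // The one and only site constructing the `inner` mutex, so every
        // `Monitor`'s lock shares a single construction-site identity.
        fn from_impl(imp: MonitorImpl) -> Self {
            Monitor { inner: Mutex::new(imp) }
        }

        fn new() -> Self {
            Self::from_impl(MonitorImpl { latest_update_id: 0 })
        }
    }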
 
index 6b36682f43233f876dcacaf2e1ae34da0fef5e4e..daec309e9b5e35f81684ef9cdc5ed1df2be9e55c 100644 (file)
@@ -2,11 +2,9 @@ pub use ::alloc::sync::Arc;
 use core::ops::{Deref, DerefMut};
 use core::time::Duration;
 
-use std::collections::HashSet;
 use std::cell::RefCell;
 
 use std::sync::atomic::{AtomicUsize, Ordering};
-
 use std::sync::Mutex as StdMutex;
 use std::sync::MutexGuard as StdMutexGuard;
 use std::sync::RwLock as StdRwLock;
@@ -14,8 +12,15 @@ use std::sync::RwLockReadGuard as StdRwLockReadGuard;
 use std::sync::RwLockWriteGuard as StdRwLockWriteGuard;
 use std::sync::Condvar as StdCondvar;
 
+use prelude::HashMap;
+
 #[cfg(feature = "backtrace")]
-use backtrace::Backtrace;
+use {prelude::hash_map, backtrace::Backtrace, std::sync::Once};
+
+#[cfg(not(feature = "backtrace"))]
+struct Backtrace{}
+#[cfg(not(feature = "backtrace"))]
+impl Backtrace { fn new() -> Backtrace { Backtrace {} } }
 
 pub type LockResult<Guard> = Result<Guard, ()>;
 
@@ -44,34 +49,76 @@ impl Condvar {
 
 thread_local! {
        /// We track the set of locks currently held by a reference to their `LockMetadata`
-       static LOCKS_HELD: RefCell<HashSet<Arc<LockMetadata>>> = RefCell::new(HashSet::new());
+       static LOCKS_HELD: RefCell<HashMap<u64, Arc<LockMetadata>>> = RefCell::new(HashMap::new());
 }
 static LOCK_IDX: AtomicUsize = AtomicUsize::new(0);
 
+#[cfg(feature = "backtrace")]
+static mut LOCKS: Option<StdMutex<HashMap<String, Arc<LockMetadata>>>> = None;
+#[cfg(feature = "backtrace")]
+static LOCKS_INIT: Once = Once::new();
+
 /// Metadata about a single lock: its id, the set of locks taken before it, and the backtrace of
 /// when the Mutex itself was constructed.
 struct LockMetadata {
        lock_idx: u64,
-       locked_before: StdMutex<HashSet<Arc<LockMetadata>>>,
-       #[cfg(feature = "backtrace")]
-       lock_construction_bt: Backtrace,
+       locked_before: StdMutex<HashMap<u64, LockDep>>,
+       _lock_construction_bt: Backtrace,
 }
-impl PartialEq for LockMetadata {
-       fn eq(&self, o: &LockMetadata) -> bool { self.lock_idx == o.lock_idx }
+
+struct LockDep {
+       lock: Arc<LockMetadata>,
+       lockdep_trace: Backtrace,
 }
-impl Eq for LockMetadata {}
-impl std::hash::Hash for LockMetadata {
-       fn hash<H: std::hash::Hasher>(&self, hasher: &mut H) { hasher.write_u64(self.lock_idx); }
+
+#[cfg(feature = "backtrace")]
+fn get_construction_location(backtrace: &Backtrace) -> String {
+       // Find the first frame that is after `debug_sync` (or that is in our tests) and use
+       // that as the mutex construction site. Note that the first few frames may be in
+       // the `backtrace` crate, so we have to ignore those.
+       let sync_mutex_constr_regex = regex::Regex::new(r"lightning.*debug_sync.*new").unwrap();
+       let mut found_debug_sync = false;
+       for frame in backtrace.frames() {
+               for symbol in frame.symbols() {
+                       let symbol_name = symbol.name().unwrap().as_str().unwrap();
+                       if !sync_mutex_constr_regex.is_match(symbol_name) {
+                               if found_debug_sync {
+                                       if let Some(col) = symbol.colno() {
+                                               return format!("{}:{}:{}", symbol.filename().unwrap().display(), symbol.lineno().unwrap(), col);
+                                       } else {
+                                               // Windows debug symbols don't support column numbers, so fall back to
+                                               // line numbers only if no `colno` is available
+                                               return format!("{}:{}", symbol.filename().unwrap().display(), symbol.lineno().unwrap());
+                                       }
+                               }
+                       } else { found_debug_sync = true; }
+               }
+       }
+       panic!("Couldn't find mutex construction callsite");
 }
 
 impl LockMetadata {
-       fn new() -> LockMetadata {
-               LockMetadata {
-                       locked_before: StdMutex::new(HashSet::new()),
-                       lock_idx: LOCK_IDX.fetch_add(1, Ordering::Relaxed) as u64,
-                       #[cfg(feature = "backtrace")]
-                       lock_construction_bt: Backtrace::new(),
+       fn new() -> Arc<LockMetadata> {
+               let backtrace = Backtrace::new();
+               let lock_idx = LOCK_IDX.fetch_add(1, Ordering::Relaxed) as u64;
+
+               let res = Arc::new(LockMetadata {
+                       locked_before: StdMutex::new(HashMap::new()),
+                       lock_idx,
+                       _lock_construction_bt: backtrace,
+               });
+
+               #[cfg(feature = "backtrace")]
+               {
+                       let lock_constr_location = get_construction_location(&res._lock_construction_bt);
+                       LOCKS_INIT.call_once(|| { unsafe { LOCKS = Some(StdMutex::new(HashMap::new())); } });
+                       let mut locks = unsafe { LOCKS.as_ref() }.unwrap().lock().unwrap();
+                       match locks.entry(lock_constr_location) {
+                               hash_map::Entry::Occupied(e) => return Arc::clone(e.get()),
+                               hash_map::Entry::Vacant(e) => { e.insert(Arc::clone(&res)); },
+                       }
                }
+               res
        }
 
        // Returns whether we were a recursive lock (only relevant for read)
@@ -81,28 +128,37 @@ impl LockMetadata {
                        // For each lock which is currently locked, check that no lock's locked-before
                        // set includes the lock we're about to lock, which would imply a lockorder
                        // inversion.
-                       for locked in held.borrow().iter() {
-                               if read && *locked == *this {
+                       for (locked_idx, _locked) in held.borrow().iter() {
+                               if read && *locked_idx == this.lock_idx {
                                        // Recursive read locks are explicitly allowed
                                        return;
                                }
                        }
-                       for locked in held.borrow().iter() {
-                               if !read && *locked == *this {
-                                       panic!("Tried to lock a lock while it was held!");
+                       for (locked_idx, locked) in held.borrow().iter() {
+                               if !read && *locked_idx == this.lock_idx {
+                                       // With `feature = "backtrace"` set, we may be looking at different instances
+                                       // of the same lock.
+                                       debug_assert!(cfg!(feature = "backtrace"), "Tried to acquire a lock while it was held!");
                                }
-                               for locked_dep in locked.locked_before.lock().unwrap().iter() {
-                                       if *locked_dep == *this {
+                               for (locked_dep_idx, locked_dep) in locked.locked_before.lock().unwrap().iter() {
+                                       if *locked_dep_idx == this.lock_idx && *locked_dep_idx != locked.lock_idx {
                                                #[cfg(feature = "backtrace")]
-                                               panic!("Tried to violate existing lockorder.\nMutex that should be locked after the current lock was created at the following backtrace.\nNote that to get a backtrace for the lockorder violation, you should set RUST_BACKTRACE=1\n{:?}", locked.lock_construction_bt);
+                                               panic!("Tried to violate existing lockorder.\nMutex that should be locked after the current lock was created at the following backtrace.\nNote that to get a backtrace for the lockorder violation, you should set RUST_BACKTRACE=1\nLock being taken constructed at: {} ({}):\n{:?}\nLock constructed at: {} ({})\n{:?}\n\nLock dep created at:\n{:?}\n\n",
+                                                       get_construction_location(&this._lock_construction_bt), this.lock_idx, this._lock_construction_bt,
+                                                       get_construction_location(&locked._lock_construction_bt), locked.lock_idx, locked._lock_construction_bt,
+                                                       locked_dep.lockdep_trace);
                                                #[cfg(not(feature = "backtrace"))]
                                                panic!("Tried to violate existing lockorder. Build with the backtrace feature for more info.");
                                        }
                                }
                                // Insert any already-held locks in our locked-before set.
-                               this.locked_before.lock().unwrap().insert(Arc::clone(locked));
+                               let mut locked_before = this.locked_before.lock().unwrap();
+                               if !locked_before.contains_key(&locked.lock_idx) {
+                                       let lockdep = LockDep { lock: Arc::clone(locked), lockdep_trace: Backtrace::new() };
+                                       locked_before.insert(lockdep.lock.lock_idx, lockdep);
+                               }
                        }
-                       held.borrow_mut().insert(Arc::clone(this));
+                       held.borrow_mut().insert(this.lock_idx, Arc::clone(this));
                        inserted = true;
                });
                inserted
@@ -116,10 +172,14 @@ impl LockMetadata {
                        // Since a try-lock will simply fail if the lock is held already, we do not
                        // consider try-locks to ever generate lockorder inversions. However, if a try-lock
                        // succeeds, we do consider it to have created lockorder dependencies.
-                       for locked in held.borrow().iter() {
-                               this.locked_before.lock().unwrap().insert(Arc::clone(locked));
+                       let mut locked_before = this.locked_before.lock().unwrap();
+                       for (locked_idx, locked) in held.borrow().iter() {
+                               if !locked_before.contains_key(locked_idx) {
+                                       let lockdep = LockDep { lock: Arc::clone(locked), lockdep_trace: Backtrace::new() };
+                                       locked_before.insert(*locked_idx, lockdep);
+                               }
                        }
-                       held.borrow_mut().insert(Arc::clone(this));
+                       held.borrow_mut().insert(this.lock_idx, Arc::clone(this));
                });
        }
 }
@@ -149,7 +209,7 @@ impl<'a, T: Sized> MutexGuard<'a, T> {
 impl<T: Sized> Drop for MutexGuard<'_, T> {
        fn drop(&mut self) {
                LOCKS_HELD.with(|held| {
-                       held.borrow_mut().remove(&self.mutex.deps);
+                       held.borrow_mut().remove(&self.mutex.deps.lock_idx);
                });
        }
 }
@@ -170,7 +230,7 @@ impl<T: Sized> DerefMut for MutexGuard<'_, T> {
 
 impl<T> Mutex<T> {
        pub fn new(inner: T) -> Mutex<T> {
-               Mutex { inner: StdMutex::new(inner), deps: Arc::new(LockMetadata::new()) }
+               Mutex { inner: StdMutex::new(inner), deps: LockMetadata::new() }
        }
 
        pub fn lock<'a>(&'a self) -> LockResult<MutexGuard<'a, T>> {
@@ -220,7 +280,7 @@ impl<T: Sized> Drop for RwLockReadGuard<'_, T> {
                        return;
                }
                LOCKS_HELD.with(|held| {
-                       held.borrow_mut().remove(&self.lock.deps);
+                       held.borrow_mut().remove(&self.lock.deps.lock_idx);
                });
        }
 }
@@ -236,7 +296,7 @@ impl<T: Sized> Deref for RwLockWriteGuard<'_, T> {
 impl<T: Sized> Drop for RwLockWriteGuard<'_, T> {
        fn drop(&mut self) {
                LOCKS_HELD.with(|held| {
-                       held.borrow_mut().remove(&self.lock.deps);
+                       held.borrow_mut().remove(&self.lock.deps.lock_idx);
                });
        }
 }
@@ -249,7 +309,7 @@ impl<T: Sized> DerefMut for RwLockWriteGuard<'_, T> {
 
 impl<T> RwLock<T> {
        pub fn new(inner: T) -> RwLock<T> {
-               RwLock { inner: StdRwLock::new(inner), deps: Arc::new(LockMetadata::new()) }
+               RwLock { inner: StdRwLock::new(inner), deps: LockMetadata::new() }
        }
 
        pub fn read<'a>(&'a self) -> LockResult<RwLockReadGuard<'a, T>> {
@@ -271,96 +331,101 @@ impl<T> RwLock<T> {
        }
 }
 
-#[test]
-#[should_panic]
-fn recursive_lock_fail() {
-       let mutex = Mutex::new(());
-       let _a = mutex.lock().unwrap();
-       let _b = mutex.lock().unwrap();
-}
-
-#[test]
-fn recursive_read() {
-       let lock = RwLock::new(());
-       let _a = lock.read().unwrap();
-       let _b = lock.read().unwrap();
-}
+pub type FairRwLock<T> = RwLock<T>;
 
-#[test]
-#[should_panic]
-fn lockorder_fail() {
-       let a = Mutex::new(());
-       let b = Mutex::new(());
-       {
-               let _a = a.lock().unwrap();
-               let _b = b.lock().unwrap();
-       }
-       {
-               let _b = b.lock().unwrap();
-               let _a = a.lock().unwrap();
+mod tests {
+       use super::{RwLock, Mutex};
+
+       #[test]
+       #[should_panic]
+       #[cfg(not(feature = "backtrace"))]
+       fn recursive_lock_fail() {
+               let mutex = Mutex::new(());
+               let _a = mutex.lock().unwrap();
+               let _b = mutex.lock().unwrap();
+       }
+
+       #[test]
+       fn recursive_read() {
+               let lock = RwLock::new(());
+               let _a = lock.read().unwrap();
+               let _b = lock.read().unwrap();
+       }
+
+       #[test]
+       #[should_panic]
+       fn lockorder_fail() {
+               let a = Mutex::new(());
+               let b = Mutex::new(());
+               {
+                       let _a = a.lock().unwrap();
+                       let _b = b.lock().unwrap();
+               }
+               {
+                       let _b = b.lock().unwrap();
+                       let _a = a.lock().unwrap();
+               }
        }
-}
 
-#[test]
-#[should_panic]
-fn write_lockorder_fail() {
-       let a = RwLock::new(());
-       let b = RwLock::new(());
-       {
-               let _a = a.write().unwrap();
-               let _b = b.write().unwrap();
-       }
-       {
-               let _b = b.write().unwrap();
-               let _a = a.write().unwrap();
+       #[test]
+       #[should_panic]
+       fn write_lockorder_fail() {
+               let a = RwLock::new(());
+               let b = RwLock::new(());
+               {
+                       let _a = a.write().unwrap();
+                       let _b = b.write().unwrap();
+               }
+               {
+                       let _b = b.write().unwrap();
+                       let _a = a.write().unwrap();
+               }
        }
-}
 
-#[test]
-#[should_panic]
-fn read_lockorder_fail() {
-       let a = RwLock::new(());
-       let b = RwLock::new(());
-       {
-               let _a = a.read().unwrap();
-               let _b = b.read().unwrap();
-       }
-       {
-               let _b = b.read().unwrap();
-               let _a = a.read().unwrap();
+       #[test]
+       #[should_panic]
+       fn read_lockorder_fail() {
+               let a = RwLock::new(());
+               let b = RwLock::new(());
+               {
+                       let _a = a.read().unwrap();
+                       let _b = b.read().unwrap();
+               }
+               {
+                       let _b = b.read().unwrap();
+                       let _a = a.read().unwrap();
+               }
        }
-}
 
-#[test]
-fn read_recurisve_no_lockorder() {
-       // Like the above, but note that no lockorder is implied when we recursively read-lock a
-       // RwLock, causing this to pass just fine.
-       let a = RwLock::new(());
-       let b = RwLock::new(());
-       let _outer = a.read().unwrap();
-       {
-               let _a = a.read().unwrap();
-               let _b = b.read().unwrap();
-       }
-       {
-               let _b = b.read().unwrap();
-               let _a = a.read().unwrap();
+       #[test]
+       fn read_recursive_no_lockorder() {
+               // Like the above, but note that no lockorder is implied when we recursively read-lock a
+               // RwLock, causing this to pass just fine.
+               let a = RwLock::new(());
+               let b = RwLock::new(());
+               let _outer = a.read().unwrap();
+               {
+                       let _a = a.read().unwrap();
+                       let _b = b.read().unwrap();
+               }
+               {
+                       let _b = b.read().unwrap();
+                       let _a = a.read().unwrap();
+               }
        }
-}
 
-#[test]
-#[should_panic]
-fn read_write_lockorder_fail() {
-       let a = RwLock::new(());
-       let b = RwLock::new(());
-       {
-               let _a = a.write().unwrap();
-               let _b = b.read().unwrap();
-       }
-       {
-               let _b = b.read().unwrap();
-               let _a = a.write().unwrap();
+       #[test]
+       #[should_panic]
+       fn read_write_lockorder_fail() {
+               let a = RwLock::new(());
+               let b = RwLock::new(());
+               {
+                       let _a = a.write().unwrap();
+                       let _b = b.read().unwrap();
+               }
+               {
+                       let _b = b.read().unwrap();
+                       let _a = a.write().unwrap();
+               }
        }
 }
-
-pub type FairRwLock<T> = RwLock<T>;
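
The rework above replaces the `HashSet<Arc<LockMetadata>>` (and its hand-rolled `PartialEq`/`Hash`
impls) with maps keyed by `lock_idx`, records a `Backtrace` for every lock-order edge so violation
panics can show where the offending dependency was created, and, under `feature = "backtrace"`,
dedups `LockMetadata` by mutex construction site. A compressed sketch of the core detection logic,
with backtraces, read locks, and unlock bookkeeping omitted:

    use std::cell::RefCell;
    use std::collections::{HashMap, HashSet};
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::{Arc, Mutex, MutexGuard};

    static LOCK_IDX: AtomicU64 = AtomicU64::new(0);

    struct Meta {
        idx: u64,
        // Ids of locks that were already held when this lock was taken.
        locked_before: Mutex<HashSet<u64>>,
    }

    thread_local! {
        static HELD: RefCell<HashMap<u64, Arc<Meta>>> = RefCell::new(HashMap::new());
    }

    struct TrackedMutex<T> {
        meta: Arc<Meta>,
        inner: Mutex<T>,
    }

    impl<T> TrackedMutex<T> {
        fn new(inner: T) -> Self {
            let idx = LOCK_IDX.fetch_add(1, Ordering::Relaxed);
            TrackedMutex {
                meta: Arc::new(Meta { idx, locked_before: Mutex::new(HashSet::new()) }),
                inner: Mutex::new(inner),
            }
        }

        fn lock(&self) -> MutexGuard<'_, T> {
            HELD.with(|held| {
                for other in held.borrow().values() {
                    // Inversion: `self` was held while `other` was taken at
                    // some point, yet `other` is held as we now take `self`.
                    if other.locked_before.lock().unwrap().contains(&self.meta.idx) {
                        panic!("lockorder violation: {} vs {}", self.meta.idx, other.idx);
                    }
                    self.meta.locked_before.lock().unwrap().insert(other.idx);
                }
                held.borrow_mut().insert(self.meta.idx, Arc::clone(&self.meta));
            });
            // A full implementation removes `meta.idx` from HELD when the
            // returned guard drops, as the `Drop` impls above do.
            self.inner.lock().unwrap()
        }
    }
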
index a49e711b808d3902cc93e2b81606b9de414b6d23..cff632e4bc86b6a9e2a9f36b9b9787c0d3d6f8e7 100644 (file)
@@ -352,6 +352,11 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
                                }
                        }
 
+                       let broadcaster = test_utils::TestBroadcaster {
+                               txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
+                               blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
+                       };
+
                        // Before using all the new monitors to check the watch outpoints, use the full set of
                        // them to ensure we can write and reload our ChannelManager.
                        {
@@ -367,20 +372,13 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
                                        keys_manager: self.keys_manager,
                                        fee_estimator: &test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) },
                                        chain_monitor: self.chain_monitor,
-                                       tx_broadcaster: &test_utils::TestBroadcaster {
-                                               txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
-                                               blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
-                                       },
+                                       tx_broadcaster: &broadcaster,
                                        logger: &self.logger,
                                        channel_monitors,
                                }).unwrap();
                        }
 
                        let persister = test_utils::TestPersister::new();
-                       let broadcaster = test_utils::TestBroadcaster {
-                               txn_broadcasted: Mutex::new(self.tx_broadcaster.txn_broadcasted.lock().unwrap().clone()),
-                               blocks: Arc::new(Mutex::new(self.tx_broadcaster.blocks.lock().unwrap().clone())),
-                       };
                        let chain_source = test_utils::TestChainSource::new(Network::Testnet);
                        let chain_monitor = test_utils::TestChainMonitor::new(Some(&chain_source), &broadcaster, &self.logger, &feeest, &persister, &self.keys_manager);
                        for deserialized_monitor in deserialized_monitors.drain(..) {
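
Hoisting the `TestBroadcaster` above the reload block is a lifetime fix rather than a behavior
change: one instance is built from the node's broadcast state and lent first to the
`ChannelManager` re-read and then to the fresh `TestChainMonitor`, instead of cloning that state
twice. Schematically (all names hypothetical):

    struct TestBroadcaster { txn_broadcasted: Vec<String> }

    fn reload_manager(b: &TestBroadcaster) -> usize { b.txn_broadcasted.len() }
    fn new_chain_monitor(b: &TestBroadcaster) -> usize { b.txn_broadcasted.len() }

    fn main() {
        // Construct the test double once, before the reload scope...
        let broadcaster = TestBroadcaster { txn_broadcasted: Vec::new() };
        {
            // ...borrow it for the ChannelManager re-read...
            let _ = reload_manager(&broadcaster);
        }
        // ...and hand the same instance to the fresh chain monitor afterwards.
        let _ = new_chain_monitor(&broadcaster);
    }
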
index 23dfbf36f093a6b5bb4562a8d51905e7dc0a20f5..7572abff8313e90cdd149483a3ec5e919c864398 100644 (file)
@@ -3395,7 +3395,7 @@ fn test_htlc_ignore_latest_remote_commitment() {
        check_added_monitors!(nodes[0], 1);
        check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
 
-       let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+       let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(node_txn.len(), 3);
        assert_eq!(node_txn[0], node_txn[1]);
 
@@ -4813,7 +4813,7 @@ fn test_claim_on_remote_sizeable_push_msat() {
        check_added_monitors!(nodes[0], 1);
        check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
 
-       let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+       let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(node_txn.len(), 1);
        check_spends!(node_txn[0], chan.3);
        assert_eq!(node_txn[0].output.len(), 2); // We can't force trimming of the to_remote output, as channel_reserve_satoshis blocks us from doing so at channel opening
@@ -5019,7 +5019,7 @@ fn test_static_spendable_outputs_justice_tx_revoked_htlc_timeout_tx() {
        check_closed_event!(nodes[0], 1, ClosureReason::CommitmentTxConfirmed);
        connect_blocks(&nodes[0], TEST_FINAL_CLTV - 1); // Confirm blocks until the HTLC expires
 
-       let revoked_htlc_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+       let revoked_htlc_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(revoked_htlc_txn.len(), 2);
        check_spends!(revoked_htlc_txn[0], chan_1.3);
        assert_eq!(revoked_htlc_txn[1].input.len(), 1);
@@ -7824,7 +7824,7 @@ fn test_bump_penalty_txn_on_revoked_htlcs() {
        check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
        connect_blocks(&nodes[1], 49); // Confirm blocks until the HTLC expires (note CLTV was explicitly 50 above)
 
-       let revoked_htlc_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap();
+       let revoked_htlc_txn = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
        assert_eq!(revoked_htlc_txn.len(), 3);
        check_spends!(revoked_htlc_txn[1], chan.3);
 
@@ -8085,22 +8085,26 @@ fn test_counterparty_raa_skip_no_crash() {
        let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
        let channel_id = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known()).2;
 
-       let mut guard = nodes[0].node.channel_state.lock().unwrap();
-       let keys = guard.by_id.get_mut(&channel_id).unwrap().get_signer();
+       let per_commitment_secret;
+       let next_per_commitment_point;
+       {
+               let mut guard = nodes[0].node.channel_state.lock().unwrap();
+               let keys = guard.by_id.get_mut(&channel_id).unwrap().get_signer();
 
-       const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
+               const INITIAL_COMMITMENT_NUMBER: u64 = (1 << 48) - 1;
 
-       // Make signer believe we got a counterparty signature, so that it allows the revocation
-       keys.get_enforcement_state().last_holder_commitment -= 1;
-       let per_commitment_secret = keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER);
+               // Make signer believe we got a counterparty signature, so that it allows the revocation
+               keys.get_enforcement_state().last_holder_commitment -= 1;
+               per_commitment_secret = keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER);
 
-       // Must revoke without gaps
-       keys.get_enforcement_state().last_holder_commitment -= 1;
-       keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 1);
+               // Must revoke without gaps
+               keys.get_enforcement_state().last_holder_commitment -= 1;
+               keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 1);
 
-       keys.get_enforcement_state().last_holder_commitment -= 1;
-       let next_per_commitment_point = PublicKey::from_secret_key(&Secp256k1::new(),
-               &SecretKey::from_slice(&keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 2)).unwrap());
+               keys.get_enforcement_state().last_holder_commitment -= 1;
+               next_per_commitment_point = PublicKey::from_secret_key(&Secp256k1::new(),
+                       &SecretKey::from_slice(&keys.release_commitment_secret(INITIAL_COMMITMENT_NUMBER - 2)).unwrap());
+       }
 
        nodes[1].node.handle_revoke_and_ack(&nodes[0].node.get_our_node_id(),
                &msgs::RevokeAndACK { channel_id, per_commitment_secret, next_per_commitment_point });
@@ -8442,12 +8446,12 @@ fn test_reject_funding_before_inbound_channel_accepted() {
        // `MessageSendEvent::SendAcceptChannel` event. The message is passed to `nodes[0]`
        // `handle_accept_channel`, which is required in order for `create_funding_transaction` to
        // succeed when `nodes[0]` is passed to it.
-       {
+       let accept_chan_msg = {
                let mut lock;
                let channel = get_channel_ref!(&nodes[1], lock, temp_channel_id);
-               let accept_chan_msg = channel.get_accept_channel_message();
-               nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), InitFeatures::known(), &accept_chan_msg);
-       }
+               channel.get_accept_channel_message()
+       };
+       nodes[0].node.handle_accept_channel(&nodes[1].node.get_our_node_id(), InitFeatures::known(), &accept_chan_msg);
 
        let (temporary_channel_id, tx, _) = create_funding_transaction(&nodes[0], &nodes[1].node.get_our_node_id(), 100000, 42);
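
The test changes in this file (and the peer_handler and reorg_tests hunks below) all serve the
stricter lockorder tracking: guards and channel borrows are confined to their own scope, and
`split_off(0)` drains the broadcast list so the `MutexGuard` is dropped before the node makes
further calls that take other locks. The idiom in a self-contained sketch:

    use std::sync::Mutex;

    fn main() {
        let txn_broadcasted = Mutex::new(vec!["tx_a", "tx_b"]);

        // The guard lives only for this one expression: `split_off(0)`
        // returns the contents as an owned Vec, leaves the original empty,
        // and the lock is released immediately afterwards.
        let node_txn = txn_broadcasted.lock().unwrap().split_off(0);

        assert_eq!(node_txn.len(), 2);
        assert!(txn_broadcasted.lock().unwrap().is_empty());
    }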
 
index 2a026821f8adf91a9f0d2ba9bfdacbc861e1db48..f8339f0eae9e6942b95e6c038504eda669aea423 100644 (file)
@@ -1926,11 +1926,18 @@ mod tests {
                peer_a.new_inbound_connection(fd_a.clone(), None).unwrap();
                assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                peer_b.process_events();
-               assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_a.read_event(&mut fd_a, &b_data).unwrap(), false);
+
                peer_a.process_events();
-               assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
                (fd_a.clone(), fd_b.clone())
        }
 
@@ -2084,14 +2091,16 @@ mod tests {
 
                assert_eq!(peers[0].read_event(&mut fd_a, &initial_data).unwrap(), false);
                peers[0].process_events();
-               assert_eq!(peers[1].read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
+               let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+               assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false);
                peers[1].process_events();
 
                // ...but if we get a second timer tick, we should disconnect the peer
                peers[0].timer_tick_occurred();
                assert_eq!(peers[0].peers.read().unwrap().len(), 0);
 
-               assert!(peers[0].read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).is_err());
+               let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+               assert!(peers[0].read_event(&mut fd_a, &b_data).is_err());
        }
 
        #[test]
index 29349392f761438b311cca2eb98f49ba14bd338f..561fa8104694985f321c692a4cf949c7ef6ddddb 100644 (file)
@@ -82,7 +82,7 @@ fn do_test_onchain_htlc_reorg(local_commitment: bool, claim: bool) {
                check_added_monitors!(nodes[2], 1);
                check_closed_broadcast!(nodes[2], true); // We should get a BroadcastChannelUpdate (and *only* a BroadcastChannelUpdate)
                check_closed_event!(nodes[2], 1, ClosureReason::CommitmentTxConfirmed);
-               let node_2_commitment_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap();
+               let node_2_commitment_txn = nodes[2].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
                assert_eq!(node_2_commitment_txn.len(), 3); // ChannelMonitor: 1 offered HTLC-Claim, ChannelManager: 1 local commitment tx, 1 Received HTLC-Claim
                assert_eq!(node_2_commitment_txn[1].output.len(), 2); // to-remote and Received HTLC (to-self is dust)
                check_spends!(node_2_commitment_txn[1], chan_2.3);