Merge pull request #3125 from valentinewallace/2024-06-async-payments-prefactor
[rust-lightning] / lightning / src / ln / functional_test_utils.rs
index fa1b1e7905f10e6e9dfb3c232f8f4639881f24dc..00168fdfb0ca0252c3c54117ad6a05533cbdd794 100644 (file)
@@ -15,7 +15,7 @@ use crate::chain::channelmonitor::ChannelMonitor;
 use crate::chain::transaction::OutPoint;
 use crate::events::{ClaimedHTLC, ClosureReason, Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, PathFailure, PaymentPurpose, PaymentFailureReason};
 use crate::events::bump_transaction::{BumpTransactionEvent, BumpTransactionEventHandler, Wallet, WalletSource};
-use crate::ln::{ChannelId, PaymentPreimage, PaymentHash, PaymentSecret};
+use crate::ln::types::{ChannelId, PaymentPreimage, PaymentHash, PaymentSecret};
 use crate::ln::channelmanager::{AChannelManager, ChainParameters, ChannelManager, ChannelManagerReadArgs, RAACommitmentOrder, PaymentSendFailure, RecipientOnionFields, PaymentId, MIN_CLTV_EXPIRY_DELTA};
 use crate::ln::features::InitFeatures;
 use crate::ln::msgs;
@@ -31,19 +31,23 @@ use crate::util::errors::APIError;
 use crate::util::logger::Logger;
 use crate::util::scid_utils;
 use crate::util::test_channel_signer::TestChannelSigner;
+#[cfg(test)]
+use crate::util::test_channel_signer::SignerOp;
 use crate::util::test_utils;
 use crate::util::test_utils::{panicking, TestChainMonitor, TestScorer, TestKeysInterface};
 use crate::util::ser::{ReadableArgs, Writeable};
 
+use bitcoin::amount::Amount;
 use bitcoin::blockdata::block::{Block, Header, Version};
 use bitcoin::blockdata::locktime::absolute::LockTime;
 use bitcoin::blockdata::transaction::{Transaction, TxIn, TxOut};
 use bitcoin::hash_types::{BlockHash, TxMerkleNode};
 use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::hashes::Hash as _;
-use bitcoin::network::constants::Network;
+use bitcoin::network::Network;
 use bitcoin::pow::CompactTarget;
 use bitcoin::secp256k1::{PublicKey, SecretKey};
+use bitcoin::transaction;
 
 use alloc::rc::Rc;
 use core::cell::RefCell;
@@ -95,7 +99,7 @@ pub fn mine_transaction_without_consistency_checks<'a, 'b, 'c, 'd>(node: &'a Nod
                txdata: Vec::new(),
        };
        for _ in 0..*node.network_chan_count.borrow() { // Make sure we don't end up with channels at the same short id by offsetting by chan_count
-               block.txdata.push(Transaction { version: 0, lock_time: LockTime::ZERO, input: Vec::new(), output: Vec::new() });
+               block.txdata.push(Transaction { version: transaction::Version(0), lock_time: LockTime::ZERO, input: Vec::new(), output: Vec::new() });
        }
        block.txdata.push((*tx).clone());
        do_connect_block_without_consistency_checks(node, block, false);
@@ -113,7 +117,7 @@ pub fn confirm_transactions_at<'a, 'b, 'c, 'd>(node: &'a Node<'b, 'c, 'd>, txn:
        }
        let mut txdata = Vec::new();
        for _ in 0..*node.network_chan_count.borrow() { // Make sure we don't end up with channels at the same short id by offsetting by chan_count
-               txdata.push(Transaction { version: 0, lock_time: LockTime::ZERO, input: Vec::new(), output: Vec::new() });
+               txdata.push(Transaction { version: transaction::Version(0), lock_time: LockTime::ZERO, input: Vec::new(), output: Vec::new() });
        }
        for tx in txn {
                txdata.push((*tx).clone());
@@ -415,9 +419,11 @@ type TestOnionMessenger<'chan_man, 'node_cfg, 'chan_mon_cfg> = OnionMessenger<
        DedicatedEntropy,
        &'node_cfg test_utils::TestKeysInterface,
        &'chan_mon_cfg test_utils::TestLogger,
+       &'chan_man TestChannelManager<'node_cfg, 'chan_mon_cfg>,
        &'node_cfg test_utils::TestMessageRouter<'chan_mon_cfg>,
        &'chan_man TestChannelManager<'node_cfg, 'chan_mon_cfg>,
        IgnoringMessageHandler,
+       IgnoringMessageHandler,
 >;
 
 /// For use with [`OnionMessenger`] otherwise `test_restored_packages_retry` will fail. This is
@@ -479,24 +485,74 @@ impl<'a, 'b, 'c> Node<'a, 'b, 'c> {
        pub fn get_block_header(&self, height: u32) -> Header {
                self.blocks.lock().unwrap()[height as usize].0.header
        }
-       /// Changes the channel signer's availability for the specified peer and channel.
+
+       /// Toggles this node's signer to be available for the given signer operation.
+       /// This is useful for testing behavior for restoring an async signer that previously
+       /// could not return a signature immediately.
+       #[cfg(test)]
+       pub fn enable_channel_signer_op(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp) {
+               self.set_channel_signer_ops(peer_id, chan_id, signer_op, true);
+       }
+
+       /// Toggles this node's signer to be unavailable, returning `Err` for the given signer operation.
+       /// This is useful for testing behavior for an async signer that cannot return a signature
+       /// immediately.
+       #[cfg(test)]
+       pub fn disable_channel_signer_op(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp) {
+               self.set_channel_signer_ops(peer_id, chan_id, signer_op, false);
+       }
+
+       /// Changes the channel signer's availability for the specified peer, channel, and signer
+       /// operation.
        ///
-       /// When `available` is set to `true`, the channel signer will behave normally. When set to
-       /// `false`, the channel signer will act like an off-line remote signer and will return `Err` for
-       /// several of the signing methods. Currently, only `get_per_commitment_point` and
-       /// `release_commitment_secret` are affected by this setting.
+       /// For the specified signer operation, when `available` is set to `true`, the channel signer
+       /// will behave normally, returning `Ok`. When set to `false`, and the channel signer will
+       /// act like an off-line remote signer, returning `Err`. This applies to the signer in all
+       /// relevant places, i.e. the channel manager, chain monitor, and the keys manager.
        #[cfg(test)]
-       pub fn set_channel_signer_available(&self, peer_id: &PublicKey, chan_id: &ChannelId, available: bool) {
+       fn set_channel_signer_ops(&self, peer_id: &PublicKey, chan_id: &ChannelId, signer_op: SignerOp, available: bool) {
+               use crate::sign::ChannelSigner;
+               log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);
+
                let per_peer_state = self.node.per_peer_state.read().unwrap();
-               let chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
-               let signer = (|| {
-                       match chan_lock.channel_by_id.get(chan_id) {
-                               Some(phase) => phase.context().get_signer(),
-                               None => panic!("Couldn't find a channel with id {}", chan_id),
+               let mut chan_lock = per_peer_state.get(peer_id).unwrap().lock().unwrap();
+
+               let mut channel_keys_id = None;
+               if let Some(chan) = chan_lock.channel_by_id.get_mut(chan_id).map(|phase| phase.context_mut()) {
+                       let signer = chan.get_mut_signer().as_mut_ecdsa().unwrap();
+                       if available {
+                               signer.enable_op(signer_op);
+                       } else {
+                               signer.disable_op(signer_op);
                        }
-               })();
-               log_debug!(self.logger, "Setting channel signer for {} as available={}", chan_id, available);
-               signer.as_ecdsa().unwrap().set_available(available);
+                       channel_keys_id = Some(chan.channel_keys_id);
+               }
+
+               let monitor = self.chain_monitor.chain_monitor.list_monitors().into_iter()
+                       .find(|(_, channel_id)| *channel_id == *chan_id)
+                       .and_then(|(funding_txo, _)| self.chain_monitor.chain_monitor.get_monitor(funding_txo).ok());
+               if let Some(monitor) = monitor {
+                       monitor.do_mut_signer_call(|signer| {
+                               channel_keys_id = channel_keys_id.or(Some(signer.inner.channel_keys_id()));
+                               if available {
+                                       signer.enable_op(signer_op);
+                               } else {
+                                       signer.disable_op(signer_op);
+                               }
+                       });
+               }
+
+               let channel_keys_id = channel_keys_id.unwrap();
+               let mut unavailable_signers_ops = self.keys_manager.unavailable_signers_ops.lock().unwrap();
+               let entry = unavailable_signers_ops.entry(channel_keys_id).or_insert(new_hash_set());
+               if available {
+                       entry.remove(&signer_op);
+                       if entry.is_empty() {
+                               unavailable_signers_ops.remove(&channel_keys_id);
+                       }
+               } else {
+                       entry.insert(signer_op);
+               };
        }
 }
 
@@ -618,7 +674,7 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> {
                        // Before using all the new monitors to check the watch outpoints, use the full set of
                        // them to ensure we can write and reload our ChannelManager.
                        {
-                               let mut channel_monitors = HashMap::new();
+                               let mut channel_monitors = new_hash_map();
                                for monitor in deserialized_monitors.iter_mut() {
                                        channel_monitors.insert(monitor.get_funding_txo().0, monitor);
                                }
@@ -1049,7 +1105,7 @@ pub fn _reload_node<'a, 'b, 'c>(node: &'a Node<'a, 'b, 'c>, default_config: User
 
        let mut node_read = &chanman_encoded[..];
        let (_, node_deserialized) = {
-               let mut channel_monitors = HashMap::new();
+               let mut channel_monitors = new_hash_map();
                for monitor in monitors_read.iter_mut() {
                        assert!(channel_monitors.insert(monitor.get_funding_txo().0, monitor).is_none());
                }
@@ -1132,8 +1188,8 @@ fn internal_create_funding_transaction<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>,
                                Vec::new()
                        };
 
-                       let tx = Transaction { version: chan_id as i32, lock_time: LockTime::ZERO, input, output: vec![TxOut {
-                               value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                       let tx = Transaction { version: transaction::Version(chan_id as i32), lock_time: LockTime::ZERO, input, output: vec![TxOut {
+                               value: Amount::from_sat(*channel_value_satoshis), script_pubkey: output_script.clone(),
                        }]};
                        let funding_outpoint = OutPoint { txid: tx.txid(), index: 0 };
                        (*temporary_channel_id, tx, funding_outpoint)
@@ -1203,7 +1259,7 @@ pub fn open_zero_conf_channel<'a, 'b, 'c, 'd>(initiator: &'a Node<'b, 'c, 'd>, r
        };
 
        let accept_channel = get_event_msg!(receiver, MessageSendEvent::SendAcceptChannel, initiator.node.get_our_node_id());
-       assert_eq!(accept_channel.minimum_depth, 0);
+       assert_eq!(accept_channel.common_fields.minimum_depth, 0);
        initiator.node.handle_accept_channel(&receiver.node.get_our_node_id(), &accept_channel);
 
        let (temporary_channel_id, tx, _) = create_funding_transaction(&initiator, &receiver.node.get_our_node_id(), 100_000, 42);
@@ -1257,7 +1313,7 @@ pub fn open_zero_conf_channel<'a, 'b, 'c, 'd>(initiator: &'a Node<'b, 'c, 'd>, r
 pub fn exchange_open_accept_chan<'a, 'b, 'c>(node_a: &Node<'a, 'b, 'c>, node_b: &Node<'a, 'b, 'c>, channel_value: u64, push_msat: u64) -> ChannelId {
        let create_chan_id = node_a.node.create_channel(node_b.node.get_our_node_id(), channel_value, push_msat, 42, None, None).unwrap();
        let open_channel_msg = get_event_msg!(node_a, MessageSendEvent::SendOpenChannel, node_b.node.get_our_node_id());
-       assert_eq!(open_channel_msg.temporary_channel_id, create_chan_id);
+       assert_eq!(open_channel_msg.common_fields.temporary_channel_id, create_chan_id);
        assert_eq!(node_a.node.list_channels().iter().find(|channel| channel.channel_id == create_chan_id).unwrap().user_channel_id, 42);
        node_b.node.handle_open_channel(&node_a.node.get_our_node_id(), &open_channel_msg);
        if node_b.node.get_current_default_configuration().manually_accept_inbound_channels {
@@ -1270,7 +1326,7 @@ pub fn exchange_open_accept_chan<'a, 'b, 'c>(node_a: &Node<'a, 'b, 'c>, node_b:
                };
        }
        let accept_channel_msg = get_event_msg!(node_b, MessageSendEvent::SendAcceptChannel, node_a.node.get_our_node_id());
-       assert_eq!(accept_channel_msg.temporary_channel_id, create_chan_id);
+       assert_eq!(accept_channel_msg.common_fields.temporary_channel_id, create_chan_id);
        node_a.node.handle_accept_channel(&node_b.node.get_our_node_id(), &accept_channel_msg);
        assert_ne!(node_b.node.list_channels().iter().find(|channel| channel.channel_id == create_chan_id).unwrap().user_channel_id, 0);
 
@@ -1453,15 +1509,15 @@ pub fn update_nodes_with_chan_announce<'a, 'b, 'c, 'd>(nodes: &'a Vec<Node<'b, '
 
 pub fn do_check_spends<F: Fn(&bitcoin::blockdata::transaction::OutPoint) -> Option<TxOut>>(tx: &Transaction, get_output: F) {
        for outp in tx.output.iter() {
-               assert!(outp.value >= outp.script_pubkey.dust_value().to_sat(), "Spending tx output didn't meet dust limit");
+               assert!(outp.value >= outp.script_pubkey.dust_value(), "Spending tx output didn't meet dust limit");
        }
        let mut total_value_in = 0;
        for input in tx.input.iter() {
-               total_value_in += get_output(&input.previous_output).unwrap().value;
+               total_value_in += get_output(&input.previous_output).unwrap().value.to_sat();
        }
        let mut total_value_out = 0;
        for output in tx.output.iter() {
-               total_value_out += output.value;
+               total_value_out += output.value.to_sat();
        }
        let min_fee = (tx.weight().to_wu() as u64 + 3) / 4; // One sat per vbyte (ie per weight/4, rounded up)
        // Input amount - output amount = fee, so check that out + min_fee is smaller than input
@@ -1475,7 +1531,7 @@ macro_rules! check_spends {
                {
                        $(
                        for outp in $spends_txn.output.iter() {
-                               assert!(outp.value >= outp.script_pubkey.dust_value().to_sat(), "Input tx output didn't meet dust limit");
+                               assert!(outp.value >= outp.script_pubkey.dust_value(), "Input tx output didn't meet dust limit");
                        }
                        )*
                        let get_output = |out_point: &bitcoin::blockdata::transaction::OutPoint| {
@@ -1823,14 +1879,9 @@ macro_rules! expect_htlc_handling_failed_destinations {
 /// there are any [`Event::HTLCHandlingFailed`] events their [`HTLCDestination`] is included in the
 /// `expected_failures` set.
 pub fn expect_pending_htlcs_forwardable_conditions(events: Vec<Event>, expected_failures: &[HTLCDestination]) {
-       match events[0] {
-               Event::PendingHTLCsForwardable { .. } => { },
-               _ => panic!("Unexpected event {:?}", events),
-       };
-
        let count = expected_failures.len() + 1;
        assert_eq!(events.len(), count);
-
+       assert!(events.iter().find(|event| matches!(event, Event::PendingHTLCsForwardable { .. })).is_some());
        if expected_failures.len() > 0 {
                expect_htlc_handling_failed_destinations!(events, expected_failures)
        }
@@ -2047,7 +2098,7 @@ macro_rules! get_payment_preimage_hash {
 /// Gets a route from the given sender to the node described in `payment_params`.
 pub fn get_route(send_node: &Node, route_params: &RouteParameters) -> Result<Route, msgs::LightningError> {
        let scorer = TestScorer::new();
-       let keys_manager = TestKeysInterface::new(&[0u8; 32], bitcoin::network::constants::Network::Testnet);
+       let keys_manager = TestKeysInterface::new(&[0u8; 32], Network::Testnet);
        let random_seed_bytes = keys_manager.get_secure_random_bytes();
        router::get_route(
                &send_node.node.get_our_node_id(), route_params, &send_node.network_graph.read_only(),
@@ -2059,7 +2110,7 @@ pub fn get_route(send_node: &Node, route_params: &RouteParameters) -> Result<Rou
 /// Like `get_route` above, but adds a random CLTV offset to the final hop.
 pub fn find_route(send_node: &Node, route_params: &RouteParameters) -> Result<Route, msgs::LightningError> {
        let scorer = TestScorer::new();
-       let keys_manager = TestKeysInterface::new(&[0u8; 32], bitcoin::network::constants::Network::Testnet);
+       let keys_manager = TestKeysInterface::new(&[0u8; 32], Network::Testnet);
        let random_seed_bytes = keys_manager.get_secure_random_bytes();
        router::find_route(
                &send_node.node.get_our_node_id(), route_params, &send_node.network_graph,
@@ -2111,7 +2162,15 @@ pub fn check_payment_claimable(
                        assert_eq!(expected_recv_value, *amount_msat);
                        assert_eq!(expected_receiver_node_id, receiver_node_id.unwrap());
                        match purpose {
-                               PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => {
+                               PaymentPurpose::Bolt11InvoicePayment { payment_preimage, payment_secret, .. } => {
+                                       assert_eq!(&expected_payment_preimage, payment_preimage);
+                                       assert_eq!(expected_payment_secret, *payment_secret);
+                               },
+                               PaymentPurpose::Bolt12OfferPayment { payment_preimage, payment_secret, .. } => {
+                                       assert_eq!(&expected_payment_preimage, payment_preimage);
+                                       assert_eq!(expected_payment_secret, *payment_secret);
+                               },
+                               PaymentPurpose::Bolt12RefundPayment { payment_preimage, payment_secret, .. } => {
                                        assert_eq!(&expected_payment_preimage, payment_preimage);
                                        assert_eq!(expected_payment_secret, *payment_secret);
                                },
@@ -2219,38 +2278,74 @@ macro_rules! expect_payment_path_successful {
        }
 }
 
+/// Returns the total fee earned by this HTLC forward, in msat.
 pub fn expect_payment_forwarded<CM: AChannelManager, H: NodeHolder<CM=CM>>(
        event: Event, node: &H, prev_node: &H, next_node: &H, expected_fee: Option<u64>,
-       upstream_force_closed: bool, downstream_force_closed: bool
-) {
+       expected_extra_fees_msat: Option<u64>, upstream_force_closed: bool,
+       downstream_force_closed: bool, allow_1_msat_fee_overpay: bool,
+) -> Option<u64> {
        match event {
                Event::PaymentForwarded {
-                       fee_earned_msat, prev_channel_id, claim_from_onchain_tx, next_channel_id,
-                       outbound_amount_forwarded_msat: _
+                       prev_channel_id, next_channel_id, prev_user_channel_id, next_user_channel_id,
+                       total_fee_earned_msat, skimmed_fee_msat, claim_from_onchain_tx, ..
                } => {
-                       assert_eq!(fee_earned_msat, expected_fee);
+                       if allow_1_msat_fee_overpay {
+                               // Aggregating fees for blinded paths may result in a rounding error, causing slight
+                               // overpayment in fees.
+                               let actual_fee = total_fee_earned_msat.unwrap();
+                               let expected_fee = expected_fee.unwrap();
+                               assert!(actual_fee == expected_fee || actual_fee == expected_fee + 1);
+                       } else {
+                               assert_eq!(total_fee_earned_msat, expected_fee);
+                       }
+
+                       // Check that the (knowingly) withheld amount is always less or equal to the expected
+                       // overpaid amount.
+                       assert!(skimmed_fee_msat == expected_extra_fees_msat);
                        if !upstream_force_closed {
                                // Is the event prev_channel_id in one of the channels between the two nodes?
-                               assert!(node.node().list_channels().iter().any(|x| x.counterparty.node_id == prev_node.node().get_our_node_id() && x.channel_id == prev_channel_id.unwrap()));
+                               assert!(node.node().list_channels().iter().any(|x|
+                                       x.counterparty.node_id == prev_node.node().get_our_node_id() &&
+                                       x.channel_id == prev_channel_id.unwrap() &&
+                                       x.user_channel_id == prev_user_channel_id.unwrap()
+                               ));
                        }
                        // We check for force closures since a force closed channel is removed from the
                        // node's channel list
                        if !downstream_force_closed {
-                               assert!(node.node().list_channels().iter().any(|x| x.counterparty.node_id == next_node.node().get_our_node_id() && x.channel_id == next_channel_id.unwrap()));
+                               // As documented, `next_user_channel_id` will only be `Some` if we didn't settle via an
+                               // onchain transaction, just as the `total_fee_earned_msat` field. Rather than
+                               // introducing yet another variable, we use the latter's state as a flag to detect
+                               // this and only check if it's `Some`.
+                               if total_fee_earned_msat.is_none() {
+                                       assert!(node.node().list_channels().iter().any(|x|
+                                               x.counterparty.node_id == next_node.node().get_our_node_id() &&
+                                               x.channel_id == next_channel_id.unwrap()
+                                       ));
+                               } else {
+                                       assert!(node.node().list_channels().iter().any(|x|
+                                               x.counterparty.node_id == next_node.node().get_our_node_id() &&
+                                               x.channel_id == next_channel_id.unwrap() &&
+                                               x.user_channel_id == next_user_channel_id.unwrap()
+                                       ));
+                               }
                        }
                        assert_eq!(claim_from_onchain_tx, downstream_force_closed);
+                       total_fee_earned_msat
                },
                _ => panic!("Unexpected event"),
        }
 }
 
+#[macro_export]
 macro_rules! expect_payment_forwarded {
        ($node: expr, $prev_node: expr, $next_node: expr, $expected_fee: expr, $upstream_force_closed: expr, $downstream_force_closed: expr) => {
                let mut events = $node.node.get_and_clear_pending_events();
                assert_eq!(events.len(), 1);
                $crate::ln::functional_test_utils::expect_payment_forwarded(
-                       events.pop().unwrap(), &$node, &$prev_node, &$next_node, $expected_fee,
-                       $upstream_force_closed, $downstream_force_closed);
+                       events.pop().unwrap(), &$node, &$prev_node, &$next_node, $expected_fee, None,
+                       $upstream_force_closed, $downstream_force_closed, false
+               );
        }
 }
 
@@ -2391,18 +2486,11 @@ pub fn expect_payment_failed_conditions_event<'a, 'b, 'c, 'd, 'e>(
                        if let Some(chan_closed) = conditions.expected_blamed_chan_closed {
                                if let PathFailure::OnPath { network_update: Some(upd) } = failure {
                                        match upd {
-                                               NetworkUpdate::ChannelUpdateMessage { ref msg } if !chan_closed => {
-                                                       if let Some(scid) = conditions.expected_blamed_scid {
-                                                               assert_eq!(msg.contents.short_channel_id, scid);
-                                                       }
-                                                       const CHAN_DISABLED_FLAG: u8 = 2;
-                                                       assert_eq!(msg.contents.flags & CHAN_DISABLED_FLAG, 0);
-                                               },
-                                               NetworkUpdate::ChannelFailure { short_channel_id, is_permanent } if chan_closed => {
+                                               NetworkUpdate::ChannelFailure { short_channel_id, is_permanent } => {
                                                        if let Some(scid) = conditions.expected_blamed_scid {
                                                                assert_eq!(*short_channel_id, scid);
                                                        }
-                                                       assert!(is_permanent);
+                                                       assert_eq!(*is_permanent, chan_closed);
                                                },
                                                _ => panic!("Unexpected update type"),
                                        }
@@ -2461,7 +2549,70 @@ fn fail_payment_along_path<'a, 'b, 'c>(expected_path: &[&Node<'a, 'b, 'c>]) {
        }
 }
 
-pub fn do_pass_along_path<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_path: &[&Node<'a, 'b, 'c>], recv_value: u64, our_payment_hash: PaymentHash, our_payment_secret: Option<PaymentSecret>, ev: MessageSendEvent, payment_claimable_expected: bool, clear_recipient_events: bool, expected_preimage: Option<PaymentPreimage>, is_probe: bool) -> Option<Event> {
+pub struct PassAlongPathArgs<'a, 'b, 'c, 'd> {
+       pub origin_node: &'a Node<'b, 'c, 'd>,
+       pub expected_path: &'a [&'a Node<'b, 'c, 'd>],
+       pub recv_value: u64,
+       pub payment_hash: PaymentHash,
+       pub payment_secret: Option<PaymentSecret>,
+       pub event: MessageSendEvent,
+       pub payment_claimable_expected: bool,
+       pub clear_recipient_events: bool,
+       pub expected_preimage: Option<PaymentPreimage>,
+       pub is_probe: bool,
+       pub custom_tlvs: Vec<(u64, Vec<u8>)>,
+       pub payment_metadata: Option<Vec<u8>>,
+}
+
+impl<'a, 'b, 'c, 'd> PassAlongPathArgs<'a, 'b, 'c, 'd> {
+       pub fn new(
+               origin_node: &'a Node<'b, 'c, 'd>, expected_path: &'a [&'a Node<'b, 'c, 'd>], recv_value: u64,
+               payment_hash: PaymentHash, event: MessageSendEvent,
+       ) -> Self {
+               Self {
+                       origin_node, expected_path, recv_value, payment_hash, payment_secret: None, event,
+                       payment_claimable_expected: true, clear_recipient_events: true, expected_preimage: None,
+                       is_probe: false, custom_tlvs: Vec::new(), payment_metadata: None,
+               }
+       }
+       pub fn without_clearing_recipient_events(mut self) -> Self {
+               self.clear_recipient_events = false;
+               self
+       }
+       pub fn is_probe(mut self) -> Self {
+               self.payment_claimable_expected = false;
+               self.is_probe = true;
+               self
+       }
+       pub fn without_claimable_event(mut self) -> Self {
+               self.payment_claimable_expected = false;
+               self
+       }
+       pub fn with_payment_secret(mut self, payment_secret: PaymentSecret) -> Self {
+               self.payment_secret = Some(payment_secret);
+               self
+       }
+       pub fn with_payment_preimage(mut self, payment_preimage: PaymentPreimage) -> Self {
+               self.expected_preimage = Some(payment_preimage);
+               self
+       }
+       pub fn with_custom_tlvs(mut self, custom_tlvs: Vec<(u64, Vec<u8>)>) -> Self {
+               self.custom_tlvs = custom_tlvs;
+               self
+       }
+       pub fn with_payment_metadata(mut self, payment_metadata: Vec<u8>) -> Self {
+               self.payment_metadata = Some(payment_metadata);
+               self
+       }
+}
+
+pub fn do_pass_along_path<'a, 'b, 'c>(args: PassAlongPathArgs) -> Option<Event> {
+       let PassAlongPathArgs {
+               origin_node, expected_path, recv_value, payment_hash: our_payment_hash,
+               payment_secret: our_payment_secret, event: ev, payment_claimable_expected,
+               clear_recipient_events, expected_preimage, is_probe, custom_tlvs, payment_metadata,
+       } = args;
+
        let mut payment_event = SendEvent::from_event(ev);
        let mut prev_node = origin_node;
        let mut event = None;
@@ -2492,8 +2643,20 @@ pub fn do_pass_along_path<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_p
                                                assert_eq!(our_payment_hash, *payment_hash);
                                                assert_eq!(node.node.get_our_node_id(), receiver_node_id.unwrap());
                                                assert!(onion_fields.is_some());
+                                               assert_eq!(onion_fields.as_ref().unwrap().custom_tlvs, custom_tlvs);
+                                               assert_eq!(onion_fields.as_ref().unwrap().payment_metadata, payment_metadata);
                                                match &purpose {
-                                                       PaymentPurpose::InvoicePayment { payment_preimage, payment_secret, .. } => {
+                                                       PaymentPurpose::Bolt11InvoicePayment { payment_preimage, payment_secret, .. } => {
+                                                               assert_eq!(expected_preimage, *payment_preimage);
+                                                               assert_eq!(our_payment_secret.unwrap(), *payment_secret);
+                                                               assert_eq!(Some(*payment_secret), onion_fields.as_ref().unwrap().payment_secret);
+                                                       },
+                                                       PaymentPurpose::Bolt12OfferPayment { payment_preimage, payment_secret, .. } => {
+                                                               assert_eq!(expected_preimage, *payment_preimage);
+                                                               assert_eq!(our_payment_secret.unwrap(), *payment_secret);
+                                                               assert_eq!(Some(*payment_secret), onion_fields.as_ref().unwrap().payment_secret);
+                                                       },
+                                                       PaymentPurpose::Bolt12RefundPayment { payment_preimage, payment_secret, .. } => {
                                                                assert_eq!(expected_preimage, *payment_preimage);
                                                                assert_eq!(our_payment_secret.unwrap(), *payment_secret);
                                                                assert_eq!(Some(*payment_secret), onion_fields.as_ref().unwrap().payment_secret);
@@ -2528,7 +2691,17 @@ pub fn do_pass_along_path<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_p
 }
 
 pub fn pass_along_path<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_path: &[&Node<'a, 'b, 'c>], recv_value: u64, our_payment_hash: PaymentHash, our_payment_secret: Option<PaymentSecret>, ev: MessageSendEvent, payment_claimable_expected: bool, expected_preimage: Option<PaymentPreimage>) -> Option<Event> {
-       do_pass_along_path(origin_node, expected_path, recv_value, our_payment_hash, our_payment_secret, ev, payment_claimable_expected, true, expected_preimage, false)
+       let mut args = PassAlongPathArgs::new(origin_node, expected_path, recv_value, our_payment_hash, ev);
+       if !payment_claimable_expected {
+               args = args.without_claimable_event();
+       }
+       if let Some(payment_secret) = our_payment_secret {
+               args = args.with_payment_secret(payment_secret);
+       }
+       if let Some(payment_preimage) = expected_preimage {
+               args = args.with_payment_preimage(payment_preimage);
+       }
+       do_pass_along_path(args)
 }
 
 pub fn send_probe_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_route: &[&[&Node<'a, 'b, 'c>]]) {
@@ -2540,7 +2713,10 @@ pub fn send_probe_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expect
        for path in expected_route.iter() {
                let ev = remove_first_msg_event_to_node(&path[0].node.get_our_node_id(), &mut events);
 
-               do_pass_along_path(origin_node, path, 0, PaymentHash([0_u8; 32]), None, ev, false, false, None, true);
+               do_pass_along_path(PassAlongPathArgs::new(origin_node, path, 0, PaymentHash([0_u8; 32]), ev)
+                       .is_probe()
+                       .without_clearing_recipient_events());
+
                let nodes_to_fail_payment: Vec<_> = vec![origin_node].into_iter().chain(path.iter().cloned()).collect();
 
                fail_payment_along_path(nodes_to_fail_payment.as_slice());
@@ -2566,58 +2742,108 @@ pub fn send_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, route: Route
        (our_payment_preimage, our_payment_hash, our_payment_secret, payment_id)
 }
 
-pub fn do_claim_payment_along_route<'a, 'b, 'c>(
-       origin_node: &Node<'a, 'b, 'c>, expected_paths: &[&[&Node<'a, 'b, 'c>]], skip_last: bool,
-       our_payment_preimage: PaymentPreimage
-) -> u64 {
-       let extra_fees = vec![0; expected_paths.len()];
-       do_claim_payment_along_route_with_extra_penultimate_hop_fees(origin_node, expected_paths,
-               &extra_fees[..], skip_last, our_payment_preimage)
-}
-
-pub fn do_claim_payment_along_route_with_extra_penultimate_hop_fees<'a, 'b, 'c>(
-       origin_node: &Node<'a, 'b, 'c>, expected_paths: &[&[&Node<'a, 'b, 'c>]], expected_extra_fees:
-       &[u32], skip_last: bool, our_payment_preimage: PaymentPreimage
-) -> u64 {
-       assert_eq!(expected_paths.len(), expected_extra_fees.len());
-       for path in expected_paths.iter() {
-               assert_eq!(path.last().unwrap().node.get_our_node_id(), expected_paths[0].last().unwrap().node.get_our_node_id());
+pub fn do_claim_payment_along_route(args: ClaimAlongRouteArgs) -> u64 {
+       for path in args.expected_paths.iter() {
+               assert_eq!(path.last().unwrap().node.get_our_node_id(), args.expected_paths[0].last().unwrap().node.get_our_node_id());
+       }
+       args.expected_paths[0].last().unwrap().node.claim_funds(args.payment_preimage);
+       pass_claimed_payment_along_route(args)
+}
+
+pub struct ClaimAlongRouteArgs<'a, 'b, 'c, 'd> {
+       pub origin_node: &'a Node<'b, 'c, 'd>,
+       pub expected_paths: &'a [&'a [&'a Node<'b, 'c, 'd>]],
+       pub expected_extra_fees: Vec<u32>,
+       pub expected_min_htlc_overpay: Vec<u32>,
+       pub skip_last: bool,
+       pub payment_preimage: PaymentPreimage,
+       pub custom_tlvs: Vec<(u64, Vec<u8>)>,
+       // Allow forwarding nodes to have taken 1 msat more fee than expected based on the downstream
+       // fulfill amount.
+       //
+       // Necessary because our test utils calculate the expected fee for an intermediate node based on
+       // the amount was claimed in their downstream peer's fulfill, but blinded intermediate nodes
+       // calculate their fee based on the inbound amount from their upstream peer, causing a difference
+       // in rounding.
+       pub allow_1_msat_fee_overpay: bool,
+}
+
+impl<'a, 'b, 'c, 'd> ClaimAlongRouteArgs<'a, 'b, 'c, 'd> {
+       pub fn new(
+               origin_node: &'a Node<'b, 'c, 'd>, expected_paths: &'a [&'a [&'a Node<'b, 'c, 'd>]],
+               payment_preimage: PaymentPreimage,
+       ) -> Self {
+               Self {
+                       origin_node, expected_paths, expected_extra_fees: vec![0; expected_paths.len()],
+                       expected_min_htlc_overpay: vec![0; expected_paths.len()], skip_last: false, payment_preimage,
+                       allow_1_msat_fee_overpay: false, custom_tlvs: vec![],
+               }
+       }
+       pub fn skip_last(mut self, skip_last: bool) -> Self {
+               self.skip_last = skip_last;
+               self
+       }
+       pub fn with_expected_extra_fees(mut self, extra_fees: Vec<u32>) -> Self {
+               self.expected_extra_fees = extra_fees;
+               self
+       }
+       pub fn with_expected_min_htlc_overpay(mut self, extra_fees: Vec<u32>) -> Self {
+               self.expected_min_htlc_overpay = extra_fees;
+               self
+       }
+       pub fn allow_1_msat_fee_overpay(mut self) -> Self {
+               self.allow_1_msat_fee_overpay = true;
+               self
+       }
+       pub fn with_custom_tlvs(mut self, custom_tlvs: Vec<(u64, Vec<u8>)>) -> Self {
+               self.custom_tlvs = custom_tlvs;
+               self
        }
-       expected_paths[0].last().unwrap().node.claim_funds(our_payment_preimage);
-       pass_claimed_payment_along_route(origin_node, expected_paths, expected_extra_fees, skip_last, our_payment_preimage)
 }
 
-pub fn pass_claimed_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_paths: &[&[&Node<'a, 'b, 'c>]], expected_extra_fees: &[u32], skip_last: bool, our_payment_preimage: PaymentPreimage) -> u64 {
+pub fn pass_claimed_payment_along_route(args: ClaimAlongRouteArgs) -> u64 {
+       let ClaimAlongRouteArgs {
+               origin_node, expected_paths, expected_extra_fees, expected_min_htlc_overpay, skip_last,
+               payment_preimage: our_payment_preimage, allow_1_msat_fee_overpay, custom_tlvs,
+       } = args;
        let claim_event = expected_paths[0].last().unwrap().node.get_and_clear_pending_events();
        assert_eq!(claim_event.len(), 1);
+       #[allow(unused)]
+       let mut fwd_amt_msat = 0;
        match claim_event[0] {
                Event::PaymentClaimed {
-                       purpose: PaymentPurpose::SpontaneousPayment(preimage),
+                       purpose: PaymentPurpose::SpontaneousPayment(preimage)
+                               | PaymentPurpose::Bolt11InvoicePayment { payment_preimage: Some(preimage), .. }
+                               | PaymentPurpose::Bolt12OfferPayment { payment_preimage: Some(preimage), .. }
+                               | PaymentPurpose::Bolt12RefundPayment { payment_preimage: Some(preimage), .. },
                        amount_msat,
                        ref htlcs,
-                       .. }
-               | Event::PaymentClaimed {
-                       purpose: PaymentPurpose::InvoicePayment { payment_preimage: Some(preimage), ..},
-                       ref htlcs,
-                       amount_msat,
+                       ref onion_fields,
                        ..
                } => {
                        assert_eq!(preimage, our_payment_preimage);
                        assert_eq!(htlcs.len(), expected_paths.len());  // One per path.
                        assert_eq!(htlcs.iter().map(|h| h.value_msat).sum::<u64>(), amount_msat);
+                       assert_eq!(onion_fields.as_ref().unwrap().custom_tlvs, custom_tlvs);
                        expected_paths.iter().zip(htlcs).for_each(|(path, htlc)| check_claimed_htlc_channel(origin_node, path, htlc));
+                       fwd_amt_msat = amount_msat;
                },
                Event::PaymentClaimed {
-                       purpose: PaymentPurpose::InvoicePayment { .. },
+                       purpose: PaymentPurpose::Bolt11InvoicePayment { .. }
+                               | PaymentPurpose::Bolt12OfferPayment { .. }
+                               | PaymentPurpose::Bolt12RefundPayment { .. },
                        payment_hash,
                        amount_msat,
                        ref htlcs,
+                       ref onion_fields,
                        ..
                } => {
                        assert_eq!(&payment_hash.0, &Sha256::hash(&our_payment_preimage.0)[..]);
                        assert_eq!(htlcs.len(), expected_paths.len());  // One per path.
                        assert_eq!(htlcs.iter().map(|h| h.value_msat).sum::<u64>(), amount_msat);
+                       assert_eq!(onion_fields.as_ref().unwrap().custom_tlvs, custom_tlvs);
                        expected_paths.iter().zip(htlcs).for_each(|(path, htlc)| check_claimed_htlc_channel(origin_node, path, htlc));
+                       fwd_amt_msat = amount_msat;
                }
                _ => panic!(),
        }
@@ -2649,8 +2875,12 @@ pub fn pass_claimed_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, '
                per_path_msgs.push(msgs_from_ev!(&events[0]));
        } else {
                for expected_path in expected_paths.iter() {
-                       // For MPP payments, we always want the message to the first node in the path.
-                       let ev = remove_first_msg_event_to_node(&expected_path[0].node.get_our_node_id(), &mut events);
+                       // For MPP payments, we want the fulfill message from the payee to the penultimate hop in the
+                       // path.
+                       let penultimate_hop_node_id = expected_path.iter().rev().skip(1).next()
+                               .map(|n| n.node.get_our_node_id())
+                               .unwrap_or(origin_node.node.get_our_node_id());
+                       let ev = remove_first_msg_event_to_node(&penultimate_hop_node_id, &mut events);
                        per_path_msgs.push(msgs_from_ev!(&ev));
                }
        }
@@ -2674,19 +2904,34 @@ pub fn pass_claimed_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, '
                                {
                                        $node.node.handle_update_fulfill_htlc(&$prev_node.node.get_our_node_id(), &next_msgs.as_ref().unwrap().0);
                                        let mut fee = {
-                                               let per_peer_state = $node.node.per_peer_state.read().unwrap();
-                                               let peer_state = per_peer_state.get(&$prev_node.node.get_our_node_id())
-                                                       .unwrap().lock().unwrap();
-                                               let channel = peer_state.channel_by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap();
-                                               if let Some(prev_config) = channel.context().prev_config() {
-                                                       prev_config.forwarding_fee_base_msat
-                                               } else {
-                                                       channel.context().config().forwarding_fee_base_msat
-                                               }
+                                               let (base_fee, prop_fee) = {
+                                                       let per_peer_state = $node.node.per_peer_state.read().unwrap();
+                                                       let peer_state = per_peer_state.get(&$prev_node.node.get_our_node_id())
+                                                               .unwrap().lock().unwrap();
+                                                       let channel = peer_state.channel_by_id.get(&next_msgs.as_ref().unwrap().0.channel_id).unwrap();
+                                                       if let Some(prev_config) = channel.context().prev_config() {
+                                                               (prev_config.forwarding_fee_base_msat as u64,
+                                                                prev_config.forwarding_fee_proportional_millionths as u64)
+                                                       } else {
+                                                               (channel.context().config().forwarding_fee_base_msat as u64,
+                                                                channel.context().config().forwarding_fee_proportional_millionths as u64)
+                                                       }
+                                               };
+                                               ((fwd_amt_msat * prop_fee / 1_000_000) + base_fee) as u32
                                        };
-                                       if $idx == 1 { fee += expected_extra_fees[i]; }
-                                       expect_payment_forwarded!(*$node, $next_node, $prev_node, Some(fee as u64), false, false);
-                                       expected_total_fee_msat += fee as u64;
+
+                                       let mut expected_extra_fee = None;
+                                       if $idx == 1 {
+                                               fee += expected_extra_fees[i];
+                                               fee += expected_min_htlc_overpay[i];
+                                               expected_extra_fee = if expected_extra_fees[i] > 0 { Some(expected_extra_fees[i] as u64) } else { None };
+                                       }
+                                       let mut events = $node.node.get_and_clear_pending_events();
+                                       assert_eq!(events.len(), 1);
+                                       let actual_fee = expect_payment_forwarded(events.pop().unwrap(), *$node, $next_node, $prev_node,
+                                               Some(fee as u64), expected_extra_fee, false, false, allow_1_msat_fee_overpay);
+                                       expected_total_fee_msat += actual_fee.unwrap();
+                                       fwd_amt_msat += actual_fee.unwrap();
                                        check_added_monitors!($node, 1);
                                        let new_next_msgs = if $new_msgs {
                                                let events = $node.node.get_and_clear_pending_msg_events();
@@ -2740,15 +2985,20 @@ pub fn pass_claimed_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, '
 
        expected_total_fee_msat
 }
-pub fn claim_payment_along_route<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_paths: &[&[&Node<'a, 'b, 'c>]], skip_last: bool, our_payment_preimage: PaymentPreimage) {
-       let expected_total_fee_msat = do_claim_payment_along_route(origin_node, expected_paths, skip_last, our_payment_preimage);
+pub fn claim_payment_along_route(args: ClaimAlongRouteArgs) {
+       let origin_node = args.origin_node;
+       let payment_preimage = args.payment_preimage;
+       let skip_last = args.skip_last;
+       let expected_total_fee_msat = do_claim_payment_along_route(args);
        if !skip_last {
-               expect_payment_sent!(origin_node, our_payment_preimage, Some(expected_total_fee_msat));
+               expect_payment_sent!(origin_node, payment_preimage, Some(expected_total_fee_msat));
        }
 }
 
 pub fn claim_payment<'a, 'b, 'c>(origin_node: &Node<'a, 'b, 'c>, expected_route: &[&Node<'a, 'b, 'c>], our_payment_preimage: PaymentPreimage) {
-       claim_payment_along_route(origin_node, &[expected_route], false, our_payment_preimage);
+       claim_payment_along_route(
+               ClaimAlongRouteArgs::new(origin_node, &[expected_route], our_payment_preimage)
+       );
 }
 
 pub const TEST_FINAL_CLTV: u32 = 70;
@@ -2952,7 +3202,7 @@ pub fn create_node_cfgs_with_persisters<'a>(node_count: usize, chanmon_cfgs: &'a
                        tx_broadcaster: &chanmon_cfgs[i].tx_broadcaster,
                        fee_estimator: &chanmon_cfgs[i].fee_estimator,
                        router: test_utils::TestRouter::new(network_graph.clone(), &chanmon_cfgs[i].logger, &chanmon_cfgs[i].scorer),
-                       message_router: test_utils::TestMessageRouter::new(network_graph.clone()),
+                       message_router: test_utils::TestMessageRouter::new(network_graph.clone(), &chanmon_cfgs[i].keys_manager),
                        chain_monitor,
                        keys_manager: &chanmon_cfgs[i].keys_manager,
                        node_seed: seed,
@@ -3008,8 +3258,8 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
        for i in 0..node_count {
                let dedicated_entropy = DedicatedEntropy(RandomBytes::new([i as u8; 32]));
                let onion_messenger = OnionMessenger::new(
-                       dedicated_entropy, cfgs[i].keys_manager, cfgs[i].logger, &cfgs[i].message_router,
-                       &chan_mgrs[i], IgnoringMessageHandler {},
+                       dedicated_entropy, cfgs[i].keys_manager, cfgs[i].logger, &chan_mgrs[i],
+                       &cfgs[i].message_router, &chan_mgrs[i], IgnoringMessageHandler {}, IgnoringMessageHandler {},
                );
                let gossip_sync = P2PGossipSync::new(cfgs[i].network_graph.as_ref(), None, cfgs[i].logger);
                let wallet_source = Arc::new(test_utils::TestWalletSource::new(SecretKey::from_slice(&[i as u8 + 1; 32]).unwrap()));
@@ -3033,30 +3283,34 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
 
        for i in 0..node_count {
                for j in (i+1)..node_count {
-                       let node_id_i = nodes[i].node.get_our_node_id();
-                       let node_id_j = nodes[j].node.get_our_node_id();
-
-                       let init_i = msgs::Init {
-                               features: nodes[i].init_features(&node_id_j),
-                               networks: None,
-                               remote_network_address: None,
-                       };
-                       let init_j = msgs::Init {
-                               features: nodes[j].init_features(&node_id_i),
-                               networks: None,
-                               remote_network_address: None,
-                       };
-
-                       nodes[i].node.peer_connected(&node_id_j, &init_j, true).unwrap();
-                       nodes[j].node.peer_connected(&node_id_i, &init_i, false).unwrap();
-                       nodes[i].onion_messenger.peer_connected(&node_id_j, &init_j, true).unwrap();
-                       nodes[j].onion_messenger.peer_connected(&node_id_i, &init_i, false).unwrap();
+                       connect_nodes(&nodes[i], &nodes[j]);
                }
        }
 
        nodes
 }
 
+fn connect_nodes<'a, 'b: 'a, 'c: 'b>(node_a: &Node<'a, 'b, 'c>, node_b: &Node<'a, 'b, 'c>) {
+       let node_id_a = node_a.node.get_our_node_id();
+       let node_id_b = node_b.node.get_our_node_id();
+
+       let init_a = msgs::Init {
+               features: node_a.init_features(&node_id_b),
+               networks: None,
+               remote_network_address: None,
+       };
+       let init_b = msgs::Init {
+               features: node_b.init_features(&node_id_a),
+               networks: None,
+               remote_network_address: None,
+       };
+
+       node_a.node.peer_connected(&node_id_b, &init_b, true).unwrap();
+       node_b.node.peer_connected(&node_id_a, &init_a, false).unwrap();
+       node_a.onion_messenger.peer_connected(&node_id_b, &init_b, true).unwrap();
+       node_b.onion_messenger.peer_connected(&node_id_a, &init_a, false).unwrap();
+}
+
 pub fn connect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) {
        let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap();
 
@@ -3098,7 +3352,7 @@ pub enum HTLCType { NONE, TIMEOUT, SUCCESS }
 /// also fail.
 pub fn test_txn_broadcast<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, chan: &(msgs::ChannelUpdate, msgs::ChannelUpdate, ChannelId, Transaction), commitment_tx: Option<Transaction>, has_htlc_tx: HTLCType) -> Vec<Transaction>  {
        let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
-       let mut txn_seen = HashSet::new();
+       let mut txn_seen = new_hash_set();
        node_txn.retain(|tx| txn_seen.insert(tx.txid()));
        assert!(node_txn.len() >= if commitment_tx.is_some() { 0 } else { 1 } + if has_htlc_tx == HTLCType::NONE { 0 } else { 1 });
 
@@ -3163,7 +3417,7 @@ pub fn test_revoked_htlc_claim_txn_broadcast<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>
 
 pub fn check_preimage_claim<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, prev_txn: &Vec<Transaction>) -> Vec<Transaction>  {
        let mut node_txn = node.tx_broadcaster.txn_broadcasted.lock().unwrap();
-       let mut txn_seen = HashSet::new();
+       let mut txn_seen = new_hash_set();
        node_txn.retain(|tx| txn_seen.insert(tx.txid()));
 
        let mut found_prev = false;
@@ -3278,7 +3532,7 @@ macro_rules! get_channel_value_stat {
 macro_rules! get_chan_reestablish_msgs {
        ($src_node: expr, $dst_node: expr) => {
                {
-                       let mut announcements = $crate::prelude::HashSet::new();
+                       let mut announcements = $crate::prelude::new_hash_set();
                        let mut res = Vec::with_capacity(1);
                        for msg in $src_node.node.get_and_clear_pending_msg_events() {
                                if let MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } = msg {
@@ -3417,13 +3671,8 @@ pub fn reconnect_nodes<'a, 'b, 'c, 'd>(args: ReconnectArgs<'a, 'b, 'c, 'd>) {
                pending_cell_htlc_claims, pending_cell_htlc_fails, pending_raa,
                pending_responding_commitment_signed, pending_responding_commitment_signed_dup_monitor,
        } = args;
-       node_a.node.peer_connected(&node_b.node.get_our_node_id(), &msgs::Init {
-               features: node_b.node.init_features(), networks: None, remote_network_address: None
-       }, true).unwrap();
+       connect_nodes(node_a, node_b);
        let reestablish_1 = get_chan_reestablish_msgs!(node_a, node_b);
-       node_b.node.peer_connected(&node_a.node.get_our_node_id(), &msgs::Init {
-               features: node_a.node.init_features(), networks: None, remote_network_address: None
-       }, false).unwrap();
        let reestablish_2 = get_chan_reestablish_msgs!(node_b, node_a);
 
        if send_channel_ready.0 {
@@ -3637,7 +3886,7 @@ pub fn create_batch_channel_funding<'a, 'b, 'c>(
                                assert_eq!(channel_value_satoshis, event_channel_value_satoshis);
                                assert_eq!(user_channel_id, event_user_channel_id);
                                tx_outs.push(TxOut {
-                                       value: *channel_value_satoshis, script_pubkey: output_script.clone(),
+                                       value: Amount::from_sat(*channel_value_satoshis), script_pubkey: output_script.clone(),
                                });
                        },
                        _ => panic!("Unexpected event"),
@@ -3647,7 +3896,7 @@ pub fn create_batch_channel_funding<'a, 'b, 'c>(
 
        // Compose the batch funding transaction and give it to the ChannelManager.
        let tx = Transaction {
-               version: 2,
+               version: transaction::Version::TWO,
                lock_time: LockTime::ZERO,
                input: Vec::new(),
                output: tx_outs,