pub(super) routing: PendingHTLCRouting,
pub(super) incoming_shared_secret: [u8; 32],
payment_hash: PaymentHash,
+ /// Amount received
pub(super) incoming_amt_msat: Option<u64>, // Added in 0.0.113
+ /// Sender intended amount to forward or receive (actual amount received
+ /// may overshoot this in either case)
pub(super) outgoing_amt_msat: u64,
pub(super) outgoing_cltv_value: u32,
}
cltv_expiry: u32,
/// The amount (in msats) of this MPP part
value: u64,
+ /// The amount (in msats) that the sender intended to be sent in this MPP
+ /// part (used for validating total MPP amount)
+ sender_intended_value: u64,
onion_payload: OnionPayload,
timer_ticks: u8,
/// The total value received for a payment (sum of all MPP parts if the payment is a MPP).
}}
}
+macro_rules! emit_channel_pending_event {
+ ($locked_events: expr, $channel: expr) => {
+ if $channel.should_emit_channel_pending_event() {
+ $locked_events.push(events::Event::ChannelPending {
+ channel_id: $channel.channel_id(),
+ former_temporary_channel_id: $channel.temporary_channel_id(),
+ counterparty_node_id: $channel.get_counterparty_node_id(),
+ user_channel_id: $channel.get_user_id(),
+ funding_txo: $channel.get_funding_txo().unwrap().into_bitcoin_outpoint(),
+ });
+ $channel.set_channel_pending_event_emitted();
+ }
+ }
+}
+
macro_rules! emit_channel_ready_event {
- ($self: expr, $channel: expr) => {
+ ($locked_events: expr, $channel: expr) => {
if $channel.should_emit_channel_ready_event() {
- {
- let mut pending_events = $self.pending_events.lock().unwrap();
- pending_events.push(events::Event::ChannelReady {
- channel_id: $channel.channel_id(),
- user_channel_id: $channel.get_user_id(),
- counterparty_node_id: $channel.get_counterparty_node_id(),
- channel_type: $channel.get_channel_type().clone(),
- });
- }
+ debug_assert!($channel.channel_pending_event_emitted());
+ $locked_events.push(events::Event::ChannelReady {
+ channel_id: $channel.channel_id(),
+ user_channel_id: $channel.get_user_id(),
+ counterparty_node_id: $channel.get_counterparty_node_id(),
+ channel_type: $channel.get_channel_type().clone(),
+ });
$channel.set_channel_ready_event_emitted();
}
}
payment_hash: PaymentHash, amt_msat: u64, cltv_expiry: u32, phantom_shared_secret: Option<[u8; 32]>) -> Result<PendingHTLCInfo, ReceiveError>
{
// final_incorrect_cltv_expiry
- if hop_data.outgoing_cltv_value != cltv_expiry {
+ if hop_data.outgoing_cltv_value > cltv_expiry {
return Err(ReceiveError {
- msg: "Upstream node set CLTV to the wrong value",
+ msg: "Upstream node set CLTV to less than the CLTV set by the sender",
err_code: 18,
err_data: cltv_expiry.to_be_bytes().to_vec()
})
payment_hash,
incoming_shared_secret: shared_secret,
incoming_amt_msat: Some(amt_msat),
- outgoing_amt_msat: amt_msat,
+ outgoing_amt_msat: hop_data.amt_to_forward,
outgoing_cltv_value: hop_data.outgoing_cltv_value,
})
}
HTLCForwardInfo::AddHTLC(PendingAddHTLCInfo {
prev_short_channel_id, prev_htlc_id, prev_funding_outpoint, prev_user_channel_id,
forward_info: PendingHTLCInfo {
- routing, incoming_shared_secret, payment_hash, outgoing_amt_msat, ..
+ routing, incoming_shared_secret, payment_hash, incoming_amt_msat, outgoing_amt_msat, ..
}
}) => {
let (cltv_expiry, onion_payload, payment_data, phantom_shared_secret) = match routing {
incoming_packet_shared_secret: incoming_shared_secret,
phantom_shared_secret,
},
- value: outgoing_amt_msat,
+ // We differentiate the received value from the sender intended value
+ // if possible so that we don't prematurely mark MPP payments complete
+ // if routing nodes overpay
+ value: incoming_amt_msat.unwrap_or(outgoing_amt_msat),
+ sender_intended_value: outgoing_amt_msat,
timer_ticks: 0,
total_value_received: None,
total_msat: if let Some(data) = &payment_data { data.total_msat } else { outgoing_amt_msat },
continue
}
}
- let mut total_value = claimable_htlc.value;
+ let mut total_value = claimable_htlc.sender_intended_value;
for htlc in htlcs.iter() {
- total_value += htlc.value;
+ total_value += htlc.sender_intended_value;
match &htlc.onion_payload {
OnionPayload::Invoice { .. } => {
if htlc.total_msat != $payment_data.total_msat {
_ => unreachable!(),
}
}
+ // The condition determining whether an MPP is complete must
+ // match exactly the condition used in `timer_tick_occurred`
if total_value >= msgs::MAX_VALUE_MSAT {
fail_htlc!(claimable_htlc, payment_hash);
- } else if total_value - claimable_htlc.value >= $payment_data.total_msat {
+ } else if total_value - claimable_htlc.sender_intended_value >= $payment_data.total_msat {
log_trace!(self.logger, "Failing HTLC with payment_hash {} as payment is already claimable",
log_bytes!(payment_hash.0));
fail_htlc!(claimable_htlc, payment_hash);
new_events.push(events::Event::PaymentClaimable {
receiver_node_id: Some(receiver_node_id),
payment_hash,
- amount_msat: outgoing_amt_msat,
+ amount_msat,
purpose,
via_channel_id: Some(prev_channel_id),
via_user_channel_id: Some(prev_user_channel_id),
if let OnionPayload::Invoice { .. } = htlcs[0].onion_payload {
// Check if we've received all the parts we need for an MPP (the value of the parts adds to total_msat).
// In this case we're not going to handle any timeouts of the parts here.
- if htlcs[0].total_msat <= htlcs.iter().fold(0, |total, htlc| total + htlc.value) {
+ // This condition determining whether the MPP is complete here must match
+ // exactly the condition used in `process_pending_htlc_forwards`.
+ if htlcs[0].total_msat <= htlcs.iter().fold(0, |total, htlc| total + htlc.sender_intended_value) {
return true;
} else if htlcs.into_iter().any(|htlc| {
htlc.timer_ticks += 1;
};
debug_assert!(!sources.is_empty());
- // If we are claiming an MPP payment, we check that all channels which contain a claimable
- // HTLC still exist. While this isn't guaranteed to remain true if a channel closes while
- // we're claiming (or even after we claim, before the commitment update dance completes),
- // it should be a relatively rare race, and we'd rather not claim HTLCs that require us to
- // go on-chain (and lose the on-chain fee to do so) than just reject the payment.
- //
- // Note that we'll still always get our funds - as long as the generated
- // `ChannelMonitorUpdate` makes it out to the relevant monitor we can claim on-chain.
- //
- // If we find an HTLC which we would need to claim but for which we do not have a
- // channel, we will fail all parts of the MPP payment. While we could wait and see if
- // the sender retries the already-failed path(s), it should be a pretty rare case where
- // we got all the HTLCs and then a channel closed while we were waiting for the user to
- // provide the preimage, so worrying too much about the optimal handling isn't worth
- // it.
+ // Just in case one HTLC has been failed between when we generated the `PaymentClaimable`
+ // and when we got here we need to check that the amount we're about to claim matches the
+ // amount we told the user in the last `PaymentClaimable`. We also do a sanity-check that
+ // the MPP parts all have the same `total_msat`.
let mut claimable_amt_msat = 0;
let mut prev_total_msat = None;
let mut expected_amt_msat = None;
let mut errs = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap();
for htlc in sources.iter() {
- let (counterparty_node_id, chan_id) = match self.short_to_chan_info.read().unwrap().get(&htlc.prev_hop.short_channel_id) {
- Some((cp_id, chan_id)) => (cp_id.clone(), chan_id.clone()),
- None => {
- valid_mpp = false;
- break;
- }
- };
-
- let peer_state_mutex_opt = per_peer_state.get(&counterparty_node_id);
- if peer_state_mutex_opt.is_none() {
- valid_mpp = false;
- break;
- }
-
- let mut peer_state_lock = peer_state_mutex_opt.unwrap().lock().unwrap();
- let peer_state = &mut *peer_state_lock;
-
- if peer_state.channel_by_id.get(&chan_id).is_none() {
- valid_mpp = false;
- break;
- }
-
if prev_total_msat.is_some() && prev_total_msat != Some(htlc.total_msat) {
log_error!(self.logger, "Somehow ended up with an MPP payment with different expected total amounts - this should not be reachable!");
debug_assert!(false);
claim_from_onchain_tx: from_onchain,
prev_channel_id,
next_channel_id,
+ outbound_amount_forwarded_msat: forwarded_htlc_value_msat,
}})
} else { None }
});
});
}
- emit_channel_ready_event!(self, channel);
-
macro_rules! handle_cs { () => {
if let Some(update) = commitment_update {
pending_msg_events.push(events::MessageSendEvent::UpdateHTLCs {
self.tx_broadcaster.broadcast_transaction(&tx);
}
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ emit_channel_pending_event!(pending_events, channel);
+ emit_channel_ready_event!(pending_events, channel);
+ }
+
htlc_forwards
}
}
}
- emit_channel_ready_event!(self, chan.get_mut());
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ emit_channel_ready_event!(pending_events, chan.get_mut());
+ }
Ok(())
},
}
}
- emit_channel_ready_event!(self, channel);
+ {
+ let mut pending_events = self.pending_events.lock().unwrap();
+ emit_channel_ready_event!(pending_events, channel);
+ }
if let Some(announcement_sigs) = announcement_sigs {
log_trace!(self.logger, "Sending announcement_signatures for channel {}", log_bytes!(channel.channel_id()));
}
}
- /// Blocks until ChannelManager needs to be persisted or a timeout is reached. It returns a bool
- /// indicating whether persistence is necessary. Only one listener on
- /// [`await_persistable_update`], [`await_persistable_update_timeout`], or a future returned by
- /// [`get_persistable_update_future`] is guaranteed to be woken up.
- ///
- /// Note that this method is not available with the `no-std` feature.
+ /// Gets a [`Future`] that completes when this [`ChannelManager`] needs to be persisted.
///
- /// [`await_persistable_update`]: Self::await_persistable_update
- /// [`await_persistable_update_timeout`]: Self::await_persistable_update_timeout
- /// [`get_persistable_update_future`]: Self::get_persistable_update_future
- #[cfg(any(test, feature = "std"))]
- pub fn await_persistable_update_timeout(&self, max_wait: Duration) -> bool {
- self.persistence_notifier.wait_timeout(max_wait)
- }
-
- /// Blocks until ChannelManager needs to be persisted. Only one listener on
- /// [`await_persistable_update`], `await_persistable_update_timeout`, or a future returned by
- /// [`get_persistable_update_future`] is guaranteed to be woken up.
+ /// Note that callbacks registered on the [`Future`] MUST NOT call back into this
+ /// [`ChannelManager`] and should instead register actions to be taken later.
///
- /// [`await_persistable_update`]: Self::await_persistable_update
- /// [`get_persistable_update_future`]: Self::get_persistable_update_future
- pub fn await_persistable_update(&self) {
- self.persistence_notifier.wait()
- }
-
- /// Gets a [`Future`] that completes when a persistable update is available. Note that
- /// callbacks registered on the [`Future`] MUST NOT call back into this [`ChannelManager`] and
- /// should instead register actions to be taken later.
pub fn get_persistable_update_future(&self) -> Future {
self.persistence_notifier.get_future()
}
(0, self.prev_hop, required),
(1, self.total_msat, required),
(2, self.value, required),
+ (3, self.sender_intended_value, required),
(4, payment_data, option),
(5, self.total_value_received, option),
(6, self.cltv_expiry, required),
fn read<R: Read>(reader: &mut R) -> Result<Self, DecodeError> {
let mut prev_hop = crate::util::ser::RequiredWrapper(None);
let mut value = 0;
+ let mut sender_intended_value = None;
let mut payment_data: Option<msgs::FinalOnionHopData> = None;
let mut cltv_expiry = 0;
let mut total_value_received = None;
(0, prev_hop, required),
(1, total_msat, option),
(2, value, required),
+ (3, sender_intended_value, option),
(4, payment_data, option),
(5, total_value_received, option),
(6, cltv_expiry, required),
prev_hop: prev_hop.0.unwrap(),
timer_ticks: 0,
value,
+ sender_intended_value: sender_intended_value.unwrap_or(value),
total_value_received,
total_msat: total_msat.unwrap(),
onion_payload,
let mut id_to_peer = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut short_to_chan_info = HashMap::with_capacity(cmp::min(channel_count as usize, 128));
let mut channel_closures = Vec::new();
+ let mut pending_background_events = Vec::new();
for _ in 0..channel_count {
let mut channel: Channel<<SP::Target as SignerProvider>::Signer> = Channel::read(reader, (
&args.entropy_source, &args.signer_provider, best_block_height, &provided_channel_type_features(&args.default_config)
log_error!(args.logger, " The channel will be force-closed and the latest commitment transaction from the ChannelMonitor broadcast.");
log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.",
log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id());
- let (_, mut new_failed_htlcs) = channel.force_shutdown(true);
+ let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true);
+ if let Some(monitor_update) = monitor_update {
+ pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate(monitor_update));
+ }
failed_htlcs.append(&mut new_failed_htlcs);
- monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
channel_closures.push(events::Event::ChannelClosed {
channel_id: channel.channel_id(),
user_channel_id: channel.get_user_id(),
}
}
- for (funding_txo, monitor) in args.channel_monitors.iter_mut() {
+ for (funding_txo, _) in args.channel_monitors.iter() {
if !funding_txo_set.contains(funding_txo) {
- log_info!(args.logger, "Broadcasting latest holder commitment transaction for closed channel {}", log_bytes!(funding_txo.to_channel_id()));
- monitor.broadcast_latest_holder_commitment_txn(&args.tx_broadcaster, &args.logger);
+ let monitor_update = ChannelMonitorUpdate {
+ update_id: CLOSED_CHANNEL_UPDATE_ID,
+ updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }],
+ };
+ pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((*funding_txo, monitor_update)));
}
}
}
let background_event_count: u64 = Readable::read(reader)?;
- let mut pending_background_events_read: Vec<BackgroundEvent> = Vec::with_capacity(cmp::min(background_event_count as usize, MAX_ALLOC_SIZE/mem::size_of::<BackgroundEvent>()));
for _ in 0..background_event_count {
match <u8 as Readable>::read(reader)? {
- 0 => pending_background_events_read.push(BackgroundEvent::ClosingMonitorUpdate((Readable::read(reader)?, Readable::read(reader)?))),
+ 0 => {
+ let (funding_txo, monitor_update): (OutPoint, ChannelMonitorUpdate) = (Readable::read(reader)?, Readable::read(reader)?);
+ if pending_background_events.iter().find(|e| {
+ let BackgroundEvent::ClosingMonitorUpdate((pending_funding_txo, pending_monitor_update)) = e;
+ *pending_funding_txo == funding_txo && *pending_monitor_update == monitor_update
+ }).is_none() {
+ pending_background_events.push(BackgroundEvent::ClosingMonitorUpdate((funding_txo, monitor_update)));
+ }
+ }
_ => return Err(DecodeError::InvalidValue),
}
}
per_peer_state: FairRwLock::new(per_peer_state),
pending_events: Mutex::new(pending_events_read),
- pending_background_events: Mutex::new(pending_background_events_read),
+ pending_background_events: Mutex::new(pending_background_events),
total_consistency_lock: RwLock::new(()),
persistence_notifier: Notifier::new(),
use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
+ #[cfg(feature = "std")]
use core::time::Duration;
use core::sync::atomic::Ordering;
use crate::events::{Event, HTLCDestination, MessageSendEvent, MessageSendEventsProvider, ClosureReason};
// All nodes start with a persistable update pending as `create_network` connects each node
// with all other nodes to make most tests simpler.
- assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
+ assert!(nodes[2].node.get_persistable_update_future().poll_is_complete());
let mut chan = create_announced_chan_between_nodes(&nodes, 0, 1);
&nodes[0].node.get_our_node_id()).pop().unwrap();
// The first two nodes (which opened a channel) should now require fresh persistence
- assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
// ... but the last node should not.
- assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
// After persisting the first two nodes they should no longer need fresh persistence.
- assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
// Node 3, unrelated to the only channel, shouldn't care if it receives a channel_update
// about the channel.
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.0);
nodes[2].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &chan.1);
- assert!(!nodes[2].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(!nodes[2].node.get_persistable_update_future().poll_is_complete());
// The nodes which are a party to the channel should also ignore messages from unrelated
// parties.
nodes[0].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.0);
nodes[1].node.handle_channel_update(&nodes[2].node.get_our_node_id(), &chan.1);
- assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
// At this point the channel info given by peers should still be the same.
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
// persisted and that its channel info remains the same.
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &as_update);
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &bs_update);
- assert!(!nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(!nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(!nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(!nodes[1].node.get_persistable_update_future().poll_is_complete());
assert_eq!(nodes[0].node.list_channels()[0], node_a_chan_info);
assert_eq!(nodes[1].node.list_channels()[0], node_b_chan_info);
// the channel info has updated.
nodes[0].node.handle_channel_update(&nodes[1].node.get_our_node_id(), &bs_update);
nodes[1].node.handle_channel_update(&nodes[0].node.get_our_node_id(), &as_update);
- assert!(nodes[0].node.await_persistable_update_timeout(Duration::from_millis(1)));
- assert!(nodes[1].node.await_persistable_update_timeout(Duration::from_millis(1)));
+ assert!(nodes[0].node.get_persistable_update_future().poll_is_complete());
+ assert!(nodes[1].node.get_persistable_update_future().poll_is_complete());
assert_ne!(nodes[0].node.list_channels()[0], node_a_chan_info);
assert_ne!(nodes[1].node.list_channels()[0], node_b_chan_info);
}
assert_eq!(nodes_0_lock.len(), 1);
assert!(nodes_0_lock.contains_key(channel_id));
}
+ expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
{
// Assert that `nodes[1]`'s `id_to_peer` map is populated with the channel as soon as
let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
check_added_monitors!(nodes[0], 1);
+ expect_channel_pending_event(&nodes[0], &nodes[1].node.get_our_node_id());
let (channel_ready, _) = create_chan_between_nodes_with_value_confirm(&nodes[0], &nodes[1], &tx);
let (announcement, nodes_0_update, nodes_1_update) = create_chan_between_nodes_with_value_b(&nodes[0], &nodes[1], &channel_ready);
update_nodes_with_chan_announce(&nodes, 0, 1, &announcement, &nodes_0_update, &nodes_1_update);
nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &funding_created_msg);
check_added_monitors!(nodes[1], 1);
+ expect_channel_pending_event(&nodes[1], &nodes[0].node.get_our_node_id());
+
let funding_signed = get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id());
nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &funding_signed);
check_added_monitors!(nodes[0], 1);
+ expect_channel_pending_event(&nodes[0], &nodes[1].node.get_our_node_id());
}
open_channel_msg.temporary_channel_id = nodes[0].keys_manager.get_secure_random_bytes();
}
use crate::chain::chainmonitor::{ChainMonitor, Persist};
use crate::chain::keysinterface::{EntropySource, KeysManager, InMemorySigner};
use crate::events::{Event, MessageSendEvent, MessageSendEventsProvider};
- use crate::ln::channelmanager::{self, BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId};
+ use crate::ln::channelmanager::{BestBlock, ChainParameters, ChannelManager, PaymentHash, PaymentPreimage, PaymentId};
use crate::ln::functional_test_utils::*;
use crate::ln::msgs::{ChannelMessageHandler, Init};
use crate::routing::gossip::NetworkGraph;
} else { panic!(); }
node_b.handle_funding_created(&node_a.get_our_node_id(), &get_event_msg!(node_a_holder, MessageSendEvent::SendFundingCreated, node_b.get_our_node_id()));
+ let events_b = node_b.get_and_clear_pending_events();
+ assert_eq!(events_b.len(), 1);
+ match events_b[0] {
+ Event::ChannelPending{ ref counterparty_node_id, .. } => {
+ assert_eq!(*counterparty_node_id, node_a.get_our_node_id());
+ },
+ _ => panic!("Unexpected event"),
+ }
+
node_a.handle_funding_signed(&node_b.get_our_node_id(), &get_event_msg!(node_b_holder, MessageSendEvent::SendFundingSigned, node_a.get_our_node_id()));
+ let events_a = node_a.get_and_clear_pending_events();
+ assert_eq!(events_a.len(), 1);
+ match events_a[0] {
+ Event::ChannelPending{ ref counterparty_node_id, .. } => {
+ assert_eq!(*counterparty_node_id, node_b.get_our_node_id());
+ },
+ _ => panic!("Unexpected event"),
+ }
assert_eq!(&tx_broadcaster.txn_broadcasted.lock().unwrap()[..], &[tx.clone()]);