Debug more information when we fail to find a lock call symbol
[rust-lightning] / lightning / src / ln / channelmanager.rs
index c76ba5d3afc040e8ccf9f62435576da18920c1ab..11d0b299efef3a35d0d8872d5a016ad807498838 100644
@@ -903,7 +903,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
        /// The peer is currently connected (i.e. we've seen a
        /// [`ChannelMessageHandler::peer_connected`] and no corresponding
        /// [`ChannelMessageHandler::peer_disconnected`]).
-       is_connected: bool,
+       pub is_connected: bool,
 }
 
 impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
@@ -1428,6 +1428,9 @@ where
 
        pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,
 
+       /// Tracks the message events that are to be broadcasted when we are connected to some peer.
+       pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
+
        entropy_source: ES,
        node_signer: NS,
        signer_provider: SP,
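
The core of this change is a new ChannelManager-level queue: gossip messages that must go to all peers (currently `BroadcastChannelUpdate`) are pushed into `pending_broadcast_messages` instead of being appended to some arbitrary peer's `pending_msg_events`, and they are only drained by `get_and_clear_pending_msg_events` once at least one peer is connected. Below is a minimal, self-contained sketch of that queue-and-drain pattern; the types are simplified stand-ins, not the actual LDK ones.

    use std::sync::Mutex;

    #[derive(Debug)]
    enum MessageSendEvent {
        BroadcastChannelUpdate { msg: String },
    }

    #[derive(Default)]
    struct PeerState {
        is_connected: bool,
        pending_msg_events: Vec<MessageSendEvent>,
    }

    #[derive(Default)]
    struct Manager {
        per_peer_state: Mutex<Vec<PeerState>>,
        // Broadcast gossip is cached here rather than on a specific peer.
        pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,
    }

    impl Manager {
        fn queue_broadcast_update(&self, msg: String) {
            self.pending_broadcast_messages.lock().unwrap()
                .push(MessageSendEvent::BroadcastChannelUpdate { msg });
        }

        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
            let mut events = Vec::new();
            let mut any_peer_connected = false;
            for peer in self.per_peer_state.lock().unwrap().iter_mut() {
                events.append(&mut peer.pending_msg_events);
                any_peer_connected |= peer.is_connected;
            }
            // Only hand out broadcast messages when someone can actually
            // receive them; otherwise they stay cached for a later call.
            if any_peer_connected {
                events.append(&mut self.pending_broadcast_messages.lock().unwrap());
            }
            events
        }
    }

The various `BroadcastChannelUpdate` producers touched in this diff (force-close, shutdown, closing_signed, timer-driven gossip enable/disable) correspond to `queue_broadcast_update` in the sketch above.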
@@ -2019,7 +2022,7 @@ macro_rules! handle_error {
                match $internal {
                        Ok(msg) => Ok(msg),
                        Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
-                               let mut msg_events = Vec::with_capacity(2);
+                               let mut msg_event = None;
 
                                if let Some((shutdown_res, update_option)) = shutdown_finish {
                                        let counterparty_node_id = shutdown_res.counterparty_node_id;
@@ -2031,7 +2034,8 @@ macro_rules! handle_error {
 
                                        $self.finish_close_channel(shutdown_res);
                                        if let Some(update) = update_option {
-                                               msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                               let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                        msg: update
                                                });
                                        }
@@ -2041,17 +2045,17 @@ macro_rules! handle_error {
 
                                if let msgs::ErrorAction::IgnoreError = err.action {
                                } else {
-                                       msg_events.push(events::MessageSendEvent::HandleError {
+                                       msg_event = Some(events::MessageSendEvent::HandleError {
                                                node_id: $counterparty_node_id,
                                                action: err.action.clone()
                                        });
                                }
 
-                               if !msg_events.is_empty() {
+                               if let Some(msg_event) = msg_event {
                                        let per_peer_state = $self.per_peer_state.read().unwrap();
                                        if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
                                                let mut peer_state = peer_state_mutex.lock().unwrap();
-                                               peer_state.pending_msg_events.append(&mut msg_events);
+                                               peer_state.pending_msg_events.push(msg_event);
                                        }
                                }
 
@@ -2522,6 +2526,7 @@ where
                        funding_batch_states: Mutex::new(BTreeMap::new()),
 
                        pending_offers_messages: Mutex::new(Vec::new()),
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
 
                        entropy_source,
                        node_signer,
@@ -3020,17 +3025,11 @@ where
                        }
                };
                if let Some(update) = update_opt {
-                       // Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
-                       // not try to broadcast it via whatever peer we have.
-                       let per_peer_state = self.per_peer_state.read().unwrap();
-                       let a_peer_state_opt = per_peer_state.get(peer_node_id)
-                               .ok_or(per_peer_state.values().next());
-                       if let Ok(a_peer_state_mutex) = a_peer_state_opt {
-                               let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
-                               a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
-                                       msg: update
-                               });
-                       }
+                       // If we have a ChannelUpdate to broadcast, cache it here and broadcast it later once a peer is connected.
+                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               msg: update
+                       });
                }
 
                Ok(counterparty_node_id)
@@ -4113,6 +4112,7 @@ where
                        .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                let peer_state = &mut *peer_state_lock;
+
                for channel_id in channel_ids {
                        if !peer_state.has_channel(channel_id) {
                                return Err(APIError::ChannelUnavailable {
@@ -4129,7 +4129,8 @@ where
                                }
                                if let ChannelPhase::Funded(channel) = channel_phase {
                                        if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
-                                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
+                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
                                        } else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
                                                peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
                                                        node_id: channel.context.get_counterparty_node_id(),
@@ -4304,6 +4305,145 @@ where
                Ok(())
        }
 
+       fn process_pending_update_add_htlcs(&self) {
+               let mut decode_update_add_htlcs = new_hash_map();
+               mem::swap(&mut decode_update_add_htlcs, &mut self.decode_update_add_htlcs.lock().unwrap());
+
+               let get_failed_htlc_destination = |outgoing_scid_opt: Option<u64>, payment_hash: PaymentHash| {
+                       if let Some(outgoing_scid) = outgoing_scid_opt {
+                               match self.short_to_chan_info.read().unwrap().get(&outgoing_scid) {
+                                       Some((outgoing_counterparty_node_id, outgoing_channel_id)) =>
+                                               HTLCDestination::NextHopChannel {
+                                                       node_id: Some(*outgoing_counterparty_node_id),
+                                                       channel_id: *outgoing_channel_id,
+                                               },
+                                       None => HTLCDestination::UnknownNextHop {
+                                               requested_forward_scid: outgoing_scid,
+                                       },
+                               }
+                       } else {
+                               HTLCDestination::FailedPayment { payment_hash }
+                       }
+               };
+
+               'outer_loop: for (incoming_scid, update_add_htlcs) in decode_update_add_htlcs {
+                       let incoming_channel_details_opt = self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                               let counterparty_node_id = chan.context.get_counterparty_node_id();
+                               let channel_id = chan.context.channel_id();
+                               let funding_txo = chan.context.get_funding_txo().unwrap();
+                               let user_channel_id = chan.context.get_user_id();
+                               let accept_underpaying_htlcs = chan.context.config().accept_underpaying_htlcs;
+                               (counterparty_node_id, channel_id, funding_txo, user_channel_id, accept_underpaying_htlcs)
+                       });
+                       let (
+                               incoming_counterparty_node_id, incoming_channel_id, incoming_funding_txo,
+                               incoming_user_channel_id, incoming_accept_underpaying_htlcs
+                        ) = if let Some(incoming_channel_details) = incoming_channel_details_opt {
+                               incoming_channel_details
+                       } else {
+                               // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+                               continue;
+                       };
+
+                       let mut htlc_forwards = Vec::new();
+                       let mut htlc_fails = Vec::new();
+                       for update_add_htlc in &update_add_htlcs {
+                               let (next_hop, shared_secret, next_packet_details_opt) = match decode_incoming_update_add_htlc_onion(
+                                       &update_add_htlc, &self.node_signer, &self.logger, &self.secp_ctx
+                               ) {
+                                       Ok(decoded_onion) => decoded_onion,
+                                       Err(htlc_fail) => {
+                                               htlc_fails.push((htlc_fail, HTLCDestination::InvalidOnion));
+                                               continue;
+                                       },
+                               };
+
+                               let is_intro_node_blinded_forward = next_hop.is_intro_node_blinded_forward();
+                               let outgoing_scid_opt = next_packet_details_opt.as_ref().map(|d| d.outgoing_scid);
+
+                               // Process the HTLC on the incoming channel.
+                               match self.do_funded_channel_callback(incoming_scid, |chan: &mut Channel<SP>| {
+                                       let logger = WithChannelContext::from(&self.logger, &chan.context);
+                                       chan.can_accept_incoming_htlc(
+                                               update_add_htlc, &self.fee_estimator, &logger,
+                                       )
+                               }) {
+                                       Some(Ok(_)) => {},
+                                       Some(Err((err, code))) => {
+                                               let outgoing_chan_update_opt = if let Some(outgoing_scid) = outgoing_scid_opt.as_ref() {
+                                                       self.do_funded_channel_callback(*outgoing_scid, |chan: &mut Channel<SP>| {
+                                                               self.get_channel_update_for_onion(*outgoing_scid, chan).ok()
+                                                       }).flatten()
+                                               } else {
+                                                       None
+                                               };
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       outgoing_chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       },
+                                       // The incoming channel no longer exists, HTLCs should be resolved onchain instead.
+                                       None => continue 'outer_loop,
+                               }
+
+                               // Now process the HTLC on the outgoing channel if it's a forward.
+                               if let Some(next_packet_details) = next_packet_details_opt.as_ref() {
+                                       if let Err((err, code, chan_update_opt)) = self.can_forward_htlc(
+                                               &update_add_htlc, next_packet_details
+                                       ) {
+                                               let htlc_fail = self.htlc_failure_from_update_add_err(
+                                                       &update_add_htlc, &incoming_counterparty_node_id, err, code,
+                                                       chan_update_opt, is_intro_node_blinded_forward, &shared_secret,
+                                               );
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                               continue;
+                                       }
+                               }
+
+                               match self.construct_pending_htlc_status(
+                                       &update_add_htlc, &incoming_counterparty_node_id, shared_secret, next_hop,
+                                       incoming_accept_underpaying_htlcs, next_packet_details_opt.map(|d| d.next_packet_pubkey),
+                               ) {
+                                       PendingHTLCStatus::Forward(htlc_forward) => {
+                                               htlc_forwards.push((htlc_forward, update_add_htlc.htlc_id));
+                                       },
+                                       PendingHTLCStatus::Fail(htlc_fail) => {
+                                               let htlc_destination = get_failed_htlc_destination(outgoing_scid_opt, update_add_htlc.payment_hash);
+                                               htlc_fails.push((htlc_fail, htlc_destination));
+                                       },
+                               }
+                       }
+
+                       // Process, as a single batch, all of the forwards and failures for the channel on
+                       // which the HTLCs were received.
+                       let pending_forwards = (incoming_scid, incoming_funding_txo, incoming_channel_id,
+                               incoming_user_channel_id, htlc_forwards.drain(..).collect());
+                       self.forward_htlcs_without_forward_event(&mut [pending_forwards]);
+                       for (htlc_fail, htlc_destination) in htlc_fails.drain(..) {
+                               let failure = match htlc_fail {
+                                       HTLCFailureMsg::Relay(fail_htlc) => HTLCForwardInfo::FailHTLC {
+                                               htlc_id: fail_htlc.htlc_id,
+                                               err_packet: fail_htlc.reason,
+                                       },
+                                       HTLCFailureMsg::Malformed(fail_malformed_htlc) => HTLCForwardInfo::FailMalformedHTLC {
+                                               htlc_id: fail_malformed_htlc.htlc_id,
+                                               sha256_of_onion: fail_malformed_htlc.sha256_of_onion,
+                                               failure_code: fail_malformed_htlc.failure_code,
+                                       },
+                               };
+                               self.forward_htlcs.lock().unwrap().entry(incoming_scid).or_insert(vec![]).push(failure);
+                               self.pending_events.lock().unwrap().push_back((events::Event::HTLCHandlingFailed {
+                                       prev_channel_id: incoming_channel_id,
+                                       failed_next_destination: htlc_destination,
+                               }, None));
+                       }
+               }
+       }
+
        /// Processes HTLCs which are pending waiting on random forward delay.
        ///
        /// Should only really ever be called in response to a PendingHTLCsForwardable event.
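
One detail worth calling out in the new `process_pending_update_add_htlcs`: it begins by swapping the entire `decode_update_add_htlcs` map out of its `Mutex` with `mem::swap`, so the lock is released before any per-HTLC decoding or validation happens. A standalone illustration of that take-then-process pattern, with simplified types rather than the actual LDK ones:

    use std::collections::HashMap;
    use std::mem;
    use std::sync::Mutex;

    struct Processor {
        // Keyed by incoming short channel id in the real code.
        pending: Mutex<HashMap<u64, Vec<String>>>,
    }

    impl Processor {
        fn process_pending(&self) {
            // Take the whole map while holding the lock only briefly...
            let mut pending = HashMap::new();
            mem::swap(&mut pending, &mut self.pending.lock().unwrap());

            // ...then do the potentially slow per-entry work without the lock,
            // so other threads can keep queueing new entries meanwhile.
            for (scid, items) in pending {
                for item in items {
                    // Placeholder for the real per-HTLC decode/validate work.
                    println!("processing {item} on channel {scid}");
                }
            }
        }
    }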
@@ -4311,6 +4451,8 @@ where
        pub fn process_pending_htlc_forwards(&self) {
                let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);
 
+               self.process_pending_update_add_htlcs();
+
                let mut new_events = VecDeque::new();
                let mut failed_forwards = Vec::new();
                let mut phantom_receives: Vec<(u64, OutPoint, ChannelId, u128, Vec<(PendingHTLCInfo, u64)>)> = Vec::new();
@@ -5038,7 +5180,8 @@ where
                                                                                if n >= DISABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
@@ -5052,7 +5195,8 @@ where
                                                                                if n >= ENABLE_GOSSIP_TICKS {
                                                                                        chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
                                                                                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                               pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                        msg: update
                                                                                                });
                                                                                        }
@@ -6778,9 +6922,8 @@ where
                }
                if let Some(ChannelPhase::Funded(chan)) = chan_option {
                        if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                               let mut peer_state_lock = peer_state_mutex.lock().unwrap();
-                               let peer_state = &mut *peer_state_lock;
-                               peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                               let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                               pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                        msg: update
                                });
                        }
@@ -7465,7 +7608,8 @@ where
                                                                                };
                                                                                failed_channels.push(chan.context.force_shutdown(false, reason.clone()));
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
@@ -7650,7 +7794,8 @@ where
                                                                                // We're done with this channel. We got a closing_signed and sent back
                                                                                // a closing_signed with a closing transaction to broadcast.
                                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
-                                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                                msg: update
                                                                                        });
                                                                                }
@@ -8410,7 +8555,7 @@ where
        /// will randomly be placed first or last in the returned array.
        ///
        /// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
-       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
+       /// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
        /// the `MessageSendEvent`s to the specific peer they were generated under.
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
                let events = RefCell::new(Vec::new());
@@ -8430,6 +8575,7 @@ where
                                result = NotifyOption::DoPersist;
                        }
 
+                       let mut is_any_peer_connected = false;
                        let mut pending_events = Vec::new();
                        let per_peer_state = self.per_peer_state.read().unwrap();
                        for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
@@ -8438,6 +8584,15 @@ where
                                if peer_state.pending_msg_events.len() > 0 {
                                        pending_events.append(&mut peer_state.pending_msg_events);
                                }
+                               if peer_state.is_connected {
+                                       is_any_peer_connected = true;
+                               }
+                       }
+
+                       // Only include the cached broadcast messages if we are connected to at least one peer.
+                       if is_any_peer_connected {
+                               let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
+                               pending_events.append(&mut broadcast_msgs);
                        }
 
                        if !pending_events.is_empty() {
@@ -8642,6 +8797,7 @@ where
                                let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                let peer_state = &mut *peer_state_lock;
                                let pending_msg_events = &mut peer_state.pending_msg_events;
+
                                peer_state.channel_by_id.retain(|_, phase| {
                                        match phase {
                                                // Retain unfunded channels.
@@ -8717,7 +8873,8 @@ where
                                                                let reason_message = format!("{}", reason);
                                                                failed_channels.push(channel.context.force_shutdown(true, reason));
                                                                if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
-                                                                       pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
+                                                                       let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
+                                                                       pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
                                                                                msg: update
                                                                        });
                                                                }
@@ -9174,7 +9331,12 @@ where
                                                // Gossip
                                                &events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
                                                &events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
-                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
+                                               // [`ChannelManager::pending_broadcast_messages`] now holds all
+                                               // [`BroadcastChannelUpdate`] events; this arm only keeps the match exhaustive.
+                                               &events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
+                                                       debug_assert!(false, "This event shouldn't have been here");
+                                                       false
+                                               },
                                                &events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
                                                &events::MessageSendEvent::SendChannelUpdate { .. } => false,
                                                &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
@@ -10985,7 +11147,7 @@ where
                                                }
                                        }
                                        if chan.get_latest_unblocked_monitor_update_id() > max_in_flight_update_id {
-                                               // If the channel is ahead of the monitor, return InvalidValue:
+                                               // If the channel is ahead of the monitor, return DangerousValue:
                                                log_error!(logger, "A ChannelMonitor is stale compared to the current ChannelManager! This indicates a potentially-critical violation of the chain::Watch API!");
                                                log_error!(logger, " The ChannelMonitor for channel {} is at update_id {} with update_id through {} in-flight",
                                                        chan.context.channel_id(), monitor.get_latest_update_id(), max_in_flight_update_id);
@@ -10994,7 +11156,7 @@ where
                                                log_error!(logger, " client applications must ensure that ChannelMonitor data is always available and the latest to avoid funds loss!");
                                                log_error!(logger, " Without the latest ChannelMonitor we cannot continue without risking funds.");
                                                log_error!(logger, " Please ensure the chain::Watch API requirements are met and file a bug report at https://github.com/lightningdevkit/rust-lightning");
-                                               return Err(DecodeError::InvalidValue);
+                                               return Err(DecodeError::DangerousValue);
                                        }
                                } else {
                                        // We shouldn't have persisted (or read) any unfunded channel types so none should have been
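
For code that deserializes a ChannelManager, the practical effect of the hunk above is that a stale ChannelMonitor now surfaces as `DecodeError::DangerousValue` instead of `InvalidValue`, so it can be told apart from plain corruption. A hypothetical caller-side sketch follows; the local `DecodeError` enum and `read_manager` helper are simplified stand-ins, not the LDK API.

    #[derive(Debug)]
    enum DecodeError {
        InvalidValue,
        DangerousValue,
    }

    fn read_manager() -> Result<(), DecodeError> {
        // Pretend deserialization found a ChannelMonitor that is behind the
        // ChannelManager it was persisted alongside.
        Err(DecodeError::DangerousValue)
    }

    fn main() {
        match read_manager() {
            Ok(()) => println!("manager loaded"),
            // The data parsed fine but is unsafe to use as-is: restore the
            // latest ChannelMonitor backups and retry rather than treating
            // this as corrupt data.
            Err(DecodeError::DangerousValue) => eprintln!("stale ChannelMonitor data"),
            Err(e) => panic!("corrupt ChannelManager data: {:?}", e),
        }
    }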
@@ -11461,6 +11623,8 @@ where
 
                        pending_offers_messages: Mutex::new(Vec::new()),
 
+                       pending_broadcast_messages: Mutex::new(Vec::new()),
+
                        entropy_source: args.entropy_source,
                        node_signer: args.node_signer,
                        signer_provider: args.signer_provider,
@@ -11992,6 +12156,61 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_channel_update_cached() {
+               let chanmon_cfgs = create_chanmon_cfgs(3);
+               let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
+               let nodes = create_network(3, &node_cfgs, &node_chanmgrs);
+
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
+               check_added_monitors!(nodes[0], 1);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);
+
+               // Confirm that the channel_update was not sent immediately to nodes[1] but was cached.
+               let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_1_events.len(), 0);
+
+               {
+                       // Assert that the ChannelUpdate message has been added to nodes[0]'s pending broadcast messages
+                       let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 1);
+               }
+
+               // Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
+               nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());
+
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 0);
+
+               // Now we reconnect to a peer
+               nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[2].node.init_features(), networks: None, remote_network_address: None
+               }, true).unwrap();
+               nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
+                       features: nodes[0].node.init_features(), networks: None, remote_network_address: None
+               }, false).unwrap();
+
+               // Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
+               let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
+               assert_eq!(node_0_events.len(), 1);
+               match &node_0_events[0] {
+                       MessageSendEvent::BroadcastChannelUpdate { .. } => (),
+                       _ => panic!("Unexpected event"),
+               }
+               {
+                       // Assert that the ChannelUpdate message has been cleared from nodes[0]'s pending broadcast messages
+                       let pending_broadcast_messages = nodes[0].node.pending_broadcast_messages.lock().unwrap();
+                       assert_eq!(pending_broadcast_messages.len(), 0);
+               }
+       }
+
        #[test]
        fn test_drop_disconnected_peers_when_removing_channels() {
                let chanmon_cfgs = create_chanmon_cfgs(2);