Always remove disconnected peers with no channels
authorViktor Tigerström <11711198+ViktorTigerstrom@users.noreply.github.com>
Tue, 17 Jan 2023 23:28:00 +0000 (00:28 +0100)
committerViktor Tigerström <11711198+ViktorTigerstrom@users.noreply.github.com>
Tue, 14 Feb 2023 14:04:30 +0000 (15:04 +0100)
When a peer disconnects but still has channels, the peer's `peer_state`
entry in the `per_peer_state` is not removed by the `peer_disconnected`
function. If the channels with that peer are later closed while still
being disconnected (i.e. force closed), we therefore need to remove the
peer from `peer_state` separately.

To remove the peers separately, we push such peers to a separate HashSet
that holds peers awaiting removal, and remove the peers on a timer to
limit the negative effects on parallelism as much as possible.

lightning/src/ln/channelmanager.rs

index d42567d424b2ea0ce678590f2b9a22113e21b401..720f6e42992db450fb02baf5b25c7b9227e9850c 100644 (file)
@@ -3421,6 +3421,38 @@ where
                true
        }
 
+       /// When a peer disconnects but still has channels, the peer's `peer_state` entry in the
+       /// `per_peer_state` is not removed by the `peer_disconnected` function. If the channels
+       /// to that peer are later closed while still being disconnected (i.e. force closed), we
+       /// therefore need to remove the peer from `peer_state` separately.
+       /// To avoid having to take the `per_peer_state` `write` lock once the channels are closed, we
+       /// instead remove such peers awaiting removal through this function, which is called on a
+       /// timer through `timer_tick_occurred`, passing the disconnected peers with no channels,
+       /// to limit the negative effects on parallelism as much as possible.
+       ///
+       /// Must be called without the `per_peer_state` lock acquired.
+       fn remove_peers_awaiting_removal(&self, pending_peers_awaiting_removal: HashSet<PublicKey>) {
+               if pending_peers_awaiting_removal.len() > 0 {
+                       let mut per_peer_state = self.per_peer_state.write().unwrap();
+                       for counterparty_node_id in pending_peers_awaiting_removal {
+                               match per_peer_state.entry(counterparty_node_id) {
+                                       hash_map::Entry::Occupied(entry) => {
+                                               // Remove the entry if the peer is still disconnected and we still
+                                               // have no channels to the peer.
+                                               let remove_entry = {
+                                                       let peer_state = entry.get().lock().unwrap();
+                                                       !peer_state.is_connected && peer_state.channel_by_id.len() == 0
+                                               };
+                                               if remove_entry {
+                                                       entry.remove_entry();
+                                               }
+                                       },
+                                       hash_map::Entry::Vacant(_) => { /* The PeerState has already been removed */ }
+                               }
+                       }
+               }
+       }
+
        #[cfg(any(test, feature = "_test_utils"))]
        /// Process background events, for functional testing
        pub fn test_process_background_events(&self) {
@@ -3481,6 +3513,7 @@ where
        ///    the channel.
        ///  * Expiring a channel's previous `ChannelConfig` if necessary to only allow forwarding HTLCs
        ///    with the current `ChannelConfig`.
+       ///  * Removing peers which have disconnected but no longer have any channels.
        ///
        /// Note that this may cause reentrancy through `chain::Watch::update_channel` calls or feerate
        /// estimate fetches.
@@ -3493,19 +3526,21 @@ where
 
                        let mut handle_errors: Vec<(Result<(), _>, _)> = Vec::new();
                        let mut timed_out_mpp_htlcs = Vec::new();
+                       let mut pending_peers_awaiting_removal = HashSet::new();
                        {
                                let per_peer_state = self.per_peer_state.read().unwrap();
                                for (counterparty_node_id, peer_state_mutex) in per_peer_state.iter() {
                                        let mut peer_state_lock = peer_state_mutex.lock().unwrap();
                                        let peer_state = &mut *peer_state_lock;
                                        let pending_msg_events = &mut peer_state.pending_msg_events;
+                                       let counterparty_node_id = *counterparty_node_id;
                                        peer_state.channel_by_id.retain(|chan_id, chan| {
                                                let chan_needs_persist = self.update_channel_fee(chan_id, chan, new_feerate);
                                                if chan_needs_persist == NotifyOption::DoPersist { should_persist = NotifyOption::DoPersist; }
 
                                                if let Err(e) = chan.timer_check_closing_negotiation_progress() {
                                                        let (needs_close, err) = convert_chan_err!(self, e, chan, chan_id);
-                                                       handle_errors.push((Err(err), *counterparty_node_id));
+                                                       handle_errors.push((Err(err), counterparty_node_id));
                                                        if needs_close { return false; }
                                                }
 
@@ -3539,8 +3574,13 @@ where
 
                                                true
                                        });
+                                       let peer_should_be_removed = !peer_state.is_connected && peer_state.channel_by_id.len() == 0;
+                                       if peer_should_be_removed {
+                                               pending_peers_awaiting_removal.insert(counterparty_node_id);
+                                       }
                                }
                        }
+                       self.remove_peers_awaiting_removal(pending_peers_awaiting_removal);
 
                        self.claimable_payments.lock().unwrap().claimable_htlcs.retain(|payment_hash, (_, htlcs)| {
                                if htlcs.is_empty() {
@@ -8116,6 +8156,40 @@ mod tests {
                }
        }
 
+       #[test]
+       fn test_drop_disconnected_peers_when_removing_channels() {
+               let chanmon_cfgs = create_chanmon_cfgs(2);
+               let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+               let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+               let nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+               let chan = create_announced_chan_between_nodes(&nodes, 0, 1);
+
+               nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id(), false);
+               nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id(), false);
+
+               nodes[0].node.force_close_broadcasting_latest_txn(&chan.2, &nodes[1].node.get_our_node_id()).unwrap();
+               check_closed_broadcast!(nodes[0], true);
+               check_added_monitors!(nodes[0], 1);
+               check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed);
+
+               {
+                       // Assert that nodes[1] is awaiting removal for nodes[0] once nodes[1] has been
+                       // disconnected and the channel between has been force closed.
+                       let nodes_0_per_peer_state = nodes[0].node.per_peer_state.read().unwrap();
+                       // Assert that nodes[1] isn't removed before `timer_tick_occurred` has been executed.
+                       assert_eq!(nodes_0_per_peer_state.len(), 1);
+                       assert!(nodes_0_per_peer_state.get(&nodes[1].node.get_our_node_id()).is_some());
+               }
+
+               nodes[0].node.timer_tick_occurred();
+
+               {
+                       // Assert that nodes[1] has now been removed.
+                       assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
+               }
+       }
+
        #[test]
        fn bad_inbound_payment_hash() {
                // Add coverage for checking that a user-provided payment hash matches the payment secret.