Avoid taking the peers write lock during event processing
authorMatt Corallo <git@bluematt.me>
Sat, 25 Sep 2021 22:24:23 +0000 (22:24 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 10 May 2022 23:40:20 +0000 (23:40 +0000)
Because the peers write lock "blocks the world", and happens after
each read event, always taking the write lock has pretty severe
impacts on parallelism. Instead, here, we only take the global
write lock if we have to disconnect a peer.

lightning/src/ln/peer_handler.rs

index 2226e9574b1ae82cfc7f4387b4ecfe02b9b19537..5836a8d093187785aeee5373ccff95fbe9990217 100644 (file)
@@ -433,6 +433,10 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
        /// lock held. Entries may be added with only the `peers` read lock held (though the
        /// `Descriptor` value must already exist in `peers`).
        node_id_to_descriptor: Mutex<HashMap<PublicKey, Descriptor>>,
+       /// We can only have one thread processing events at once, but we don't usually need the full
+       /// `peers` write lock to do so, so instead we block on this empty mutex when entering
+       /// `process_events`.
+       event_processing_lock: Mutex<()>,
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
        custom_message_handler: CMH,
@@ -564,6 +568,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                peers: HashMap::new(),
                        }),
                        node_id_to_descriptor: Mutex::new(HashMap::new()),
+                       event_processing_lock: Mutex::new(()),
                        our_node_secret,
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
@@ -1368,20 +1373,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
        /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards
        /// [`send_data`]: SocketDescriptor::send_data
        pub fn process_events(&self) {
+               let _single_processor_lock = self.event_processing_lock.lock().unwrap();
+
+               let mut peers_to_disconnect = HashMap::new();
+               let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+               events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+
                {
                        // TODO: There are some DoS attacks here where you can flood someone's outbound send
                        // buffer by doing things like announcing channels on another node. We should be willing to
                        // drop optional-ish messages when send buffers get full!
 
-                       let mut peers_lock = self.peers.write().unwrap();
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
-                       let peers = &mut *peers_lock;
+                       let peers_lock = self.peers.read().unwrap();
+                       let peers = &*peers_lock;
                        macro_rules! get_peer_for_forwarding {
                                ($node_id: expr) => {
                                        {
-                                               match self.node_id_to_descriptor.lock().unwrap().get($node_id) {
-                                                       Some(descriptor) => match peers.peers.get_mut(&descriptor) {
+                                               if peers_to_disconnect.get($node_id).is_some() {
+                                                       // If we've "disconnected" this peer, do not send to it.
+                                                       continue;
+                                               }
+                                               let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
+                                               match descriptor_opt {
+                                                       Some(descriptor) => match peers.peers.get(&descriptor) {
                                                                Some(peer_mutex) => {
                                                                        let peer_lock = peer_mutex.lock().unwrap();
                                                                        if peer_lock.their_features.is_none() {
@@ -1389,7 +1403,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                                        }
                                                                        peer_lock
                                                                },
-                                                               None => panic!("Inconsistent peers set state!"),
+                                                               None => {
+                                                                       debug_assert!(false, "Inconsistent peers set state!");
+                                                                       continue;
+                                                               }
                                                        },
                                                        None => {
                                                                continue;
@@ -1525,28 +1542,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        MessageSendEvent::HandleError { ref node_id, ref action } => {
                                                match *action {
                                                        msgs::ErrorAction::DisconnectPeer { ref msg } => {
-                                                               // Note that since we are holding the peers *write* lock we can
-                                                               // remove from node_id_to_descriptor immediately (as no other
-                                                               // thread can be holding the peer lock if we have the global write
-                                                               // lock).
-                                                               if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) {
-                                                                       if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
-                                                                               if let Some(ref msg) = *msg {
-                                                                                       log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
-                                                                                                       log_pubkey!(node_id),
-                                                                                                       msg.data);
-                                                                                       let mut peer = peer_mutex.lock().unwrap();
-                                                                                       self.enqueue_message(&mut *peer, msg);
-                                                                                       // This isn't guaranteed to work, but if there is enough free
-                                                                                       // room in the send buffer, put the error message there...
-                                                                                       self.do_attempt_write_data(&mut descriptor, &mut *peer);
-                                                                               } else {
-                                                                                       log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
-                                                                               }
-                                                                       }
-                                                                       descriptor.disconnect_socket();
-                                                                       self.message_handler.chan_handler.peer_disconnected(&node_id, false);
-                                                               }
+                                                               // We do not have the peers write lock, so we just store that we're
+                                                               // about to disconenct the peer and do it after we finish
+                                                               // processing most messages.
+                                                               peers_to_disconnect.insert(*node_id, msg.clone());
                                                        },
                                                        msgs::ErrorAction::IgnoreAndLog(level) => {
                                                                log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id));
@@ -1591,13 +1590,43 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                        }
 
                        for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() {
+                               if peers_to_disconnect.get(&node_id).is_some() { continue; }
                                self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
                        }
 
-                       for (descriptor, peer_mutex) in peers.peers.iter_mut() {
+                       for (descriptor, peer_mutex) in peers.peers.iter() {
                                self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap());
                        }
                }
+               if !peers_to_disconnect.is_empty() {
+                       let mut peers_lock = self.peers.write().unwrap();
+                       let peers = &mut *peers_lock;
+                       for (node_id, msg) in peers_to_disconnect.drain() {
+                               // Note that since we are holding the peers *write* lock we can
+                               // remove from node_id_to_descriptor immediately (as no other
+                               // thread can be holding the peer lock if we have the global write
+                               // lock).
+
+                               if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) {
+                                       if let Some(peer_mutex) = peers.peers.remove(&descriptor) {
+                                               if let Some(msg) = msg {
+                                                       log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
+                                                                       log_pubkey!(node_id),
+                                                                       msg.data);
+                                                       let mut peer = peer_mutex.lock().unwrap();
+                                                       self.enqueue_message(&mut *peer, &msg);
+                                                       // This isn't guaranteed to work, but if there is enough free
+                                                       // room in the send buffer, put the error message there...
+                                                       self.do_attempt_write_data(&mut descriptor, &mut *peer);
+                                               } else {
+                                                       log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id));
+                                               }
+                                       }
+                                       descriptor.disconnect_socket();
+                                       self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+                               }
+                       }
+               }
        }
 
        /// Indicates that the given socket descriptor's connection is now closed.