From a5adda18dc2c05959c107a0b2769730680d32d18 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sat, 25 Sep 2021 22:24:23 +0000 Subject: [PATCH] Avoid taking the peers write lock during event processing Because the peers write lock "blocks the world", and happens after each read event, always taking the write lock has pretty severe impacts on parallelism. Instead, here, we only take the global write lock if we have to disconnect a peer. --- lightning/src/ln/peer_handler.rs | 89 +++++++++++++++++++++----------- 1 file changed, 59 insertions(+), 30 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 2226e9574..5836a8d09 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -433,6 +433,10 @@ pub struct PeerManager>, + /// We can only have one thread processing events at once, but we don't usually need the full + /// `peers` write lock to do so, so instead we block on this empty mutex when entering + /// `process_events`. + event_processing_lock: Mutex<()>, our_node_secret: SecretKey, ephemeral_key_midstate: Sha256Engine, custom_message_handler: CMH, @@ -564,6 +568,7 @@ impl P peers: HashMap::new(), }), node_id_to_descriptor: Mutex::new(HashMap::new()), + event_processing_lock: Mutex::new(()), our_node_secret, ephemeral_key_midstate, peer_counter: AtomicCounter::new(), @@ -1368,20 +1373,29 @@ impl P /// [`ChannelManager::process_pending_htlc_forwards`]: crate::ln::channelmanager::ChannelManager::process_pending_htlc_forwards /// [`send_data`]: SocketDescriptor::send_data pub fn process_events(&self) { + let _single_processor_lock = self.event_processing_lock.lock().unwrap(); + + let mut peers_to_disconnect = HashMap::new(); + let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); + events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); + { // TODO: There are some DoS attacks here where you can flood someone's outbound send // buffer by doing things like announcing channels on another node. We should be willing to // drop optional-ish messages when send buffers get full! - let mut peers_lock = self.peers.write().unwrap(); - let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events(); - events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events()); - let peers = &mut *peers_lock; + let peers_lock = self.peers.read().unwrap(); + let peers = &*peers_lock; macro_rules! get_peer_for_forwarding { ($node_id: expr) => { { - match self.node_id_to_descriptor.lock().unwrap().get($node_id) { - Some(descriptor) => match peers.peers.get_mut(&descriptor) { + if peers_to_disconnect.get($node_id).is_some() { + // If we've "disconnected" this peer, do not send to it. + continue; + } + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); if peer_lock.their_features.is_none() { @@ -1389,7 +1403,10 @@ impl P } peer_lock }, - None => panic!("Inconsistent peers set state!"), + None => { + debug_assert!(false, "Inconsistent peers set state!"); + continue; + } }, None => { continue; @@ -1525,28 +1542,10 @@ impl P MessageSendEvent::HandleError { ref node_id, ref action } => { match *action { msgs::ErrorAction::DisconnectPeer { ref msg } => { - // Note that since we are holding the peers *write* lock we can - // remove from node_id_to_descriptor immediately (as no other - // thread can be holding the peer lock if we have the global write - // lock). - if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(node_id) { - if let Some(peer_mutex) = peers.peers.remove(&descriptor) { - if let Some(ref msg) = *msg { - log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", - log_pubkey!(node_id), - msg.data); - let mut peer = peer_mutex.lock().unwrap(); - self.enqueue_message(&mut *peer, msg); - // This isn't guaranteed to work, but if there is enough free - // room in the send buffer, put the error message there... - self.do_attempt_write_data(&mut descriptor, &mut *peer); - } else { - log_gossip!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); - } - } - descriptor.disconnect_socket(); - self.message_handler.chan_handler.peer_disconnected(&node_id, false); - } + // We do not have the peers write lock, so we just store that we're + // about to disconenct the peer and do it after we finish + // processing most messages. + peers_to_disconnect.insert(*node_id, msg.clone()); }, msgs::ErrorAction::IgnoreAndLog(level) => { log_given_level!(self.logger, level, "Received a HandleError event to be ignored for node {}", log_pubkey!(node_id)); @@ -1591,13 +1590,43 @@ impl P } for (node_id, msg) in self.custom_message_handler.get_and_clear_pending_msg() { + if peers_to_disconnect.get(&node_id).is_some() { continue; } self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); } - for (descriptor, peer_mutex) in peers.peers.iter_mut() { + for (descriptor, peer_mutex) in peers.peers.iter() { self.do_attempt_write_data(&mut (*descriptor).clone(), &mut *peer_mutex.lock().unwrap()); } } + if !peers_to_disconnect.is_empty() { + let mut peers_lock = self.peers.write().unwrap(); + let peers = &mut *peers_lock; + for (node_id, msg) in peers_to_disconnect.drain() { + // Note that since we are holding the peers *write* lock we can + // remove from node_id_to_descriptor immediately (as no other + // thread can be holding the peer lock if we have the global write + // lock). + + if let Some(mut descriptor) = self.node_id_to_descriptor.lock().unwrap().remove(&node_id) { + if let Some(peer_mutex) = peers.peers.remove(&descriptor) { + if let Some(msg) = msg { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", + log_pubkey!(node_id), + msg.data); + let mut peer = peer_mutex.lock().unwrap(); + self.enqueue_message(&mut *peer, &msg); + // This isn't guaranteed to work, but if there is enough free + // room in the send buffer, put the error message there... + self.do_attempt_write_data(&mut descriptor, &mut *peer); + } else { + log_trace!(self.logger, "Handling DisconnectPeer HandleError event in peer_handler for node {} with no message", log_pubkey!(node_id)); + } + } + descriptor.disconnect_socket(); + self.message_handler.chan_handler.peer_disconnected(&node_id, false); + } + } + } } /// Indicates that the given socket descriptor's connection is now closed. -- 2.39.5