From d13f336d16485098cb1ab9b5bc7f838ca2216d77 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Mon, 24 Jun 2024 20:21:08 +0000 Subject: [PATCH] Use a `MessageSendEvent`-handling fn rather than a single lopp Rather than building a single `Vec` of `MessageSendEvent`s to handle then iterating over them, we move the body of the loop into a lambda and run the loop twice. In some cases, this may save a single allocation, but more importantly it sets us up for the next commit, which needs to know from which handler the `MessageSendEvent` it is processing came from. --- lightning/src/ln/peer_handler.rs | 121 +++++++++++++++++-------------- 1 file changed, 65 insertions(+), 56 deletions(-) diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 9d7eba6b6..a18f8bf85 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -2064,8 +2064,8 @@ impl match peers.get(&descriptor) { - Some(peer_mutex) => { - let peer_lock = peer_mutex.lock().unwrap(); - if !peer_lock.handshake_complete() { - continue; + None + } else { + let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned(); + match descriptor_opt { + Some(descriptor) => match peers.get(&descriptor) { + Some(peer_mutex) => { + let peer_lock = peer_mutex.lock().unwrap(); + if !peer_lock.handshake_complete() { + None + } else { + Some(peer_lock) + } + }, + None => { + debug_assert!(false, "Inconsistent peers set state!"); + None } - peer_lock }, None => { - debug_assert!(false, "Inconsistent peers set state!"); - continue; - } - }, - None => { - continue; - }, + None + }, + } } } } } - for event in events_generated.drain(..) { + let mut handle_event = |event, from_chan_handler| { match event { MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.common_fields.temporary_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})", @@ -2130,107 +2132,107 @@ impl { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReady { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendStfu { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None); log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxComplete { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendTxAbort { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}", @@ -2239,7 +2241,7 @@ impl { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}", log_pubkey!(node_id), &msg.channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => { log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}", log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg); }, MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => { log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); @@ -2322,7 +2324,7 @@ impl { log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}", log_pubkey!(node_id), msg.contents.short_channel_id); - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::HandleError { node_id, action } => { let logger = WithContext::from(&self.logger, Some(node_id), None, None); @@ -2360,21 +2362,21 @@ impl { log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg); }, } }, MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); }, MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", @@ -2383,17 +2385,24 @@ impl { - self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg); } } + Some(()) + }; + for event in chan_events { + handle_event(event, true); + } + for event in route_events { + handle_event(event, false); } for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() { if peers_to_disconnect.get(&node_id).is_some() { continue; } - self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg); + self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg); } for (descriptor, peer_mutex) in peers.iter() { -- 2.39.5