]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Use a `MessageSendEvent`-handling fn rather than a single lopp
authorMatt Corallo <git@bluematt.me>
Mon, 24 Jun 2024 20:21:08 +0000 (20:21 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 1 Oct 2024 16:49:38 +0000 (16:49 +0000)
Rather than building a single `Vec` of `MessageSendEvent`s to
handle then iterating over them, we move the body of the loop into
a lambda and run the loop twice. In some cases, this may save a
single allocation, but more importantly it sets us up for the next
commit, which needs to know from which handler the
`MessageSendEvent` it is processing came from.

lightning/src/ln/peer_handler.rs

index 9d7eba6b6465871deac708ebda90de8005efc8eb..a18f8bf8508fa27457a523dc7887a4835e4059cd 100644 (file)
@@ -2064,8 +2064,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        {
                                let peers_lock = self.peers.read().unwrap();
 
-                               let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-                               events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+                               let chan_events = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                               let route_events = self.message_handler.route_handler.get_and_clear_pending_msg_events();
 
                                let peers = &*peers_lock;
                                macro_rules! get_peer_for_forwarding {
@@ -2073,55 +2073,57 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                {
                                                        if peers_to_disconnect.get($node_id).is_some() {
                                                                // If we've "disconnected" this peer, do not send to it.
-                                                               continue;
-                                                       }
-                                                       let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
-                                                       match descriptor_opt {
-                                                               Some(descriptor) => match peers.get(&descriptor) {
-                                                                       Some(peer_mutex) => {
-                                                                               let peer_lock = peer_mutex.lock().unwrap();
-                                                                               if !peer_lock.handshake_complete() {
-                                                                                       continue;
+                                                               None
+                                                       } else {
+                                                               let descriptor_opt = self.node_id_to_descriptor.lock().unwrap().get($node_id).cloned();
+                                                               match descriptor_opt {
+                                                                       Some(descriptor) => match peers.get(&descriptor) {
+                                                                               Some(peer_mutex) => {
+                                                                                       let peer_lock = peer_mutex.lock().unwrap();
+                                                                                       if !peer_lock.handshake_complete() {
+                                                                                               None
+                                                                                       } else {
+                                                                                               Some(peer_lock)
+                                                                                       }
+                                                                               },
+                                                                               None => {
+                                                                                       debug_assert!(false, "Inconsistent peers set state!");
+                                                                                       None
                                                                                }
-                                                                               peer_lock
                                                                        },
                                                                        None => {
-                                                                               debug_assert!(false, "Inconsistent peers set state!");
-                                                                               continue;
-                                                                       }
-                                                               },
-                                                               None => {
-                                                                       continue;
-                                                               },
+                                                                               None
+                                                                       },
+                                                               }
                                                        }
                                                }
                                        }
                                }
-                               for event in events_generated.drain(..) {
+                               let mut handle_event = |event, from_chan_handler| {
                                        match event {
                                                MessageSendEvent::SendAcceptChannel { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannel event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.common_fields.temporary_channel_id), None), "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.common_fields.temporary_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.temporary_channel_id), None), "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
@@ -2130,107 +2132,107 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        ChannelId::v1_from_funding_txid(msg.funding_txid.as_byte_array(), msg.funding_output_index));
                                                        // TODO: If the peer is gone we should generate a DiscardFunding event
                                                        // indicating to the wallet that they should just throw away this funding transaction
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendFundingSigned event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendChannelReady { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReady event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendStfu { ref node_id, ref msg} => {
                                                        let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendSpliceInit { ref node_id, ref msg} => {
                                                        let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendSpliceInit event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
                                                        let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
                                                        let logger = WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None);
                                                        log_debug!(logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxComplete event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendTxAbort event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(commitment_signed.channel_id), None), "Handling UpdateHTLCs event in peer_handler for node {} with {} adds, {} fulfills, {} fails for channel {}",
@@ -2239,7 +2241,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        update_fulfill_htlcs.len(),
                                                                        update_fail_htlcs.len(),
                                                                        &commitment_signed.channel_id);
-                                                       let mut peer = get_peer_for_forwarding!(node_id);
+                                                       let mut peer = get_peer_for_forwarding!(node_id)?;
                                                        for msg in update_add_htlcs {
                                                                self.enqueue_message(&mut *peer, msg);
                                                        }
@@ -2261,32 +2263,32 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendRevokeAndACK event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendClosingSigned event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling Shutdown event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), Some(msg.channel_id), None), "Handling SendChannelReestablish event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),
                                                                        &msg.channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendChannelAnnouncement { ref node_id, ref msg, ref update_msg } => {
                                                        log_debug!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelAnnouncement event in peer_handler for node {} for short channel id {}",
                                                                        log_pubkey!(node_id),
                                                                        msg.contents.short_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), update_msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, update_msg);
                                                },
                                                MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg } => {
                                                        log_debug!(self.logger, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
@@ -2322,7 +2324,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                MessageSendEvent::SendChannelUpdate { ref node_id, ref msg } => {
                                                        log_trace!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendChannelUpdate event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id), msg.contents.short_channel_id);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::HandleError { node_id, action } => {
                                                        let logger = WithContext::from(&self.logger, Some(node_id), None, None);
@@ -2360,21 +2362,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        log_trace!(logger, "Handling SendErrorMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
                                                                                        msg.data);
-                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
                                                                },
                                                                msgs::ErrorAction::SendWarningMessage { ref msg, ref log_level } => {
                                                                        log_given_level!(logger, *log_level, "Handling SendWarningMessage HandleError event in peer_handler for node {} with message {}",
                                                                                        log_pubkey!(node_id),
                                                                                        msg.data);
-                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), msg);
+                                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id)?, msg);
                                                                },
                                                        }
                                                },
                                                MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                },
                                                MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => {
                                                        log_gossip!(WithContext::from(&self.logger, Some(*node_id), None, None), "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}",
@@ -2383,17 +2385,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                msg.first_blocknum,
                                                                msg.number_of_blocks,
                                                                msg.sync_complete);
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                                MessageSendEvent::SendGossipTimestampFilter { ref node_id, ref msg } => {
-                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id)?, msg);
                                                }
                                        }
+                                       Some(())
+                               };
+                               for event in chan_events {
+                                       handle_event(event, true);
+                               }
+                               for event in route_events {
+                                       handle_event(event, false);
                                }
 
                                for (node_id, msg) in self.message_handler.custom_message_handler.get_and_clear_pending_msg() {
                                        if peers_to_disconnect.get(&node_id).is_some() { continue; }
-                                       self.enqueue_message(&mut *get_peer_for_forwarding!(&node_id), &msg);
+                                       self.enqueue_message(&mut *if let Some(peer) = get_peer_for_forwarding!(&node_id) { peer } else { continue; }, &msg);
                                }
 
                                for (descriptor, peer_mutex) in peers.iter() {