From 0b8311643697cb3dd7c8e6a53ade86003432e10e Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Fri, 1 Dec 2023 13:22:43 -0600 Subject: [PATCH] Rename OnionMessageBuffer to OnionMessageRecipient --- lightning/src/onion_message/messenger.rs | 88 ++++++++++++------------ 1 file changed, 44 insertions(+), 44 deletions(-) diff --git a/lightning/src/onion_message/messenger.rs b/lightning/src/onion_message/messenger.rs index 8d4fb044f..05ea7a285 100644 --- a/lightning/src/onion_message/messenger.rs +++ b/lightning/src/onion_message/messenger.rs @@ -156,7 +156,7 @@ where entropy_source: ES, node_signer: NS, logger: L, - message_buffers: Mutex>, + message_recipients: Mutex>, secp_ctx: Secp256k1, message_router: MR, offers_handler: OMH, @@ -164,7 +164,7 @@ where } /// [`OnionMessage`]s buffered to be sent. -enum OnionMessageBuffer { +enum OnionMessageRecipient { /// Messages for a node connected as a peer. ConnectedPeer(VecDeque), @@ -173,22 +173,22 @@ enum OnionMessageBuffer { PendingConnection(VecDeque, Option>, usize), } -impl OnionMessageBuffer { +impl OnionMessageRecipient { fn pending_connection(addresses: Vec) -> Self { Self::PendingConnection(VecDeque::new(), Some(addresses), 0) } fn pending_messages(&self) -> &VecDeque { match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, } } fn enqueue_message(&mut self, message: OnionMessage) { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; pending_messages.push_back(message); @@ -196,8 +196,8 @@ impl OnionMessageBuffer { fn dequeue_message(&mut self) -> Option { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => { + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => { debug_assert!(false); pending_messages }, @@ -209,18 +209,18 @@ impl OnionMessageBuffer { #[cfg(test)] fn release_pending_messages(&mut self) -> VecDeque { let pending_messages = match self { - OnionMessageBuffer::ConnectedPeer(pending_messages) => pending_messages, - OnionMessageBuffer::PendingConnection(pending_messages, _, _) => pending_messages, + OnionMessageRecipient::ConnectedPeer(pending_messages) => pending_messages, + OnionMessageRecipient::PendingConnection(pending_messages, _, _) => pending_messages, }; core::mem::take(pending_messages) } fn mark_connected(&mut self) { - if let OnionMessageBuffer::PendingConnection(pending_messages, _, _) = self { + if let OnionMessageRecipient::PendingConnection(pending_messages, _, _) = self { let mut new_pending_messages = VecDeque::new(); core::mem::swap(pending_messages, &mut new_pending_messages); - *self = OnionMessageBuffer::ConnectedPeer(new_pending_messages); + *self = OnionMessageRecipient::ConnectedPeer(new_pending_messages); } } } @@ -631,7 +631,7 @@ where OnionMessenger { entropy_source, node_signer, - message_buffers: Mutex::new(HashMap::new()), + message_recipients: Mutex::new(HashMap::new()), secp_ctx, logger, message_router, @@ -687,9 +687,9 @@ where .get_node_id(Recipient::Node) .map_err(|_| SendError::GetNodeIdFailed)?; - let peers = self.message_buffers.lock().unwrap() + let peers = self.message_recipients.lock().unwrap() .iter() - .filter(|(_, buffer)| matches!(buffer, OnionMessageBuffer::ConnectedPeer(_))) + .filter(|(_, recipient)| matches!(recipient, OnionMessageRecipient::ConnectedPeer(_))) .map(|(node_id, _)| *node_id) .collect(); @@ -708,16 +708,16 @@ where &self.entropy_source, &self.node_signer, &self.secp_ctx, path, contents, reply_path )?; - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&first_node_id, &message_buffers) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&first_node_id, &message_recipients) { return Err(SendError::BufferFull); } - match message_buffers.entry(first_node_id) { + match message_recipients.entry(first_node_id) { hash_map::Entry::Vacant(e) => match addresses { None => Err(SendError::InvalidFirstHop(first_node_id)), Some(addresses) => { - e.insert(OnionMessageBuffer::pending_connection(addresses)) + e.insert(OnionMessageRecipient::pending_connection(addresses)) .enqueue_message(onion_message); Ok(SendSuccess::BufferedAwaitingConnection(first_node_id)) }, @@ -755,18 +755,18 @@ where #[cfg(test)] pub(super) fn release_pending_msgs(&self) -> HashMap> { - let mut message_buffers = self.message_buffers.lock().unwrap(); + let mut message_recipients = self.message_recipients.lock().unwrap(); let mut msgs = HashMap::new(); // We don't want to disconnect the peers by removing them entirely from the original map, so we // release the pending message buffers individually. - for (peer_node_id, buffer) in &mut *message_buffers { - msgs.insert(*peer_node_id, buffer.release_pending_messages()); + for (node_id, recipient) in &mut *message_recipients { + msgs.insert(*node_id, recipient.release_pending_messages()); } msgs } } -fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { +fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap) -> bool { const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128; const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256; let mut total_buffered_bytes = 0; @@ -800,8 +800,8 @@ where CMH::Target: CustomOnionMessageHandler, { fn process_pending_events(&self, handler: H) where H::Target: EventHandler { - for (node_id, recipient) in self.message_buffers.lock().unwrap().iter_mut() { - if let OnionMessageBuffer::PendingConnection(_, addresses, _) = recipient { + for (node_id, recipient) in self.message_recipients.lock().unwrap().iter_mut() { + if let OnionMessageRecipient::PendingConnection(_, addresses, _) = recipient { if let Some(addresses) = addresses.take() { handler.handle_event(Event::ConnectionNeeded { node_id: *node_id, addresses }); } @@ -852,20 +852,20 @@ where } }, Ok(PeeledOnion::Forward(next_node_id, onion_message)) => { - let mut message_buffers = self.message_buffers.lock().unwrap(); - if outbound_buffer_full(&next_node_id, &message_buffers) { + let mut message_recipients = self.message_recipients.lock().unwrap(); + if outbound_buffer_full(&next_node_id, &message_recipients) { log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id); return } #[cfg(fuzzing)] - message_buffers + message_recipients .entry(next_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())); + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())); - match message_buffers.entry(next_node_id) { + match message_recipients.entry(next_node_id) { hash_map::Entry::Occupied(mut e) if matches!( - e.get(), OnionMessageBuffer::ConnectedPeer(..) + e.get(), OnionMessageRecipient::ConnectedPeer(..) ) => { e.get_mut().enqueue_message(onion_message); log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id); @@ -884,39 +884,39 @@ where fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init, _inbound: bool) -> Result<(), ()> { if init.features.supports_onion_messages() { - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .entry(*their_node_id) - .or_insert_with(|| OnionMessageBuffer::ConnectedPeer(VecDeque::new())) + .or_insert_with(|| OnionMessageRecipient::ConnectedPeer(VecDeque::new())) .mark_connected(); } else { - self.message_buffers.lock().unwrap().remove(their_node_id); + self.message_recipients.lock().unwrap().remove(their_node_id); } Ok(()) } fn peer_disconnected(&self, their_node_id: &PublicKey) { - match self.message_buffers.lock().unwrap().remove(their_node_id) { - Some(OnionMessageBuffer::ConnectedPeer(..)) => {}, + match self.message_recipients.lock().unwrap().remove(their_node_id) { + Some(OnionMessageRecipient::ConnectedPeer(..)) => {}, _ => debug_assert!(false), } } fn timer_tick_occurred(&self) { - let mut message_buffers = self.message_buffers.lock().unwrap(); + let mut message_recipients = self.message_recipients.lock().unwrap(); // Drop any pending recipients since the last call to avoid retaining buffered messages for // too long. - message_buffers.retain(|_, recipient| match recipient { - OnionMessageBuffer::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, - OnionMessageBuffer::PendingConnection(_, Some(_), _) => true, + message_recipients.retain(|_, recipient| match recipient { + OnionMessageRecipient::PendingConnection(_, None, ticks) => *ticks < MAX_TIMER_TICKS, + OnionMessageRecipient::PendingConnection(_, Some(_), _) => true, _ => true, }); // Increment a timer tick for pending recipients so that their buffered messages are dropped // at MAX_TIMER_TICKS. - for recipient in message_buffers.values_mut() { - if let OnionMessageBuffer::PendingConnection(_, None, ticks) = recipient { + for recipient in message_recipients.values_mut() { + if let OnionMessageRecipient::PendingConnection(_, None, ticks) = recipient { *ticks += 1; } } @@ -960,7 +960,7 @@ where ); } - self.message_buffers.lock().unwrap() + self.message_recipients.lock().unwrap() .get_mut(&peer_node_id) .and_then(|buffer| buffer.dequeue_message()) } -- 2.39.5