Merge pull request #2708 from TheBlueMatt/2023-11-less-graph-memory-frag
[rust-lightning] / lightning / src / ln / peer_handler.rs
index ba3a733d225db072f70cc4d185ce4e4cfe1030e9..006538651a8133462996a750308a13497a0427f8 100644 (file)
@@ -27,7 +27,7 @@ use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, Onio
 #[cfg(not(c_bindings))]
 use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
 #[cfg(not(c_bindings))]
@@ -40,7 +40,7 @@ use crate::util::string::PrintableString;
 
 use crate::prelude::*;
 use crate::io;
-use alloc::collections::LinkedList;
+use alloc::collections::VecDeque;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
 use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
@@ -489,13 +489,13 @@ struct Peer {
        their_features: Option<InitFeatures>,
        their_socket_address: Option<SocketAddress>,
 
-       pending_outbound_buffer: LinkedList<Vec<u8>>,
+       pending_outbound_buffer: VecDeque<Vec<u8>>,
        pending_outbound_buffer_first_msg_offset: usize,
        /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
        /// prioritize channel messages over them.
        ///
        /// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
-       gossip_broadcast_buffer: LinkedList<Vec<u8>>,
+       gossip_broadcast_buffer: VecDeque<MessageBuf>,
        awaiting_write_event: bool,
 
        pending_read_buffer: Vec<u8>,
@@ -785,7 +785,7 @@ impl From<LightningError> for MessageHandlingError {
 
 macro_rules! encode_msg {
        ($msg: expr) => {{
-               let mut buffer = VecWriter(Vec::new());
+               let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
                wire::write($msg, &mut buffer).unwrap();
                buffer.0
        }}
@@ -997,9 +997,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        their_features: None,
                                        their_socket_address: remote_network_address,
 
-                                       pending_outbound_buffer: LinkedList::new(),
+                                       pending_outbound_buffer: VecDeque::new(),
                                        pending_outbound_buffer_first_msg_offset: 0,
-                                       gossip_broadcast_buffer: LinkedList::new(),
+                                       gossip_broadcast_buffer: VecDeque::new(),
                                        awaiting_write_event: false,
 
                                        pending_read_buffer,
@@ -1053,9 +1053,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        their_features: None,
                                        their_socket_address: remote_network_address,
 
-                                       pending_outbound_buffer: LinkedList::new(),
+                                       pending_outbound_buffer: VecDeque::new(),
                                        pending_outbound_buffer_first_msg_offset: 0,
-                                       gossip_broadcast_buffer: LinkedList::new(),
+                                       gossip_broadcast_buffer: VecDeque::new(),
                                        awaiting_write_event: false,
 
                                        pending_read_buffer,
@@ -1102,7 +1102,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        }
                        if peer.should_buffer_gossip_broadcast() {
                                if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(&msg[..]));
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(msg));
                                }
                        }
                        if peer.should_buffer_gossip_backfill() {
@@ -1168,6 +1168,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
+                               const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+                               let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+                               let lots_of_slack = peer.pending_outbound_buffer.len()
+                                       < peer.pending_outbound_buffer.capacity() / 2;
+                               if large_capacity && lots_of_slack {
+                                       peer.pending_outbound_buffer.shrink_to_fit();
+                               }
                        } else {
                                peer.awaiting_write_event = true;
                        }
@@ -1244,8 +1251,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
        }
 
        /// Append a message to a peer's pending outbound/write gossip broadcast buffer
-       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
+       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
                peer.msgs_sent_since_pong += 1;
+               debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
                peer.gossip_broadcast_buffer.push_back(encoded_message);
        }
 
@@ -1402,17 +1410,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        }
                                                                        peer.pending_read_is_header = false;
                                                                } else {
-                                                                       let msg_data = try_potential_handleerror!(peer,
-                                                                               peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
-                                                                       assert!(msg_data.len() >= 2);
+                                                                       debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
+                                                                       try_potential_handleerror!(peer,
+                                                                               peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..]));
+
+                                                                       let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]);
+                                                                       let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
 
                                                                        // Reset read buffer
                                                                        if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
                                                                        peer.pending_read_buffer.resize(18, 0);
                                                                        peer.pending_read_is_header = true;
 
-                                                                       let mut reader = io::Cursor::new(&msg_data[..]);
-                                                                       let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
                                                                        let message = match message_result {
                                                                                Ok(x) => x,
                                                                                Err(e) => {
@@ -1791,7 +1800,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        wire::Message::NodeAnnouncement(ref msg) => {
@@ -1818,7 +1827,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        wire::Message::ChannelUpdate(ref msg) => {
@@ -1840,7 +1849,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),