projects
/
rust-lightning
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Merge pull request #2708 from TheBlueMatt/2023-11-less-graph-memory-frag
[rust-lightning]
/
lightning
/
src
/
ln
/
peer_handler.rs
diff --git
a/lightning/src/ln/peer_handler.rs
b/lightning/src/ln/peer_handler.rs
index 44d2a87a87c4ea2cc7492bdbb575a44f41f80f68..006538651a8133462996a750308a13497a0427f8 100644
(file)
--- a/
lightning/src/ln/peer_handler.rs
+++ b/
lightning/src/ln/peer_handler.rs
@@
-24,12
+24,15
@@
use crate::ln::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs;
use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs;
use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
+#[cfg(not(c_bindings))]
use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use crate::util::ser::{VecWriter, Writeable, Writer};
use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,
NextNoiseStep
};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,
NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE
};
use crate::ln::wire;
use crate::ln::wire::{Encode, Type};
use crate::ln::wire;
use crate::ln::wire::{Encode, Type};
-use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
+#[cfg(not(c_bindings))]
+use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger};
+use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage};
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
use crate::util::atomic_counter::AtomicCounter;
use crate::util::logger::Logger;
use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
use crate::util::atomic_counter::AtomicCounter;
use crate::util::logger::Logger;
@@
-37,7
+40,7
@@
use crate::util::string::PrintableString;
use crate::prelude::*;
use crate::io;
use crate::prelude::*;
use crate::io;
-use alloc::collections::
LinkedList
;
+use alloc::collections::
VecDeque
;
use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
use core::{cmp, hash, fmt, mem};
use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
use core::{cmp, hash, fmt, mem};
@@
-486,13
+489,13
@@
struct Peer {
their_features: Option<InitFeatures>,
their_socket_address: Option<SocketAddress>,
their_features: Option<InitFeatures>,
their_socket_address: Option<SocketAddress>,
- pending_outbound_buffer:
LinkedList
<Vec<u8>>,
+ pending_outbound_buffer:
VecDeque
<Vec<u8>>,
pending_outbound_buffer_first_msg_offset: usize,
/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
/// prioritize channel messages over them.
///
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
pending_outbound_buffer_first_msg_offset: usize,
/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
/// prioritize channel messages over them.
///
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
- gossip_broadcast_buffer:
LinkedList<Vec<u8>
>,
+ gossip_broadcast_buffer:
VecDeque<MessageBuf
>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
@@
-608,7
+611,8
@@
impl Peer {
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
///
/// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
/// issues such as overly long function definitions.
///
-/// This is not exported to bindings users as `Arc`s don't make sense in bindings.
+/// This is not exported to bindings users as type aliases aren't supported in most languages.
+#[cfg(not(c_bindings))]
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<
SD,
Arc<SimpleArcChannelManager<M, T, F, L>>,
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<
SD,
Arc<SimpleArcChannelManager<M, T, F, L>>,
@@
-626,7
+630,8
@@
pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
///
/// But if this is not necessary, using a reference is more efficient. Defining these type aliases
/// helps with issues such as long function definitions.
///
-/// This is not exported to bindings users as general type aliases don't make sense in bindings.
+/// This is not exported to bindings users as type aliases aren't supported in most languages.
+#[cfg(not(c_bindings))]
pub type SimpleRefPeerManager<
'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, SD, M, T, F, C, L
> = PeerManager<
pub type SimpleRefPeerManager<
'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, SD, M, T, F, C, L
> = PeerManager<
@@
-780,7
+785,7
@@
impl From<LightningError> for MessageHandlingError {
macro_rules! encode_msg {
($msg: expr) => {{
macro_rules! encode_msg {
($msg: expr) => {{
- let mut buffer = VecWriter(Vec::
new(
));
+ let mut buffer = VecWriter(Vec::
with_capacity(MSG_BUF_ALLOC_SIZE
));
wire::write($msg, &mut buffer).unwrap();
buffer.0
}}
wire::write($msg, &mut buffer).unwrap();
buffer.0
}}
@@
-992,9
+997,9
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
their_features: None,
their_socket_address: remote_network_address,
their_features: None,
their_socket_address: remote_network_address,
- pending_outbound_buffer:
LinkedList
::new(),
+ pending_outbound_buffer:
VecDeque
::new(),
pending_outbound_buffer_first_msg_offset: 0,
pending_outbound_buffer_first_msg_offset: 0,
- gossip_broadcast_buffer:
LinkedList
::new(),
+ gossip_broadcast_buffer:
VecDeque
::new(),
awaiting_write_event: false,
pending_read_buffer,
awaiting_write_event: false,
pending_read_buffer,
@@
-1048,9
+1053,9
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
their_features: None,
their_socket_address: remote_network_address,
their_features: None,
their_socket_address: remote_network_address,
- pending_outbound_buffer:
LinkedList
::new(),
+ pending_outbound_buffer:
VecDeque
::new(),
pending_outbound_buffer_first_msg_offset: 0,
pending_outbound_buffer_first_msg_offset: 0,
- gossip_broadcast_buffer:
LinkedList
::new(),
+ gossip_broadcast_buffer:
VecDeque
::new(),
awaiting_write_event: false,
pending_read_buffer,
awaiting_write_event: false,
pending_read_buffer,
@@
-1097,7
+1102,7
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
if peer.should_buffer_gossip_broadcast() {
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
}
if peer.should_buffer_gossip_broadcast() {
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(
&msg[..]
));
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(
msg
));
}
}
if peer.should_buffer_gossip_backfill() {
}
}
if peer.should_buffer_gossip_backfill() {
@@
-1163,6
+1168,13
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
peer.pending_outbound_buffer_first_msg_offset = 0;
peer.pending_outbound_buffer.pop_front();
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
peer.pending_outbound_buffer_first_msg_offset = 0;
peer.pending_outbound_buffer.pop_front();
+ const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+ let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+ let lots_of_slack = peer.pending_outbound_buffer.len()
+ < peer.pending_outbound_buffer.capacity() / 2;
+ if large_capacity && lots_of_slack {
+ peer.pending_outbound_buffer.shrink_to_fit();
+ }
} else {
peer.awaiting_write_event = true;
}
} else {
peer.awaiting_write_event = true;
}
@@
-1239,8
+1251,9
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
}
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
- fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message:
Vec<u8>
) {
+ fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message:
MessageBuf
) {
peer.msgs_sent_since_pong += 1;
peer.msgs_sent_since_pong += 1;
+ debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
peer.gossip_broadcast_buffer.push_back(encoded_message);
}
peer.gossip_broadcast_buffer.push_back(encoded_message);
}
@@
-1397,17
+1410,18
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
}
peer.pending_read_is_header = false;
} else {
}
peer.pending_read_is_header = false;
} else {
- let msg_data = try_potential_handleerror!(peer,
- peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
- assert!(msg_data.len() >= 2);
+ debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
+ try_potential_handleerror!(peer,
+ peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..]));
+
+ let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]);
+ let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
// Reset read buffer
if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
peer.pending_read_buffer.resize(18, 0);
peer.pending_read_is_header = true;
// Reset read buffer
if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
peer.pending_read_buffer.resize(18, 0);
peer.pending_read_is_header = true;
- let mut reader = io::Cursor::new(&msg_data[..]);
- let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
let message = match message_result {
Ok(x) => x,
Err(e) => {
let message = match message_result {
Ok(x) => x,
Err(e) => {
@@
-1786,7
+1800,7
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer,
encoded_msg.clone(
));
+ self.enqueue_encoded_gossip_broadcast(&mut *peer,
MessageBuf::from_encoded(&encoded_msg
));
}
},
wire::Message::NodeAnnouncement(ref msg) => {
}
},
wire::Message::NodeAnnouncement(ref msg) => {
@@
-1813,7
+1827,7
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer,
encoded_msg.clone(
));
+ self.enqueue_encoded_gossip_broadcast(&mut *peer,
MessageBuf::from_encoded(&encoded_msg
));
}
},
wire::Message::ChannelUpdate(ref msg) => {
}
},
wire::Message::ChannelUpdate(ref msg) => {
@@
-1835,7
+1849,7
@@
impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer,
encoded_msg.clone(
));
+ self.enqueue_encoded_gossip_broadcast(&mut *peer,
MessageBuf::from_encoded(&encoded_msg
));
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),