#[cfg(not(c_bindings))]
use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MSG_BUF_ALLOC_SIZE};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
use crate::ln::wire;
use crate::ln::wire::{Encode, Type};
#[cfg(not(c_bindings))]
use crate::prelude::*;
use crate::io;
-use alloc::collections::LinkedList;
+use alloc::collections::VecDeque;
use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
use core::{cmp, hash, fmt, mem};
fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
}
+ fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
+ ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+ }
+ fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
+ ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+ }
+ fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
+ ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+ }
+ fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
+ ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+ }
fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
}
their_features: Option<InitFeatures>,
their_socket_address: Option<SocketAddress>,
- pending_outbound_buffer: LinkedList<Vec<u8>>,
+ pending_outbound_buffer: VecDeque<Vec<u8>>,
pending_outbound_buffer_first_msg_offset: usize,
/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
/// prioritize channel messages over them.
///
/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
- gossip_broadcast_buffer: LinkedList<Vec<u8>>,
+ gossip_broadcast_buffer: VecDeque<MessageBuf>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
their_features: None,
their_socket_address: remote_network_address,
- pending_outbound_buffer: LinkedList::new(),
+ pending_outbound_buffer: VecDeque::new(),
pending_outbound_buffer_first_msg_offset: 0,
- gossip_broadcast_buffer: LinkedList::new(),
+ gossip_broadcast_buffer: VecDeque::new(),
awaiting_write_event: false,
pending_read_buffer,
their_features: None,
their_socket_address: remote_network_address,
- pending_outbound_buffer: LinkedList::new(),
+ pending_outbound_buffer: VecDeque::new(),
pending_outbound_buffer_first_msg_offset: 0,
- gossip_broadcast_buffer: LinkedList::new(),
+ gossip_broadcast_buffer: VecDeque::new(),
awaiting_write_event: false,
pending_read_buffer,
}
if peer.should_buffer_gossip_broadcast() {
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(&msg[..]));
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(msg));
}
}
if peer.should_buffer_gossip_backfill() {
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
peer.pending_outbound_buffer_first_msg_offset = 0;
peer.pending_outbound_buffer.pop_front();
+ const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+ let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+ let lots_of_slack = peer.pending_outbound_buffer.len()
+ < peer.pending_outbound_buffer.capacity() / 2;
+ if large_capacity && lots_of_slack {
+ peer.pending_outbound_buffer.shrink_to_fit();
+ }
} else {
peer.awaiting_write_event = true;
}
}
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
- fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
+ fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
peer.msgs_sent_since_pong += 1;
+ debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
peer.gossip_broadcast_buffer.push_back(encoded_message);
}
self.message_handler.chan_handler.handle_channel_ready(&their_node_id, &msg);
},
+ // Quiescence messages:
+ wire::Message::Stfu(msg) => {
+ self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg);
+ }
+
+ // Splicing messages:
+ wire::Message::Splice(msg) => {
+ self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
+ }
+ wire::Message::SpliceAck(msg) => {
+ self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
+ }
+ wire::Message::SpliceLocked(msg) => {
+ self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
+ }
+
// Interactive transaction construction messages:
wire::Message::TxAddInput(msg) => {
self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg);
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
wire::Message::NodeAnnouncement(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
wire::Message::ChannelUpdate(ref msg) => {
if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
continue;
}
- self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
&msg.channel_id);
self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
},
+ MessageSendEvent::SendStfu { ref node_id, ref msg} => {
+ log_debug!(self.logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
+ log_pubkey!(node_id),
+ &msg.channel_id);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ }
+ MessageSendEvent::SendSplice { ref node_id, ref msg} => {
+ log_debug!(self.logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
+ log_pubkey!(node_id),
+ &msg.channel_id);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ }
+ MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
+ log_debug!(self.logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
+ log_pubkey!(node_id),
+ &msg.channel_id);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ }
+ MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
+ log_debug!(self.logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
+ log_pubkey!(node_id),
+ &msg.channel_id);
+ self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+ }
MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
log_pubkey!(node_id),