X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=3949b97e0d03019a5f79ad4f293d0d450a9d970a;hb=146a291f15b359289bf536f5579e8cc7bd00b512;hp=f4873095be6e5a412202a549abc1a2974152fcff;hpb=649129ddab4b24664f40190463a0b622413ab163;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index f4873095..3949b97e 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -27,7 +27,7 @@ use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, Onio #[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::util::ser::{VecWriter, Writeable, Writer}; -use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE}; use crate::ln::wire; use crate::ln::wire::{Encode, Type}; #[cfg(not(c_bindings))] @@ -40,7 +40,7 @@ use crate::util::string::PrintableString; use crate::prelude::*; use crate::io; -use alloc::collections::LinkedList; +use alloc::collections::VecDeque; use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -501,13 +501,13 @@ struct Peer { their_features: Option, their_socket_address: Option, - pending_outbound_buffer: LinkedList>, + pending_outbound_buffer: VecDeque>, pending_outbound_buffer_first_msg_offset: usize, /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily /// prioritize channel messages over them. /// /// Note that these messages are *not* encrypted/MAC'd, and are only serialized. - gossip_broadcast_buffer: LinkedList>, + gossip_broadcast_buffer: VecDeque, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -797,7 +797,7 @@ impl From for MessageHandlingError { macro_rules! encode_msg { ($msg: expr) => {{ - let mut buffer = VecWriter(Vec::new()); + let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE)); wire::write($msg, &mut buffer).unwrap(); buffer.0 }} @@ -922,7 +922,7 @@ impl InitFeatures { @@ -1009,9 +1009,9 @@ impl>(); + let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE; + let lots_of_slack = peer.pending_outbound_buffer.len() + < peer.pending_outbound_buffer.capacity() / 2; + if large_capacity && lots_of_slack { + peer.pending_outbound_buffer.shrink_to_fit(); + } } else { peer.awaiting_write_event = true; } @@ -1256,8 +1263,9 @@ impl) { + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) { peer.msgs_sent_since_pong += 1; + debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); peer.gossip_broadcast_buffer.push_back(encoded_message); } @@ -1414,17 +1422,18 @@ impl= 2); + debug_assert!(peer.pending_read_buffer.len() >= 2 + 16); + try_potential_handleerror!(peer, + peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..])); + + let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]); + let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); // Reset read buffer if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(18, 0); peer.pending_read_is_header = true; - let mut reader = io::Cursor::new(&msg_data[..]); - let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); let message = match message_result { Ok(x) => x, Err(e) => { @@ -1819,7 +1828,7 @@ impl { @@ -1846,7 +1855,7 @@ impl { @@ -1868,7 +1877,7 @@ impl debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), @@ -2667,7 +2676,7 @@ mod tests { for i in 0..peer_count { let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); let features = InitFeatures::from_le_bytes(vec![0u8; 33]); - let network = ChainHash::from(&[i as u8; 32][..]); + let network = ChainHash::from(&[i as u8; 32]); cfgs.push( PeerManagerCfg{ chan_handler: test_utils::TestChannelMessageHandler::new(network), @@ -2780,7 +2789,7 @@ mod tests { node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(), msg: msgs::Shutdown { channel_id: ChannelId::new_zero(), - scriptpubkey: bitcoin::Script::new(), + scriptpubkey: bitcoin::ScriptBuf::new(), }, }); cfgs[1].chan_handler.pending_events.lock().unwrap() @@ -2788,7 +2797,7 @@ mod tests { node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(), msg: msgs::Shutdown { channel_id: ChannelId::new_zero(), - scriptpubkey: bitcoin::Script::new(), + scriptpubkey: bitcoin::ScriptBuf::new(), }, }); @@ -2916,7 +2925,7 @@ mod tests { let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap(); - let msg = msgs::Shutdown { channel_id: ChannelId::from_bytes([42; 32]), scriptpubkey: bitcoin::Script::new() }; + let msg = msgs::Shutdown { channel_id: ChannelId::from_bytes([42; 32]), scriptpubkey: bitcoin::ScriptBuf::new() }; a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown { node_id: their_id, msg: msg.clone() });