X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=3949b97e0d03019a5f79ad4f293d0d450a9d970a;hb=ae15ba84620f40ed5c80870d254d5eda341f1aba;hp=ba3a733d225db072f70cc4d185ce4e4cfe1030e9;hpb=d795e247b76f6b0d70a2529c2a05995a909b1134;p=rust-lightning

diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index ba3a733d..3949b97e 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -27,7 +27,7 @@ use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, Onio
 #[cfg(not(c_bindings))]
 use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
 #[cfg(not(c_bindings))]
@@ -40,7 +40,7 @@ use crate::util::string::PrintableString;
 
 use crate::prelude::*;
 use crate::io;
-use alloc::collections::LinkedList;
+use alloc::collections::VecDeque;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
 use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
@@ -231,6 +231,18 @@ impl ChannelMessageHandler for ErroringMessageHandler {
 	fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
 		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
 	}
+	fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
+	fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
+		ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+	}
 	fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
 		ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
 	}
@@ -489,13 +501,13 @@ struct Peer {
 	their_features: Option<InitFeatures>,
 	their_socket_address: Option<SocketAddress>,
 
-	pending_outbound_buffer: LinkedList<Vec<u8>>,
+	pending_outbound_buffer: VecDeque<Vec<u8>>,
 	pending_outbound_buffer_first_msg_offset: usize,
 	/// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
 	/// prioritize channel messages over them.
 	///
 	/// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
-	gossip_broadcast_buffer: LinkedList<Vec<u8>>,
+	gossip_broadcast_buffer: VecDeque<MessageBuf>,
 	awaiting_write_event: bool,
 
 	pending_read_buffer: Vec<u8>,
@@ -785,7 +797,7 @@ impl From<LightningError> for MessageHandlingError {
 
 macro_rules! encode_msg {
 	($msg: expr) => {{
-		let mut buffer = VecWriter(Vec::new());
+		let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
 		wire::write($msg, &mut buffer).unwrap();
 		buffer.0
 	}}
@@ -910,7 +922,7 @@ impl InitFeatures {
@@ -997,9 +1009,9 @@ impl
+				const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+				let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+				let lots_of_slack = peer.pending_outbound_buffer.len()
+					< peer.pending_outbound_buffer.capacity() / 2;
+				if large_capacity && lots_of_slack {
+					peer.pending_outbound_buffer.shrink_to_fit();
+				}
 			} else {
 				peer.awaiting_write_event = true;
 			}
@@ -1244,8 +1263,9 @@ impl
-	fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
+	fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
 		peer.msgs_sent_since_pong += 1;
+		debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
 		peer.gossip_broadcast_buffer.push_back(encoded_message);
 	}
@@ -1402,17 +1422,18 @@ impl
-							let msg_data = try_potential_handleerror!(peer,
-								peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
-							assert!(msg_data.len() >= 2);
+							debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
+							try_potential_handleerror!(peer,
+								peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..]));
+
+							let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]);
+							let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
 
 							// Reset read buffer
 							if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
 							peer.pending_read_buffer.resize(18, 0);
 							peer.pending_read_is_header = true;
 
-							let mut reader = io::Cursor::new(&msg_data[..]);
-							let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
 							let message = match message_result {
 								Ok(x) => x,
 								Err(e) => {
@@ -1643,6 +1664,22 @@ impl
+			// Quiescence messages:
+			wire::Message::Stfu(msg) => {
+				self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg);
+			}
+
+			// Splicing messages:
+			wire::Message::Splice(msg) => {
+				self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
+			}
+			wire::Message::SpliceAck(msg) => {
+				self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
+			}
+			wire::Message::SpliceLocked(msg) => {
+				self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
+			}
+
 			// Interactive transaction construction messages:
 			wire::Message::TxAddInput(msg) => {
 				self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg);
@@ -1791,7 +1828,7 @@ impl {
@@ -1818,7 +1855,7 @@ impl {
@@ -1840,7 +1877,7 @@ impl debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
@@ -1960,6 +1997,30 @@ impl
+				MessageSendEvent::SendStfu { ref node_id, ref msg} => {
+					log_debug!(self.logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSplice { ref node_id, ref msg} => {
+					log_debug!(self.logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
+					log_debug!(self.logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
+				MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
+					log_debug!(self.logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
+							log_pubkey!(node_id),
+							&msg.channel_id);
+					self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+				}
 				MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
 					log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
 							log_pubkey!(node_id),
 							&msg.channel_id);
@@ -2615,7 +2676,7 @@ mod tests {
 		for i in 0..peer_count {
 			let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
 			let features = InitFeatures::from_le_bytes(vec![0u8; 33]);
-			let network = ChainHash::from(&[i as u8; 32][..]);
+			let network = ChainHash::from(&[i as u8; 32]);
 			cfgs.push(
 				PeerManagerCfg{
 					chan_handler: test_utils::TestChannelMessageHandler::new(network),
@@ -2728,7 +2789,7 @@ mod tests {
 			node_id: peers[1].node_signer.get_node_id(Recipient::Node).unwrap(),
 			msg: msgs::Shutdown {
 				channel_id: ChannelId::new_zero(),
-				scriptpubkey: bitcoin::Script::new(),
+				scriptpubkey: bitcoin::ScriptBuf::new(),
 			},
 		});
 		cfgs[1].chan_handler.pending_events.lock().unwrap()
@@ -2736,7 +2797,7 @@ mod tests {
 			node_id: peers[0].node_signer.get_node_id(Recipient::Node).unwrap(),
 			msg: msgs::Shutdown {
 				channel_id: ChannelId::new_zero(),
-				scriptpubkey: bitcoin::Script::new(),
+				scriptpubkey: bitcoin::ScriptBuf::new(),
 			},
 		});
 
@@ -2864,7 +2925,7 @@ mod tests {
 		let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap();
 
-		let msg = msgs::Shutdown { channel_id: ChannelId::from_bytes([42; 32]), scriptpubkey: bitcoin::Script::new() };
+		let msg = msgs::Shutdown { channel_id: ChannelId::from_bytes([42; 32]), scriptpubkey: bitcoin::ScriptBuf::new() };
 		a_chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::SendShutdown {
 			node_id: their_id, msg: msg.clone()
 		});
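
A note on the outbound-buffer shrink heuristic added above: after a fully-flushed message is popped from `pending_outbound_buffer`, the queue's spare capacity is released only when the backing allocation is both large (more slots than fit in roughly 4 KiB of `Vec` headers, i.e. `4096 / size_of::<Vec<u8>>()`) and mostly unused (`len()` below half of `capacity()`). The sketch below reproduces that heuristic on a plain `std::collections::VecDeque` so it can be compiled and run outside the crate; the `maybe_shrink` helper and the `main` driver are illustrative names added here, not part of the patch.

    use std::collections::VecDeque;
    use std::mem::size_of;

    /// Release the queue's spare capacity only when the allocation is large and
    /// mostly empty, mirroring the heuristic in the patch above.
    fn maybe_shrink(buf: &mut VecDeque<Vec<u8>>) {
        const VEC_SIZE: usize = size_of::<Vec<u8>>();
        // "Large": the ring buffer's own allocation exceeds ~4 KiB of Vec headers.
        let large_capacity = buf.capacity() > 4096 / VEC_SIZE;
        // "Lots of slack": fewer than half of the allocated slots are in use.
        let lots_of_slack = buf.len() < buf.capacity() / 2;
        if large_capacity && lots_of_slack {
            buf.shrink_to_fit();
        }
    }

    fn main() {
        let mut buf: VecDeque<Vec<u8>> = VecDeque::with_capacity(1024);
        for _ in 0..1024 {
            buf.push_back(vec![0u8; 32]);
        }
        // Drain most of the queue, as the write loop does once messages are flushed.
        while buf.len() > 8 {
            buf.pop_front();
        }
        maybe_shrink(&mut buf);
        assert!(buf.capacity() < 1024);
        println!("len = {}, capacity = {}", buf.len(), buf.capacity());
    }

The half-full condition keeps the allocation stable while the queue is routinely refilled, and the 4 KiB floor leaves small steady-state buffers untouched rather than reallocating them on every drain.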