X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=4c2e68d9a43993a021c5a0bc796b548aed1a649e;hb=185fbc176579f062d82b548a7a3446d975c450a3;hp=44d2a87a87c4ea2cc7492bdbb575a44f41f80f68;hpb=10b8f4c44e94ef2f280decec43ee351502e2a218;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 44d2a87a..4c2e68d9 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -24,12 +24,15 @@ use crate::ln::ChannelId; use crate::ln::features::{InitFeatures, NodeFeatures}; use crate::ln::msgs; use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler}; +#[cfg(not(c_bindings))] use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; use crate::util::ser::{VecWriter, Writeable, Writer}; -use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE}; use crate::ln::wire; use crate::ln::wire::{Encode, Type}; -use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage, SimpleArcOnionMessenger, SimpleRefOnionMessenger}; +#[cfg(not(c_bindings))] +use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger}; +use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage}; use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias}; use crate::util::atomic_counter::AtomicCounter; use crate::util::logger::Logger; @@ -37,7 +40,7 @@ use crate::util::string::PrintableString; use crate::prelude::*; use crate::io; -use alloc::collections::LinkedList; +use alloc::collections::VecDeque; use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock}; use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering}; use core::{cmp, hash, fmt, mem}; @@ -228,6 +231,18 @@ impl ChannelMessageHandler for ErroringMessageHandler { fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } + fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } + fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) { + ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id); + } fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); } @@ -486,13 +501,13 @@ struct Peer { their_features: Option, their_socket_address: Option, - pending_outbound_buffer: LinkedList>, + pending_outbound_buffer: VecDeque>, pending_outbound_buffer_first_msg_offset: usize, /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily /// prioritize channel messages over them. /// /// Note that these messages are *not* encrypted/MAC'd, and are only serialized. - gossip_broadcast_buffer: LinkedList>, + gossip_broadcast_buffer: VecDeque, awaiting_write_event: bool, pending_read_buffer: Vec, @@ -608,7 +623,8 @@ impl Peer { /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. /// -/// This is not exported to bindings users as `Arc`s don't make sense in bindings. +/// This is not exported to bindings users as type aliases aren't supported in most languages. +#[cfg(not(c_bindings))] pub type SimpleArcPeerManager = PeerManager< SD, Arc>, @@ -626,7 +642,8 @@ pub type SimpleArcPeerManager = PeerManager< /// But if this is not necessary, using a reference is more efficient. Defining these type aliases /// helps with issues such as long function definitions. /// -/// This is not exported to bindings users as general type aliases don't make sense in bindings. +/// This is not exported to bindings users as type aliases aren't supported in most languages. +#[cfg(not(c_bindings))] pub type SimpleRefPeerManager< 'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, SD, M, T, F, C, L > = PeerManager< @@ -780,7 +797,7 @@ impl From for MessageHandlingError { macro_rules! encode_msg { ($msg: expr) => {{ - let mut buffer = VecWriter(Vec::new()); + let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE)); wire::write($msg, &mut buffer).unwrap(); buffer.0 }} @@ -992,9 +1009,9 @@ impl>(); + let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE; + let lots_of_slack = peer.pending_outbound_buffer.len() + < peer.pending_outbound_buffer.capacity() / 2; + if large_capacity && lots_of_slack { + peer.pending_outbound_buffer.shrink_to_fit(); + } } else { peer.awaiting_write_event = true; } @@ -1239,8 +1263,9 @@ impl) { + fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) { peer.msgs_sent_since_pong += 1; + debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); peer.gossip_broadcast_buffer.push_back(encoded_message); } @@ -1397,17 +1422,18 @@ impl= 2); + debug_assert!(peer.pending_read_buffer.len() >= 2 + 16); + try_potential_handleerror!(peer, + peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..])); + + let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]); + let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); // Reset read buffer if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(18, 0); peer.pending_read_is_header = true; - let mut reader = io::Cursor::new(&msg_data[..]); - let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler); let message = match message_result { Ok(x) => x, Err(e) => { @@ -1638,6 +1664,22 @@ impl { + self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg); + } + + // Splicing messages: + wire::Message::Splice(msg) => { + self.message_handler.chan_handler.handle_splice(&their_node_id, &msg); + } + wire::Message::SpliceAck(msg) => { + self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg); + } + wire::Message::SpliceLocked(msg) => { + self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg); + } + // Interactive transaction construction messages: wire::Message::TxAddInput(msg) => { self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg); @@ -1786,7 +1828,7 @@ impl { @@ -1813,7 +1855,7 @@ impl { @@ -1835,7 +1877,7 @@ impl debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"), @@ -1955,6 +1997,30 @@ impl { + log_debug!(self.logger, "Handling SendStfu event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSplice { ref node_id, ref msg} => { + log_debug!(self.logger, "Handling SendSplice event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => { + log_debug!(self.logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } + MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => { + log_debug!(self.logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}", + log_pubkey!(node_id), + &msg.channel_id); + self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg); + } MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => { log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}", log_pubkey!(node_id),