Merge pull request #2727 from TheBlueMatt/2023-11-better-bolt11-utils
diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index 92826244e09ee2b03f674e49c9e229e0a65a20ff..4c2e68d9a43993a021c5a0bc796b548aed1a649e 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -19,17 +19,20 @@ use bitcoin::blockdata::constants::ChainHash;
 use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use crate::sign::{KeysManager, NodeSigner, Recipient};
-use crate::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
+use crate::events::{MessageSendEvent, MessageSendEventsProvider};
 use crate::ln::ChannelId;
 use crate::ln::features::{InitFeatures, NodeFeatures};
 use crate::ln::msgs;
 use crate::ln::msgs::{ChannelMessageHandler, LightningError, SocketAddress, OnionMessageHandler, RoutingMessageHandler};
+#[cfg(not(c_bindings))]
 use crate::ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use crate::util::ser::{VecWriter, Writeable, Writer};
-use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
+use crate::ln::peer_channel_encryptor::{PeerChannelEncryptor, NextNoiseStep, MessageBuf, MSG_BUF_ALLOC_SIZE};
 use crate::ln::wire;
 use crate::ln::wire::{Encode, Type};
-use crate::onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
+#[cfg(not(c_bindings))]
+use crate::onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger};
+use crate::onion_message::{CustomOnionMessageHandler, OffersMessage, OffersMessageHandler, OnionMessageContents, PendingOnionMessage};
 use crate::routing::gossip::{NetworkGraph, P2PGossipSync, NodeId, NodeAlias};
 use crate::util::atomic_counter::AtomicCounter;
 use crate::util::logger::Logger;
@@ -37,7 +40,7 @@ use crate::util::string::PrintableString;
 
 use crate::prelude::*;
 use crate::io;
-use alloc::collections::LinkedList;
+use alloc::collections::VecDeque;
 use crate::sync::{Arc, Mutex, MutexGuard, FairRwLock};
 use core::sync::atomic::{AtomicBool, AtomicU32, AtomicI32, Ordering};
 use core::{cmp, hash, fmt, mem};
@@ -107,11 +110,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        }
        fn processing_queue_high(&self) -> bool { false }
 }
-impl OnionMessageProvider for IgnoringMessageHandler {
-       fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
-}
 impl OnionMessageHandler for IgnoringMessageHandler {
        fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+       fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
        fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
        fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
        fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
@@ -131,9 +132,12 @@ impl CustomOnionMessageHandler for IgnoringMessageHandler {
        fn read_custom_message<R: io::Read>(&self, _msg_type: u64, _buffer: &mut R) -> Result<Option<Infallible>, msgs::DecodeError> where Self: Sized {
                Ok(None)
        }
+       fn release_pending_custom_messages(&self) -> Vec<PendingOnionMessage<Infallible>> {
+               vec![]
+       }
 }
 
-impl CustomOnionMessageContents for Infallible {
+impl OnionMessageContents for Infallible {
        fn tlv_type(&self) -> u64 { unreachable!(); }
 }
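The new `release_pending_custom_messages` hook lets a custom onion message handler hand queued messages to the peer manager in batches (the real hook returns `Vec<PendingOnionMessage<T>>`, per the hunk above); `IgnoringMessageHandler` simply returns an empty `Vec`. A standalone sketch of the queue-and-drain pattern a non-ignoring handler might use, with placeholder types rather than LDK's:

```rust
use std::sync::Mutex;

// Placeholder for whatever a real handler queues; the pattern, not the type,
// is the point here.
struct QueuedMsg;

#[derive(Default)]
struct QueueingHandler {
    pending: Mutex<Vec<QueuedMsg>>,
}

impl QueueingHandler {
    fn queue(&self, msg: QueuedMsg) {
        self.pending.lock().unwrap().push(msg);
    }

    // Hand everything queued so far to the message pump in one batch,
    // leaving an empty queue behind.
    fn release_pending(&self) -> Vec<QueuedMsg> {
        std::mem::take(&mut *self.pending.lock().unwrap())
    }
}

fn main() {
    let handler = QueueingHandler::default();
    handler.queue(QueuedMsg);
    assert_eq!(handler.release_pending().len(), 1);
    assert!(handler.release_pending().is_empty());
}
```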
 
@@ -227,6 +231,18 @@ impl ChannelMessageHandler for ErroringMessageHandler {
        fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) {
                ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
        }
+       fn handle_stfu(&self, their_node_id: &PublicKey, msg: &msgs::Stfu) {
+               ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+       }
+       fn handle_splice(&self, their_node_id: &PublicKey, msg: &msgs::Splice) {
+               ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+       }
+       fn handle_splice_ack(&self, their_node_id: &PublicKey, msg: &msgs::SpliceAck) {
+               ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+       }
+       fn handle_splice_locked(&self, their_node_id: &PublicKey, msg: &msgs::SpliceLocked) {
+               ErroringMessageHandler::push_error(&self, their_node_id, msg.channel_id);
+       }
        fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) {
                ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
        }
@@ -279,7 +295,7 @@ impl ChannelMessageHandler for ErroringMessageHandler {
                features
        }
 
-       fn get_genesis_hashes(&self) -> Option<Vec<ChainHash>> {
+       fn get_chain_hashes(&self) -> Option<Vec<ChainHash>> {
                // We don't enforce any chains upon peer connection for `ErroringMessageHandler` and leave it up
                // to users of `ErroringMessageHandler` to make decisions on network compatibility.
                // There's not really any way to pull in specific networks here, and hardcoding can cause breakages.
@@ -485,13 +501,13 @@ struct Peer {
        their_features: Option<InitFeatures>,
        their_socket_address: Option<SocketAddress>,
 
-       pending_outbound_buffer: LinkedList<Vec<u8>>,
+       pending_outbound_buffer: VecDeque<Vec<u8>>,
        pending_outbound_buffer_first_msg_offset: usize,
        /// Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily
        /// prioritize channel messages over them.
        ///
        /// Note that these messages are *not* encrypted/MAC'd, and are only serialized.
-       gossip_broadcast_buffer: LinkedList<Vec<u8>>,
+       gossip_broadcast_buffer: VecDeque<MessageBuf>,
        awaiting_write_event: bool,
 
        pending_read_buffer: Vec<u8>,
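Both per-peer queues switch from `LinkedList` to `VecDeque`, which serves the same push-back/pop-front access pattern from one contiguous allocation instead of one heap node per message. A minimal sketch of that access pattern, assuming plain `Vec<u8>` payloads for illustration:

```rust
use std::collections::VecDeque;

fn main() {
    let mut outbound: VecDeque<Vec<u8>> = VecDeque::new();

    // Messages are appended at the back as they are produced...
    outbound.push_back(vec![0u8; 32]);
    outbound.push_back(vec![1u8; 64]);

    // ...and drained from the front as the socket accepts writes.
    while let Some(msg) = outbound.pop_front() {
        // write `msg` to the descriptor here
        let _ = msg.len();
    }
}
```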
@@ -607,12 +623,13 @@ impl Peer {
 /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
 /// issues such as overly long function definitions.
 ///
-/// This is not exported to bindings users as `Arc`s don't make sense in bindings.
+/// This is not exported to bindings users as type aliases aren't supported in most languages.
+#[cfg(not(c_bindings))]
 pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<
        SD,
        Arc<SimpleArcChannelManager<M, T, F, L>>,
        Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, C, Arc<L>>>,
-       Arc<SimpleArcOnionMessenger<L>>,
+       Arc<SimpleArcOnionMessenger<M, T, F, L>>,
        Arc<L>,
        IgnoringMessageHandler,
        Arc<KeysManager>
@@ -625,14 +642,15 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<
 /// But if this is not necessary, using a reference is more efficient. Defining these type aliases
 /// helps with issues such as long function definitions.
 ///
-/// This is not exported to bindings users as general type aliases don't make sense in bindings.
+/// This is not exported to bindings users as type aliases aren't supported in most languages.
+#[cfg(not(c_bindings))]
 pub type SimpleRefPeerManager<
-       'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, SD, M, T, F, C, L
+       'a, 'b, 'c, 'd, 'e, 'f, 'logger, 'h, 'i, 'j, 'graph, 'k, SD, M, T, F, C, L
 > = PeerManager<
        SD,
        &'j SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, M, T, F, L>,
        &'f P2PGossipSync<&'graph NetworkGraph<&'logger L>, C, &'logger L>,
-       &'h SimpleRefOnionMessenger<'logger, 'i, 'j, L>,
+       &'h SimpleRefOnionMessenger<'a, 'b, 'c, 'd, 'e, 'graph, 'logger, 'i, 'j, 'k, M, T, F, L>,
        &'logger L,
        IgnoringMessageHandler,
        &'c KeysManager
@@ -779,7 +797,7 @@ impl From<LightningError> for MessageHandlingError {
 
 macro_rules! encode_msg {
        ($msg: expr) => {{
-               let mut buffer = VecWriter(Vec::new());
+               let mut buffer = VecWriter(Vec::with_capacity(MSG_BUF_ALLOC_SIZE));
                wire::write($msg, &mut buffer).unwrap();
                buffer.0
        }}
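`encode_msg!` now preallocates its serialization buffer rather than letting the `Vec` grow from zero, avoiding the repeated reallocations a typical message would otherwise trigger. A hedged illustration of the difference; the constant below is a stand-in value, not LDK's actual `MSG_BUF_ALLOC_SIZE`:

```rust
// Stand-in value for illustration only; the real constant lives in
// peer_channel_encryptor.rs.
const MSG_BUF_ALLOC_SIZE: usize = 2048;

fn encode(payload: &[u8]) -> Vec<u8> {
    // One up-front allocation; growth only happens for unusually large messages.
    let mut buffer = Vec::with_capacity(MSG_BUF_ALLOC_SIZE);
    buffer.extend_from_slice(payload);
    buffer
}

fn main() {
    let msg = encode(&[0u8; 300]);
    assert!(msg.capacity() >= MSG_BUF_ALLOC_SIZE);
}
```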
@@ -991,9 +1009,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        their_features: None,
                                        their_socket_address: remote_network_address,
 
-                                       pending_outbound_buffer: LinkedList::new(),
+                                       pending_outbound_buffer: VecDeque::new(),
                                        pending_outbound_buffer_first_msg_offset: 0,
-                                       gossip_broadcast_buffer: LinkedList::new(),
+                                       gossip_broadcast_buffer: VecDeque::new(),
                                        awaiting_write_event: false,
 
                                        pending_read_buffer,
@@ -1047,9 +1065,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        their_features: None,
                                        their_socket_address: remote_network_address,
 
-                                       pending_outbound_buffer: LinkedList::new(),
+                                       pending_outbound_buffer: VecDeque::new(),
                                        pending_outbound_buffer_first_msg_offset: 0,
-                                       gossip_broadcast_buffer: LinkedList::new(),
+                                       gossip_broadcast_buffer: VecDeque::new(),
                                        awaiting_write_event: false,
 
                                        pending_read_buffer,
@@ -1096,7 +1114,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        }
                        if peer.should_buffer_gossip_broadcast() {
                                if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(&msg[..]));
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_buffer(msg));
                                }
                        }
                        if peer.should_buffer_gossip_backfill() {
@@ -1162,6 +1180,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
                                peer.pending_outbound_buffer_first_msg_offset = 0;
                                peer.pending_outbound_buffer.pop_front();
+                               const VEC_SIZE: usize = ::core::mem::size_of::<Vec<u8>>();
+                               let large_capacity = peer.pending_outbound_buffer.capacity() > 4096 / VEC_SIZE;
+                               let lots_of_slack = peer.pending_outbound_buffer.len()
+                                       < peer.pending_outbound_buffer.capacity() / 2;
+                               if large_capacity && lots_of_slack {
+                                       peer.pending_outbound_buffer.shrink_to_fit();
+                               }
                        } else {
                                peer.awaiting_write_event = true;
                        }
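The new block after `pop_front` trims the outbound queue's backing allocation once it is both large (more than roughly 4 KiB of element slots) and more than half empty, so a burst of queued messages does not pin memory indefinitely while steady-state traffic avoids shrink/grow thrash. The same heuristic, extracted into a standalone sketch:

```rust
use std::collections::VecDeque;

fn maybe_shrink(queue: &mut VecDeque<Vec<u8>>) {
    // Each slot holds a `Vec<u8>` header (pointer/length/capacity), 24 bytes on 64-bit.
    const VEC_SIZE: usize = core::mem::size_of::<Vec<u8>>();
    let large_capacity = queue.capacity() > 4096 / VEC_SIZE;
    let lots_of_slack = queue.len() < queue.capacity() / 2;
    if large_capacity && lots_of_slack {
        queue.shrink_to_fit();
    }
}

fn main() {
    let mut queue: VecDeque<Vec<u8>> = VecDeque::with_capacity(1024);
    queue.push_back(vec![0u8; 8]);
    maybe_shrink(&mut queue);
    assert!(queue.capacity() < 1024);
}
```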
@@ -1238,8 +1263,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
        }
 
        /// Append a message to a peer's pending outbound/write gossip broadcast buffer
-       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: Vec<u8>) {
+       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: MessageBuf) {
                peer.msgs_sent_since_pong += 1;
+               debug_assert!(peer.gossip_broadcast_buffer.len() <= OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
                peer.gossip_broadcast_buffer.push_back(encoded_message);
        }
 
@@ -1367,7 +1393,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                peer.set_their_node_id(their_node_id);
                                                                insert_node_id!();
                                                                let features = self.init_features(&their_node_id);
-                                                               let networks = self.message_handler.chan_handler.get_genesis_hashes();
+                                                               let networks = self.message_handler.chan_handler.get_chain_hashes();
                                                                let resp = msgs::Init { features, networks, remote_network_address: filter_addresses(peer.their_socket_address.clone()) };
                                                                self.enqueue_message(peer, &resp);
                                                                peer.awaiting_pong_timer_tick_intervals = 0;
@@ -1380,7 +1406,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                peer.set_their_node_id(their_node_id);
                                                                insert_node_id!();
                                                                let features = self.init_features(&their_node_id);
-                                                               let networks = self.message_handler.chan_handler.get_genesis_hashes();
+                                                               let networks = self.message_handler.chan_handler.get_chain_hashes();
                                                                let resp = msgs::Init { features, networks, remote_network_address: filter_addresses(peer.their_socket_address.clone()) };
                                                                self.enqueue_message(peer, &resp);
                                                                peer.awaiting_pong_timer_tick_intervals = 0;
@@ -1396,17 +1422,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        }
                                                                        peer.pending_read_is_header = false;
                                                                } else {
-                                                                       let msg_data = try_potential_handleerror!(peer,
-                                                                               peer.channel_encryptor.decrypt_message(&peer.pending_read_buffer[..]));
-                                                                       assert!(msg_data.len() >= 2);
+                                                                       debug_assert!(peer.pending_read_buffer.len() >= 2 + 16);
+                                                                       try_potential_handleerror!(peer,
+                                                                               peer.channel_encryptor.decrypt_message(&mut peer.pending_read_buffer[..]));
+
+                                                                       let mut reader = io::Cursor::new(&peer.pending_read_buffer[..peer.pending_read_buffer.len() - 16]);
+                                                                       let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
 
                                                                        // Reset read buffer
                                                                        if peer.pending_read_buffer.capacity() > 8192 { peer.pending_read_buffer = Vec::new(); }
                                                                        peer.pending_read_buffer.resize(18, 0);
                                                                        peer.pending_read_is_header = true;
 
-                                                                       let mut reader = io::Cursor::new(&msg_data[..]);
-                                                                       let message_result = wire::read(&mut reader, &*self.message_handler.custom_message_handler);
                                                                        let message = match message_result {
                                                                                Ok(x) => x,
                                                                                Err(e) => {
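Decryption now happens in place: `decrypt_message` overwrites the ciphertext in `pending_read_buffer`, and the parser reads the plaintext from the same buffer minus the trailing 16-byte MAC tag, instead of receiving a freshly allocated `Vec`. A simplified sketch of that buffer discipline, with the cipher abstracted behind a closure (the real work lives in `PeerChannelEncryptor`):

```rust
use std::io::Cursor;

fn read_one_message<'a>(
    pending_read_buffer: &'a mut Vec<u8>,
    // Stands in for the noise decryption step: assumed to verify the MAC and
    // rewrite ciphertext bytes as plaintext in the same slice, leaving the
    // trailing 16-byte tag untouched.
    decrypt_in_place: impl Fn(&mut [u8]) -> Result<(), ()>,
) -> Result<Cursor<&'a [u8]>, ()> {
    debug_assert!(pending_read_buffer.len() >= 2 + 16);
    decrypt_in_place(&mut pending_read_buffer[..])?;
    // The plaintext occupies everything but the trailing MAC tag, so the wire
    // parser can read straight out of the same buffer.
    let plaintext_len = pending_read_buffer.len() - 16;
    Ok(Cursor::new(&pending_read_buffer[..plaintext_len]))
}

fn main() {
    let mut buf = vec![0u8; 2 + 16];
    let reader = read_one_message(&mut buf, |_ciphertext| Ok(())).unwrap();
    assert_eq!(reader.get_ref().len(), 2);
}
```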
@@ -1498,7 +1525,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                if let wire::Message::Init(msg) = message {
                        // Check if we have any compatible chains if the `networks` field is specified.
                        if let Some(networks) = &msg.networks {
-                               if let Some(our_chains) = self.message_handler.chan_handler.get_genesis_hashes() {
+                               if let Some(our_chains) = self.message_handler.chan_handler.get_chain_hashes() {
                                        let mut have_compatible_chains = false;
                                        'our_chains: for our_chain in our_chains.iter() {
                                                for their_chain in networks {
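The compatibility check continuing past this hunk looks for any chain hash advertised by both peers. A minimal equivalent of that intersection test, assuming raw 32-byte hashes for illustration:

```rust
// Connection setup proceeds only if at least one chain appears on both sides.
fn have_compatible_chains(our_chains: &[[u8; 32]], their_chains: &[[u8; 32]]) -> bool {
    our_chains
        .iter()
        .any(|ours| their_chains.iter().any(|theirs| theirs == ours))
}

fn main() {
    let chain_a = [0u8; 32]; // placeholder hash values
    let chain_b = [1u8; 32];
    assert!(have_compatible_chains(&[chain_a], &[chain_b, chain_a]));
    assert!(!have_compatible_chains(&[chain_a], &[chain_b]));
}
```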
@@ -1637,6 +1664,22 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                self.message_handler.chan_handler.handle_channel_ready(&their_node_id, &msg);
                        },
 
+                       // Quiescence messages:
+                       wire::Message::Stfu(msg) => {
+                               self.message_handler.chan_handler.handle_stfu(&their_node_id, &msg);
+                       }
+
+                       // Splicing messages:
+                       wire::Message::Splice(msg) => {
+                               self.message_handler.chan_handler.handle_splice(&their_node_id, &msg);
+                       }
+                       wire::Message::SpliceAck(msg) => {
+                               self.message_handler.chan_handler.handle_splice_ack(&their_node_id, &msg);
+                       }
+                       wire::Message::SpliceLocked(msg) => {
+                               self.message_handler.chan_handler.handle_splice_locked(&their_node_id, &msg);
+                       }
+
                        // Interactive transaction construction messages:
                        wire::Message::TxAddInput(msg) => {
                                self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg);
@@ -1785,7 +1828,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        wire::Message::NodeAnnouncement(ref msg) => {
@@ -1812,7 +1855,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        wire::Message::ChannelUpdate(ref msg) => {
@@ -1834,7 +1877,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                        if except_node.is_some() && peer.their_node_id.as_ref().map(|(pk, _)| pk) == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, encoded_msg.clone());
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, MessageBuf::from_encoded(&encoded_msg));
                                }
                        },
                        _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
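Gossip broadcasts previously cloned the encoded `Vec<u8>` once per peer; they now build a `MessageBuf` per peer instead. Presumably the copy is laid out so the later in-place `encrypt_buffer` call needs no further allocation; a purely hypothetical sketch of that idea, not LDK's actual layout or API:

```rust
// Hypothetical stand-in for the real `MessageBuf` in peer_channel_encryptor.rs;
// the layout below is an assumption for illustration only. The copy made here
// is the single per-peer allocation, with headroom reserved so later framing
// and encryption do not need to reallocate.
struct MessageBuf(Vec<u8>);

impl MessageBuf {
    fn from_encoded(encoded: &[u8]) -> Self {
        let mut buf = Vec::with_capacity(encoded.len() + 2 + 16 * 2);
        buf.extend_from_slice(encoded);
        MessageBuf(buf)
    }
}

fn main() {
    let wrapped = MessageBuf::from_encoded(&[0u8; 100]);
    assert!(wrapped.0.capacity() >= 100 + 2 + 16 * 2);
}
```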
@@ -1870,15 +1913,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        let flush_read_disabled = self.gossip_processing_backlog_lifted.swap(false, Ordering::Relaxed);
 
                        let mut peers_to_disconnect = HashMap::new();
-                       let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
-                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
 
                        {
-                               // TODO: There are some DoS attacks here where you can flood someone's outbound send
-                               // buffer by doing things like announcing channels on another node. We should be willing to
-                               // drop optional-ish messages when send buffers get full!
-
                                let peers_lock = self.peers.read().unwrap();
+
+                               let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                               events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
+
                                let peers = &*peers_lock;
                                macro_rules! get_peer_for_forwarding {
                                        ($node_id: expr) => {
@@ -1956,6 +1997,30 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                        &msg.channel_id);
                                                        self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                                },
+                                               MessageSendEvent::SendStfu { ref node_id, ref msg} => {
+                                                       log_debug!(self.logger, "Handling SendStfu event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       &msg.channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               }
+                                               MessageSendEvent::SendSplice { ref node_id, ref msg} => {
+                                                       log_debug!(self.logger, "Handling SendSplice event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       &msg.channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               }
+                                               MessageSendEvent::SendSpliceAck { ref node_id, ref msg} => {
+                                                       log_debug!(self.logger, "Handling SendSpliceAck event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       &msg.channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               }
+                                               MessageSendEvent::SendSpliceLocked { ref node_id, ref msg} => {
+                                                       log_debug!(self.logger, "Handling SendSpliceLocked event in peer_handler for node {} for channel {}",
+                                                                       log_pubkey!(node_id),
+                                                                       &msg.channel_id);
+                                                       self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                               }
                                                MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
                                                        log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
                                                                        log_pubkey!(node_id),