Merge pull request #2226 from alecchendev/2023-04-persist-network-graph-on-rgs
[rust-lightning] / lightning / src / ln / peer_handler.rs
index ae2a52378e4279337afe4e04731eb1e1727b6487..a20b316eb0a93e997a162e9857ef3cd60ad02373 100644 (file)
@@ -272,7 +272,52 @@ impl ChannelMessageHandler for ErroringMessageHandler {
                features.set_zero_conf_optional();
                features
        }
+
+       fn handle_open_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::OpenChannelV2) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+       }
+
+       fn handle_accept_channel_v2(&self, their_node_id: &PublicKey, msg: &msgs::AcceptChannelV2) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id);
+       }
+
+       fn handle_tx_add_input(&self, their_node_id: &PublicKey, msg: &msgs::TxAddInput) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_add_output(&self, their_node_id: &PublicKey, msg: &msgs::TxAddOutput) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_remove_input(&self, their_node_id: &PublicKey, msg: &msgs::TxRemoveInput) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_remove_output(&self, their_node_id: &PublicKey, msg: &msgs::TxRemoveOutput) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_complete(&self, their_node_id: &PublicKey, msg: &msgs::TxComplete) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_signatures(&self, their_node_id: &PublicKey, msg: &msgs::TxSignatures) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_init_rbf(&self, their_node_id: &PublicKey, msg: &msgs::TxInitRbf) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_ack_rbf(&self, their_node_id: &PublicKey, msg: &msgs::TxAckRbf) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
+
+       fn handle_tx_abort(&self, their_node_id: &PublicKey, msg: &msgs::TxAbort) {
+               ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id);
+       }
 }
+
 impl Deref for ErroringMessageHandler {
        type Target = ErroringMessageHandler;
        fn deref(&self) -> &Self { self }
@@ -1401,10 +1446,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
 
                // Need an Init as first message
                if let wire::Message::Init(msg) = message {
-                       if msg.features.requires_unknown_bits() {
-                               log_debug!(self.logger, "Peer features required unknown version bits");
+                       let our_features = self.init_features(&their_node_id);
+                       if msg.features.requires_unknown_bits_from(&our_features) {
+                               log_debug!(self.logger, "Peer requires features unknown to us");
                                return Err(PeerHandleError { }.into());
                        }
+
+                       if our_features.requires_unknown_bits_from(&msg.features) {
+                               log_debug!(self.logger, "We require features unknown to our peer");
+                               return Err(PeerHandleError { }.into());
+                       }
+
                        if peer_lock.their_features.is_some() {
                                return Err(PeerHandleError { }.into());
                        }
@@ -1520,9 +1572,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                        wire::Message::OpenChannel(msg) => {
                                self.message_handler.chan_handler.handle_open_channel(&their_node_id, &msg);
                        },
+                       wire::Message::OpenChannelV2(msg) => {
+                               self.message_handler.chan_handler.handle_open_channel_v2(&their_node_id, &msg);
+                       },
                        wire::Message::AcceptChannel(msg) => {
                                self.message_handler.chan_handler.handle_accept_channel(&their_node_id, &msg);
                        },
+                       wire::Message::AcceptChannelV2(msg) => {
+                               self.message_handler.chan_handler.handle_accept_channel_v2(&their_node_id, &msg);
+                       },
 
                        wire::Message::FundingCreated(msg) => {
                                self.message_handler.chan_handler.handle_funding_created(&their_node_id, &msg);
@@ -1534,6 +1592,35 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                self.message_handler.chan_handler.handle_channel_ready(&their_node_id, &msg);
                        },
 
+                       // Interactive transaction construction messages:
+                       wire::Message::TxAddInput(msg) => {
+                               self.message_handler.chan_handler.handle_tx_add_input(&their_node_id, &msg);
+                       },
+                       wire::Message::TxAddOutput(msg) => {
+                               self.message_handler.chan_handler.handle_tx_add_output(&their_node_id, &msg);
+                       },
+                       wire::Message::TxRemoveInput(msg) => {
+                               self.message_handler.chan_handler.handle_tx_remove_input(&their_node_id, &msg);
+                       },
+                       wire::Message::TxRemoveOutput(msg) => {
+                               self.message_handler.chan_handler.handle_tx_remove_output(&their_node_id, &msg);
+                       },
+                       wire::Message::TxComplete(msg) => {
+                               self.message_handler.chan_handler.handle_tx_complete(&their_node_id, &msg);
+                       },
+                       wire::Message::TxSignatures(msg) => {
+                               self.message_handler.chan_handler.handle_tx_signatures(&their_node_id, &msg);
+                       },
+                       wire::Message::TxInitRbf(msg) => {
+                               self.message_handler.chan_handler.handle_tx_init_rbf(&their_node_id, &msg);
+                       },
+                       wire::Message::TxAckRbf(msg) => {
+                               self.message_handler.chan_handler.handle_tx_ack_rbf(&their_node_id, &msg);
+                       },
+                       wire::Message::TxAbort(msg) => {
+                               self.message_handler.chan_handler.handle_tx_abort(&their_node_id, &msg);
+                       }
+
                        wire::Message::Shutdown(msg) => {
                                self.message_handler.chan_handler.handle_shutdown(&their_node_id, &msg);
                        },
@@ -1799,12 +1886,24 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                log_bytes!(msg.temporary_channel_id));
                                                self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                        },
+                                       MessageSendEvent::SendAcceptChannelV2 { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendAcceptChannelV2 event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.temporary_channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
                                        MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
                                                log_debug!(self.logger, "Handling SendOpenChannel event in peer_handler for node {} for channel {}",
                                                                log_pubkey!(node_id),
                                                                log_bytes!(msg.temporary_channel_id));
                                                self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                        },
+                                       MessageSendEvent::SendOpenChannelV2 { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendOpenChannelV2 event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.temporary_channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
                                        MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
                                                log_debug!(self.logger, "Handling SendFundingCreated event in peer_handler for node {} for channel {} (which becomes {})",
                                                                log_pubkey!(node_id),
@@ -1826,6 +1925,60 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CM
                                                                log_bytes!(msg.channel_id));
                                                self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
                                        },
+                                       MessageSendEvent::SendTxAddInput { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxAddInput event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxAddOutput { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxAddOutput event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxRemoveInput { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxRemoveInput event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxRemoveOutput { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxRemoveOutput event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxComplete { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxComplete event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxSignatures { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxSignatures event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxInitRbf { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxInitRbf event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxAckRbf { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxAckRbf event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
+                                       MessageSendEvent::SendTxAbort { ref node_id, ref msg } => {
+                                               log_debug!(self.logger, "Handling SendTxAbort event in peer_handler for node {} for channel {}",
+                                                               log_pubkey!(node_id),
+                                                               log_bytes!(msg.channel_id));
+                                               self.enqueue_message(&mut *get_peer_for_forwarding!(node_id), msg);
+                                       },
                                        MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
                                                log_debug!(self.logger, "Handling SendAnnouncementSignatures event in peer_handler for node {} for channel {})",
                                                                log_pubkey!(node_id),
@@ -2276,16 +2429,19 @@ fn is_gossip_msg(type_id: u16) -> bool {
 mod tests {
        use crate::sign::{NodeSigner, Recipient};
        use crate::events;
+       use crate::io;
+       use crate::ln::features::{InitFeatures, NodeFeatures};
        use crate::ln::peer_channel_encryptor::PeerChannelEncryptor;
-       use crate::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
+       use crate::ln::peer_handler::{CustomMessageHandler, PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses};
        use crate::ln::{msgs, wire};
-       use crate::ln::msgs::NetAddress;
+       use crate::ln::msgs::{LightningError, NetAddress};
        use crate::util::test_utils;
 
-       use bitcoin::secp256k1::SecretKey;
+       use bitcoin::secp256k1::{PublicKey, SecretKey};
 
        use crate::prelude::*;
        use crate::sync::{Arc, Mutex};
+       use core::convert::Infallible;
        use core::sync::atomic::{AtomicBool, Ordering};
 
        #[derive(Clone)]
@@ -2318,19 +2474,74 @@ mod tests {
        struct PeerManagerCfg {
                chan_handler: test_utils::TestChannelMessageHandler,
                routing_handler: test_utils::TestRoutingMessageHandler,
+               custom_handler: TestCustomMessageHandler,
                logger: test_utils::TestLogger,
                node_signer: test_utils::TestNodeSigner,
        }
 
+       struct TestCustomMessageHandler {
+               features: InitFeatures,
+       }
+
+       impl wire::CustomMessageReader for TestCustomMessageHandler {
+               type CustomMessage = Infallible;
+               fn read<R: io::Read>(&self, _: u16, _: &mut R) -> Result<Option<Self::CustomMessage>, msgs::DecodeError> {
+                       Ok(None)
+               }
+       }
+
+       impl CustomMessageHandler for TestCustomMessageHandler {
+               fn handle_custom_message(&self, _: Infallible, _: &PublicKey) -> Result<(), LightningError> {
+                       unreachable!();
+               }
+
+               fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)> { Vec::new() }
+
+               fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
+
+               fn provided_init_features(&self, _: &PublicKey) -> InitFeatures {
+                       self.features.clone()
+               }
+       }
+
        fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
                let mut cfgs = Vec::new();
                for i in 0..peer_count {
                        let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
+                       let features = {
+                               let mut feature_bits = vec![0u8; 33];
+                               feature_bits[32] = 0b00000001;
+                               InitFeatures::from_le_bytes(feature_bits)
+                       };
+                       cfgs.push(
+                               PeerManagerCfg{
+                                       chan_handler: test_utils::TestChannelMessageHandler::new(),
+                                       logger: test_utils::TestLogger::new(),
+                                       routing_handler: test_utils::TestRoutingMessageHandler::new(),
+                                       custom_handler: TestCustomMessageHandler { features },
+                                       node_signer: test_utils::TestNodeSigner::new(node_secret),
+                               }
+                       );
+               }
+
+               cfgs
+       }
+
+       fn create_incompatible_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
+               let mut cfgs = Vec::new();
+               for i in 0..peer_count {
+                       let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
+                       let features = {
+                               let mut feature_bits = vec![0u8; 33 + i + 1];
+                               feature_bits[33 + i] = 0b00000001;
+                               InitFeatures::from_le_bytes(feature_bits)
+                       };
                        cfgs.push(
                                PeerManagerCfg{
                                        chan_handler: test_utils::TestChannelMessageHandler::new(),
                                        logger: test_utils::TestLogger::new(),
                                        routing_handler: test_utils::TestRoutingMessageHandler::new(),
+                                       custom_handler: TestCustomMessageHandler { features },
                                        node_signer: test_utils::TestNodeSigner::new(node_secret),
                                }
                        );
@@ -2339,13 +2550,13 @@ mod tests {
                cfgs
        }
 
-       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>> {
+       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, &'a TestCustomMessageHandler, &'a test_utils::TestNodeSigner>> {
                let mut peers = Vec::new();
                for i in 0..peer_count {
                        let ephemeral_bytes = [i as u8; 32];
                        let msg_handler = MessageHandler {
                                chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler,
-                               onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: IgnoringMessageHandler {}
+                               onion_message_handler: IgnoringMessageHandler {}, custom_message_handler: &cfgs[i].custom_handler
                        };
                        let peer = PeerManager::new(msg_handler, 0, &ephemeral_bytes, &cfgs[i].logger, &cfgs[i].node_signer);
                        peers.push(peer);
@@ -2354,7 +2565,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, &'a TestCustomMessageHandler, &'a test_utils::TestNodeSigner>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, &'a TestCustomMessageHandler, &'a test_utils::TestNodeSigner>) -> (FileDescriptor, FileDescriptor) {
                let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
                let mut fd_a = FileDescriptor {
                        fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
@@ -2469,6 +2680,42 @@ mod tests {
                thrd_b.join().unwrap();
        }
 
+       #[test]
+       fn test_incompatible_peers() {
+               let cfgs = create_peermgr_cfgs(2);
+               let incompatible_cfgs = create_incompatible_peermgr_cfgs(2);
+
+               let peers = create_network(2, &cfgs);
+               let incompatible_peers = create_network(2, &incompatible_cfgs);
+               let peer_pairs = [(&peers[0], &incompatible_peers[0]), (&incompatible_peers[1], &peers[1])];
+               for (peer_a, peer_b) in peer_pairs.iter() {
+                       let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap();
+                       let mut fd_a = FileDescriptor {
+                               fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+                               disconnect: Arc::new(AtomicBool::new(false)),
+                       };
+                       let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000};
+                       let mut fd_b = FileDescriptor {
+                               fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())),
+                               disconnect: Arc::new(AtomicBool::new(false)),
+                       };
+                       let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001};
+                       let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap();
+                       peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap();
+                       assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false);
+                       peer_a.process_events();
+
+                       let a_data = fd_a.outbound_data.lock().unwrap().split_off(0);
+                       assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false);
+
+                       peer_b.process_events();
+                       let b_data = fd_b.outbound_data.lock().unwrap().split_off(0);
+
+                       // Should fail because of unknown required features
+                       assert!(peer_a.read_event(&mut fd_a, &b_data).is_err());
+               }
+       }
+
        #[test]
        fn test_disconnect_peer() {
                // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and