X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=e9eaf33e8840b021afe7fbcfc60afec3e0fa5b3d;hb=f361aa62a42551be0de722ba4c886c1ebdcb13cd;hp=1bbb30b6b564e467da1ad6a8caab6a64bcff9792;hpb=00a70c25f9111f8f733f2ca4a0a61ba67d2d56a8;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 1bbb30b6..e9eaf33e 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -46,16 +46,23 @@ use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; use bitcoin::hashes::{HashEngine, Hash}; -/// Handler for BOLT1-compliant messages. +/// A handler provided to [`PeerManager`] for reading and handling custom messages. +/// +/// [BOLT 1] specifies a custom message type range for use with experimental or application-specific +/// messages. `CustomMessageHandler` allows for user-defined handling of such types. See the +/// [`lightning_custom_message`] crate for tools useful in composing more than one custom handler. +/// +/// [BOLT 1]: https://github.com/lightning/bolts/blob/master/01-messaging.md +/// [`lightning_custom_message`]: https://docs.rs/lightning_custom_message/latest/lightning_custom_message pub trait CustomMessageHandler: wire::CustomMessageReader { - /// Called with the message type that was received and the buffer to be read. - /// Can return a `MessageHandlingError` if the message could not be handled. + /// Handles the given message sent from `sender_node_id`, possibly producing messages for + /// [`CustomMessageHandler::get_and_clear_pending_msg`] to return and thus for [`PeerManager`] + /// to send. fn handle_custom_message(&self, msg: Self::CustomMessage, sender_node_id: &PublicKey) -> Result<(), LightningError>; - /// Gets the list of pending messages which were generated by the custom message - /// handler, clearing the list in the process. The first tuple element must - /// correspond to the intended recipients node ids. If no connection to one of the - /// specified node does not exist, the message is simply not sent to it. + /// Returns the list of pending messages that were generated by the handler, clearing the list + /// in the process. Each message is paired with the node id of the intended recipient. If no + /// connection to the node exists, then the message is simply not sent. fn get_and_clear_pending_msg(&self) -> Vec<(PublicKey, Self::CustomMessage)>; } @@ -72,7 +79,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn get_next_channel_announcement(&self, _starting_point: u64) -> Option<(msgs::ChannelAnnouncement, Option, Option)> { None } fn get_next_node_announcement(&self, _starting_point: Option<&NodeId>) -> Option { None } - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } @@ -88,8 +95,8 @@ impl OnionMessageProvider for IgnoringMessageHandler { } impl OnionMessageHandler for IgnoringMessageHandler { fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } - fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() @@ -223,8 +230,8 @@ impl ChannelMessageHandler for ErroringMessageHandler { } // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them. fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {} - fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} - fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) -> Result<(), ()> { Ok(()) } + fn peer_disconnected(&self, _their_node_id: &PublicKey) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) } fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { @@ -315,16 +322,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { /// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the /// descriptor. #[derive(Clone)] -pub struct PeerHandleError { - /// Used to indicate that we probably can't make any future connections to this peer (e.g. - /// because we required features that our peer was missing, or vice versa). - /// - /// While LDK's [`ChannelManager`] will not do it automatically, you likely wish to force-close - /// any channels with this peer or check for new versions of LDK. - /// - /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager - pub no_connection_possible: bool, -} +pub struct PeerHandleError { } impl fmt::Debug for PeerHandleError { fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> { formatter.write_str("Peer Sent Invalid Data") @@ -390,7 +388,15 @@ const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; struct Peer { channel_encryptor: PeerChannelEncryptor, - their_node_id: Option, + /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip + /// messages in `PeerManager`. Use `Peer::set_their_node_id` to modify this field. + their_node_id: Option<(PublicKey, NodeId)>, + /// The features provided in the peer's [`msgs::Init`] message. + /// + /// This is set only after we've processed the [`msgs::Init`] message and called relevant + /// `peer_connected` handler methods. Thus, this field is set *iff* we've finished our + /// handshake and can talk to this peer normally (though use [`Peer::handshake_complete`] to + /// check this. their_features: Option, their_net_address: Option, @@ -413,9 +419,24 @@ struct Peer { awaiting_pong_timer_tick_intervals: i8, received_message_since_timer_tick: bool, sent_gossip_timestamp_filter: bool, + + /// Indicates we've received a `channel_announcement` since the last time we had + /// [`PeerManager::gossip_processing_backlogged`] set (or, really, that we've received a + /// `channel_announcement` at all - we set this unconditionally but unset it every time we + /// check if we're gossip-processing-backlogged). + received_channel_announce_since_backlogged: bool, + + inbound_connection: bool, } impl Peer { + /// True after we've processed the [`msgs::Init`] message and called relevant `peer_connected` + /// handler methods. Thus, this implies we've finished our handshake and can talk to this peer + /// normally. + fn handshake_complete(&self) -> bool { + self.their_features.is_some() + } + /// Returns true if the channel announcements/updates for the given channel should be /// forwarded to this peer. /// If we are sending our routing table to this peer and we have not yet sent channel @@ -423,6 +444,7 @@ impl Peer { /// point and we shouldn't send it yet to avoid sending duplicate updates. If we've already /// sent the old versions, we should send the update, and so return true here. fn should_forward_channel_announcement(&self, channel_id: u64) -> bool { + if !self.handshake_complete() { return false; } if self.their_features.as_ref().unwrap().supports_gossip_queries() && !self.sent_gossip_timestamp_filter { return false; @@ -436,6 +458,7 @@ impl Peer { /// Similar to the above, but for node announcements indexed by node_id. fn should_forward_node_announcement(&self, node_id: NodeId) -> bool { + if !self.handshake_complete() { return false; } if self.their_features.as_ref().unwrap().supports_gossip_queries() && !self.sent_gossip_timestamp_filter { return false; @@ -449,8 +472,12 @@ impl Peer { /// Returns whether we should be reading bytes from this peer, based on whether its outbound /// buffer still has space and we don't need to pause reads to get some writes out. - fn should_read(&self) -> bool { - self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE + fn should_read(&mut self, gossip_processing_backlogged: bool) -> bool { + if !gossip_processing_backlogged { + self.received_channel_announce_since_backlogged = false; + } + self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && + (!gossip_processing_backlogged || !self.received_channel_announce_since_backlogged) } /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's @@ -458,19 +485,20 @@ impl Peer { fn should_buffer_gossip_backfill(&self) -> bool { self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK + && self.handshake_complete() } /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked /// every time the peer's buffer may have been drained. fn should_buffer_onion_message(&self) -> bool { - self.pending_outbound_buffer.is_empty() + self.pending_outbound_buffer.is_empty() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } /// Determines if we should push additional gossip broadcast messages onto a peer's outbound /// buffer. This is checked every time the peer's buffer may have been drained. fn should_buffer_gossip_broadcast(&self) -> bool { - self.pending_outbound_buffer.is_empty() + self.pending_outbound_buffer.is_empty() && self.handshake_complete() && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK } @@ -482,6 +510,10 @@ impl Peer { total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO } + + fn set_their_node_id(&mut self, node_id: PublicKey) { + self.their_node_id = Some((node_id, NodeId::from_pubkey(&node_id))); + } } /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g. @@ -655,10 +687,10 @@ impl PeerManager(&'a Option); +struct OptionalFromDebugger<'a>(&'a Option<(PublicKey, NodeId)>); impl core::fmt::Display for OptionalFromDebugger<'_> { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> Result<(), core::fmt::Error> { - if let Some(node_id) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } + if let Some((node_id, _)) = self.0 { write!(f, " from {}", log_pubkey!(node_id)) } else { Ok(()) } } } @@ -734,19 +766,24 @@ impl Vec { + /// The returned `Option`s will only be `Some` if an address had been previously given via + /// [`Self::new_outbound_connection`] or [`Self::new_inbound_connection`]. + pub fn get_peer_node_ids(&self) -> Vec<(PublicKey, Option)> { let peers = self.peers.read().unwrap(); peers.values().filter_map(|peer_mutex| { let p = peer_mutex.lock().unwrap(); - if !p.channel_encryptor.is_ready_for_encryption() || p.their_features.is_none() { + if !p.handshake_complete() { return None; } - p.their_node_id + Some((p.their_node_id.unwrap().0, p.their_net_address.clone())) }).collect() } @@ -757,7 +794,7 @@ impl bool { - !self.gossip_processing_backlogged.load(Ordering::Relaxed) && peer.should_read() + fn peer_should_read(&self, peer: &mut Peer) -> bool { + peer.should_read(self.gossip_processing_backlogged.load(Ordering::Relaxed)) } fn update_gossip_backlogged(&self) { @@ -868,7 +911,7 @@ impl { if force_one_write && !have_written { - let should_read = self.peer_should_read(&peer); if should_read { let data_sent = descriptor.send_data(&[], should_read); debug_assert_eq!(data_sent, 0, "Can't write more than no data"); @@ -937,7 +980,7 @@ impl { let mut peer = peer_mutex.lock().unwrap(); @@ -991,6 +1034,9 @@ impl Result { @@ -998,7 +1044,7 @@ impl Ok(res), Err(e) => { log_trace!(self.logger, "Peer sent invalid data or we decided to disconnect due to a protocol error"); - self.disconnect_event_internal(peer_descriptor, e.no_connection_possible); + self.disconnect_event_internal(peer_descriptor); Err(e) } } @@ -1007,9 +1053,9 @@ impl(&self, peer: &mut Peer, message: &M) { if is_gossip_msg(message.type_id()) { - log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())); + log_gossip!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)); } else { - log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap())) + log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap().0)) } peer.msgs_sent_since_pong += 1; peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(message)); @@ -1031,7 +1077,7 @@ impl { let mut read_pos = 0; @@ -1045,7 +1091,7 @@ impl { //TODO: Try to push msg log_debug!(self.logger, "Error handling message{}; disconnecting peer with: {}", OptionalFromDebugger(&peer_node_id), e.err); - return Err(PeerHandleError{ no_connection_possible: false }); + return Err(PeerHandleError { }); }, msgs::ErrorAction::IgnoreAndLog(level) => { log_given_level!(self.logger, level, "Error handling message{}; ignoring: {}", OptionalFromDebugger(&peer_node_id), e.err); @@ -1094,14 +1140,14 @@ impl { - match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap()) { + match self.node_id_to_descriptor.lock().unwrap().entry(peer.their_node_id.unwrap().0) { hash_map::Entry::Occupied(_) => { - log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap())); + log_trace!(self.logger, "Got second connection with {}, closing", log_pubkey!(peer.their_node_id.unwrap().0)); peer.their_node_id = None; // Unset so that we don't generate a peer_disconnected event - return Err(PeerHandleError{ no_connection_possible: false }) + return Err(PeerHandleError { }) }, hash_map::Entry::Vacant(entry) => { - log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap())); + log_debug!(self.logger, "Finished noise handshake for connection with {}", log_pubkey!(peer.their_node_id.unwrap().0)); entry.insert(peer_descriptor.clone()) }, }; @@ -1125,7 +1171,7 @@ impl 8192 { peer.pending_read_buffer = Vec::new(); } peer.pending_read_buffer.resize(msg_len as usize + 16, 0); if msg_len < 2 { // Need at least the message type tag - return Err(PeerHandleError{ no_connection_possible: false }); + return Err(PeerHandleError { }); } peer.pending_read_is_header = false; } else { @@ -1198,19 +1244,19 @@ impl { log_gossip!(self.logger, "Received a message with an unknown required feature flag or TLV, you may want to update!"); self.enqueue_message(peer, &msgs::WarningMessage { channel_id: [0; 32], data: format!("Received an unknown required feature/TLV in message type {:?}", ty) }); - return Err(PeerHandleError { no_connection_possible: false }); + return Err(PeerHandleError { }); } - (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::UnknownVersion, _) => return Err(PeerHandleError { }), (msgs::DecodeError::InvalidValue, _) => { log_debug!(self.logger, "Got an invalid value while deserializing message"); - return Err(PeerHandleError { no_connection_possible: false }); + return Err(PeerHandleError { }); } (msgs::DecodeError::ShortRead, _) => { log_debug!(self.logger, "Deserialization failed due to shortness of message"); - return Err(PeerHandleError { no_connection_possible: false }); + return Err(PeerHandleError { }); } - (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { no_connection_possible: false }), - (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { no_connection_possible: false }), + (msgs::DecodeError::BadLengthDescriptor, _) => return Err(PeerHandleError { }), + (msgs::DecodeError::Io(_), _) => return Err(PeerHandleError { }), } } }; @@ -1220,7 +1266,7 @@ impl, message: wire::Message<<::Target as wire::CustomMessageReader>::CustomMessage> ) -> Result::Target as wire::CustomMessageReader>::CustomMessage>>, MessageHandlingError> { - let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages"); + let their_node_id = peer_lock.their_node_id.clone().expect("We know the peer's public key by the time we receive messages").0; peer_lock.received_message_since_timer_tick = true; // Need an Init as first message if let wire::Message::Init(msg) = message { if msg.features.requires_unknown_bits() { log_debug!(self.logger, "Peer features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }.into()); + return Err(PeerHandleError { }.into()); } if peer_lock.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }.into()); + return Err(PeerHandleError { }.into()); } log_info!(self.logger, "Received peer Init message from {}: {}", log_pubkey!(their_node_id), msg.features); @@ -1275,24 +1321,24 @@ impl { @@ -1470,8 +1520,7 @@ impl { log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }.into()); + return Err(PeerHandleError { }.into()); }, wire::Message::Unknown(type_id) => { log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", type_id); @@ -1491,21 +1540,22 @@ impl match peers.get(&descriptor) { Some(peer_mutex) => { let peer_lock = peer_mutex.lock().unwrap(); - if peer_lock.their_features.is_none() { + if !peer_lock.handshake_complete() { continue; } peer_lock @@ -1831,7 +1885,9 @@ impl { let peer = peer_lock.lock().unwrap(); - if let Some(node_id) = peer.their_node_id { - log_trace!(self.logger, - "Handling disconnection of peer {}, with {}future connection to the peer possible.", - log_pubkey!(node_id), if no_connection_possible { "no " } else { "" }); + if !peer.handshake_complete() { return; } + debug_assert!(peer.their_node_id.is_some()); + if let Some((node_id, _)) = peer.their_node_id { + log_trace!(self.logger, "Handling disconnection of peer {}", log_pubkey!(node_id)); self.node_id_to_descriptor.lock().unwrap().remove(&node_id); - self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible); - self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible); + self.message_handler.chan_handler.peer_disconnected(&node_id); + self.message_handler.onion_message_handler.peer_disconnected(&node_id); } } }; @@ -1896,21 +1965,17 @@ impl bool { #[cfg(test)] mod tests { use crate::chain::keysinterface::{NodeSigner, Recipient}; + use crate::ln::peer_channel_encryptor::PeerChannelEncryptor; use crate::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler, filter_addresses}; use crate::ln::{msgs, wire}; use crate::ln::msgs::NetAddress; @@ -2193,11 +2253,14 @@ mod tests { } fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { - let a_id = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); + let id_a = peer_a.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let addr_a = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1000}; + let id_b = peer_b.node_signer.get_node_id(Recipient::Node).unwrap(); let mut fd_b = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; - let initial_data = peer_b.new_outbound_connection(a_id, fd_b.clone(), None).unwrap(); - peer_a.new_inbound_connection(fd_a.clone(), None).unwrap(); + let addr_b = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1001}; + let initial_data = peer_b.new_outbound_connection(id_a, fd_b.clone(), Some(addr_a.clone())).unwrap(); + peer_a.new_inbound_connection(fd_a.clone(), Some(addr_b.clone())).unwrap(); assert_eq!(peer_a.read_event(&mut fd_a, &initial_data).unwrap(), false); peer_a.process_events(); @@ -2212,6 +2275,9 @@ mod tests { let a_data = fd_a.outbound_data.lock().unwrap().split_off(0); assert_eq!(peer_b.read_event(&mut fd_b, &a_data).unwrap(), false); + assert!(peer_a.get_peer_node_ids().contains(&(id_b, Some(addr_b)))); + assert!(peer_b.get_peer_node_ids().contains(&(id_a, Some(addr_a)))); + (fd_a.clone(), fd_b.clone()) } @@ -2220,19 +2286,15 @@ mod tests { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a DisconnectPeer event to remove the node flagged by id let cfgs = create_peermgr_cfgs(2); - let chan_handler = test_utils::TestChannelMessageHandler::new(); - let mut peers = create_network(2, &cfgs); + let peers = create_network(2, &cfgs); establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.read().unwrap().len(), 1); let their_id = peers[1].node_signer.get_node_id(Recipient::Node).unwrap(); - - chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { + cfgs[0].chan_handler.pending_events.lock().unwrap().push(events::MessageSendEvent::HandleError { node_id: their_id, action: msgs::ErrorAction::DisconnectPeer { msg: None }, }); - assert_eq!(chan_handler.pending_events.lock().unwrap().len(), 1); - peers[0].message_handler.chan_handler = &chan_handler; peers[0].process_events(); assert_eq!(peers[0].peers.read().unwrap().len(), 0); @@ -2266,6 +2328,35 @@ mod tests { assert_eq!(peers[1].read_event(&mut fd_b, &a_data).unwrap(), false); } + #[test] + fn test_non_init_first_msg() { + // Simple test of the first message received over a connection being something other than + // Init. This results in an immediate disconnection, which previously included a spurious + // peer_disconnected event handed to event handlers (which would panic in + // `TestChannelMessageHandler` here). + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs); + + let mut fd_dup = FileDescriptor { fd: 3, outbound_data: Arc::new(Mutex::new(Vec::new())) }; + let addr_dup = NetAddress::IPv4{addr: [127, 0, 0, 1], port: 1003}; + let id_a = cfgs[0].node_signer.get_node_id(Recipient::Node).unwrap(); + peers[0].new_inbound_connection(fd_dup.clone(), Some(addr_dup.clone())).unwrap(); + + let mut dup_encryptor = PeerChannelEncryptor::new_outbound(id_a, SecretKey::from_slice(&[42; 32]).unwrap()); + let initial_data = dup_encryptor.get_act_one(&peers[1].secp_ctx); + assert_eq!(peers[0].read_event(&mut fd_dup, &initial_data).unwrap(), false); + peers[0].process_events(); + + let a_data = fd_dup.outbound_data.lock().unwrap().split_off(0); + let (act_three, _) = + dup_encryptor.process_act_two(&a_data[..], &&cfgs[1].node_signer).unwrap(); + assert_eq!(peers[0].read_event(&mut fd_dup, &act_three).unwrap(), false); + + let not_init_msg = msgs::Ping { ponglen: 4, byteslen: 0 }; + let msg_bytes = dup_encryptor.encrypt_message(¬_init_msg); + assert!(peers[0].read_event(&mut fd_dup, &msg_bytes).is_err()); + } + #[test] fn test_disconnect_all_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and