X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning%2Fsrc%2Fln%2Fpeer_handler.rs;h=33c4050b374dd21fa4eeb3486bb179ae91c85f4e;hb=29bad58ff4fc4c9801761e3e5f71f146fb894b63;hp=009fa73f939e21ace76acb54acd205e56b2fa70a;hpb=ba06507a4f8e69111348b7025a09a0b3455d9c25;p=rust-lightning diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 009fa73f..33c4050b 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -1,3 +1,12 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + //! Top level peer message handling and socket handling logic lives here. //! //! Instead of actually servicing sockets ourselves we require that you implement the @@ -10,9 +19,9 @@ use bitcoin::secp256k1::key::{SecretKey,PublicKey}; use ln::features::InitFeatures; use ln::msgs; -use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler}; +use ln::msgs::{ChannelMessageHandler, LightningError, RoutingMessageHandler}; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; -use util::ser::VecWriter; +use util::ser::{VecWriter, Writeable}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; use ln::wire; use ln::wire::Encode; @@ -21,25 +30,141 @@ use util::events::{MessageSendEvent, MessageSendEventsProvider}; use util::logger::Logger; use routing::network_graph::NetGraphMsgHandler; -use std::collections::{HashMap,hash_map,HashSet,LinkedList}; +use prelude::*; +use alloc::collections::LinkedList; +use std::collections::{HashMap,hash_map,HashSet}; use std::sync::{Arc, Mutex}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::{cmp,error,hash,fmt}; -use std::ops::Deref; +use core::sync::atomic::{AtomicUsize, Ordering}; +use core::{cmp, hash, fmt, mem}; +use core::ops::Deref; +use std::error; use bitcoin::hashes::sha256::Hash as Sha256; use bitcoin::hashes::sha256::HashEngine as Sha256Engine; use bitcoin::hashes::{HashEngine, Hash}; +/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information +/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler. +pub struct IgnoringMessageHandler{} +impl MessageSendEventsProvider for IgnoringMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { Vec::new() } +} +impl RoutingMessageHandler for IgnoringMessageHandler { + fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result { Ok(false) } + fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result { Ok(false) } + fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result { Ok(false) } + fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {} + fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> + Vec<(msgs::ChannelAnnouncement, Option, Option)> { Vec::new() } + fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } + fn sync_routing_table(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {} + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } +} +impl Deref for IgnoringMessageHandler { + type Target = IgnoringMessageHandler; + fn deref(&self) -> &Self { self } +} + +/// A dummy struct which implements `ChannelMessageHandler` without having any channels. +/// You can provide one of these as the route_handler in a MessageHandler. +pub struct ErroringMessageHandler { + message_queue: Mutex> +} +impl ErroringMessageHandler { + /// Constructs a new ErroringMessageHandler + pub fn new() -> Self { + Self { message_queue: Mutex::new(Vec::new()) } + } + fn push_error(&self, node_id: &PublicKey, channel_id: [u8; 32]) { + self.message_queue.lock().unwrap().push(MessageSendEvent::HandleError { + action: msgs::ErrorAction::SendErrorMessage { + msg: msgs::ErrorMessage { channel_id, data: "We do not support channel messages, sorry.".to_owned() }, + }, + node_id: node_id.clone(), + }); + } +} +impl MessageSendEventsProvider for ErroringMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut res = Vec::new(); + mem::swap(&mut res, &mut self.message_queue.lock().unwrap()); + res + } +} +impl ChannelMessageHandler for ErroringMessageHandler { + // Any messages which are related to a specific channel generate an error message to let the + // peer know we don't care about channels. + fn handle_open_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::OpenChannel) { + ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + } + fn handle_accept_channel(&self, their_node_id: &PublicKey, _their_features: InitFeatures, msg: &msgs::AcceptChannel) { + ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + } + fn handle_funding_created(&self, their_node_id: &PublicKey, msg: &msgs::FundingCreated) { + ErroringMessageHandler::push_error(self, their_node_id, msg.temporary_channel_id); + } + fn handle_funding_signed(&self, their_node_id: &PublicKey, msg: &msgs::FundingSigned) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_funding_locked(&self, their_node_id: &PublicKey, msg: &msgs::FundingLocked) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_shutdown(&self, their_node_id: &PublicKey, _their_features: &InitFeatures, msg: &msgs::Shutdown) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_closing_signed(&self, their_node_id: &PublicKey, msg: &msgs::ClosingSigned) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_update_add_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_update_fulfill_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_update_fail_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_update_fail_malformed_htlc(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_commitment_signed(&self, their_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_revoke_and_ack(&self, their_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_update_fee(&self, their_node_id: &PublicKey, msg: &msgs::UpdateFee) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_announcement_signatures(&self, their_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + fn handle_channel_reestablish(&self, their_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { + ErroringMessageHandler::push_error(self, their_node_id, msg.channel_id); + } + // msgs::ChannelUpdate does not contain the channel_id field, so we just drop them. + fn handle_channel_update(&self, _their_node_id: &PublicKey, _msg: &msgs::ChannelUpdate) {} + fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {} + fn peer_connected(&self, _their_node_id: &PublicKey, _msg: &msgs::Init) {} + fn handle_error(&self, _their_node_id: &PublicKey, _msg: &msgs::ErrorMessage) {} +} +impl Deref for ErroringMessageHandler { + type Target = ErroringMessageHandler; + fn deref(&self) -> &Self { self } +} + /// Provides references to trait impls which handle different types of messages. pub struct MessageHandler where CM::Target: ChannelMessageHandler, RM::Target: RoutingMessageHandler { /// A message handler which handles messages specific to channels. Usually this is just a - /// ChannelManager object. + /// ChannelManager object or a ErroringMessageHandler. pub chan_handler: CM, /// A message handler which handles messages updating our knowledge of the network channel - /// graph. Usually this is just a NetGraphMsgHandlerMonitor object. + /// graph. Usually this is just a NetGraphMsgHandlerMonitor object or an IgnoringMessageHandler. pub route_handler: RM, } @@ -79,10 +204,9 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone { } /// Error for PeerManager errors. If you get one of these, you must disconnect the socket and -/// generate no further read_event/write_buffer_space_avail calls for the descriptor, only -/// triggering a single socket_disconnected call (unless it was provided in response to a -/// new_*_connection event, in which case no such socket_disconnected() must be called and the -/// socket silently disconencted). +/// generate no further read_event/write_buffer_space_avail/socket_disconnected calls for the +/// descriptor. +#[derive(Clone)] pub struct PeerHandleError { /// Used to indicate that we probably can't make any future connections to this peer, implying /// we should go ahead and force-close any channels we have with it. @@ -112,7 +236,6 @@ enum InitSyncTracker{ struct Peer { channel_encryptor: PeerChannelEncryptor, - outbound: bool, their_node_id: Option, their_features: Option, @@ -174,7 +297,7 @@ fn _check_usize_is_32_or_64() { /// lifetimes). Other times you can afford a reference, which is more efficient, in which case /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. -pub type SimpleArcPeerManager = Arc, Arc, Arc>>, Arc>>; +pub type SimpleArcPeerManager = PeerManager>, Arc, Arc>>, Arc>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -209,6 +332,23 @@ pub struct PeerManager for MessageHandlingError { + fn from(error: PeerHandleError) -> Self { + MessageHandlingError::PeerHandleError(error) + } +} + +impl From for MessageHandlingError { + fn from(error: LightningError) -> Self { + MessageHandlingError::LightningError(error) + } +} + macro_rules! encode_msg { ($msg: expr) => {{ let mut buffer = VecWriter(Vec::new()); @@ -217,6 +357,44 @@ macro_rules! encode_msg { }} } +impl PeerManager where + CM::Target: ChannelMessageHandler, + L::Target: Logger { + /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message + /// handler is used and network graph messages are ignored. + /// + /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be + /// cryptographically secure random bytes. + /// + /// (C-not exported) as we can't export a PeerManager with a dummy route handler + pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + Self::new(MessageHandler { + chan_handler: channel_message_handler, + route_handler: IgnoringMessageHandler{}, + }, our_node_secret, ephemeral_random_data, logger) + } +} + +impl PeerManager where + RM::Target: RoutingMessageHandler, + L::Target: Logger { + /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message + /// handler is used and messages related to channels will be ignored (or generate error + /// messages). Note that some other lightning implementations time-out connections after some + /// time if no channel is built with the peer. + /// + /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be + /// cryptographically secure random bytes. + /// + /// (C-not exported) as we can't export a PeerManager with a dummy channel handler + pub fn new_routing_only(routing_message_handler: RM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self { + Self::new(MessageHandler { + chan_handler: ErroringMessageHandler::new(), + route_handler: routing_message_handler, + }, our_node_secret, ephemeral_random_data, logger) + } +} + /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds. /// PeerIds may repeat, but only after socket_disconnected() has been called. impl PeerManager where @@ -289,7 +467,6 @@ impl PeerManager PeerManager PeerManager PeerManager PeerManager(&self, peers_needing_send: &mut HashSet, peer: &mut Peer, descriptor: Descriptor, message: &M) { + let mut buffer = VecWriter(Vec::new()); + wire::write(message, &mut buffer).unwrap(); // crash if the write failed + let encoded_message = buffer.0; + + log_trace!(self.logger, "Enqueueing message of type {} to {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..])); + peers_needing_send.insert(descriptor); + } + fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result { let pause_read = { let mut peers_lock = self.peers.lock().unwrap(); @@ -490,16 +677,6 @@ impl PeerManager { - { - log_trace!(self.logger, "Encoding and sending message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap())); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&$msg)[..])); - peers.peers_needing_send.insert(peer_descriptor.clone()); - } - } - } - macro_rules! try_potential_handleerror { ($thing: expr) => { match $thing { @@ -517,7 +694,7 @@ impl PeerManager { log_trace!(self.logger, "Got Err handling message, sending Error message because {}", e.err); - encode_and_send_msg!(msg); + self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &msg); continue; }, } @@ -557,13 +734,9 @@ impl PeerManager { let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); @@ -571,6 +744,9 @@ impl PeerManager { if peer.pending_read_is_header { @@ -610,181 +786,21 @@ impl PeerManager return Err(PeerHandleError { no_connection_possible: false }), msgs::DecodeError::Io(_) => return Err(PeerHandleError { no_connection_possible: false }), + msgs::DecodeError::UnsupportedCompression => { + log_debug!(self.logger, "We don't support zlib-compressed message fields, ignoring message"); + continue; + } } } }; - log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); - - // Need an Init as first message - if let wire::Message::Init(_) = message { - } else if peer.their_features.is_none() { - log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: false }); - } - - match message { - // Setup and Control messages: - wire::Message::Init(msg) => { - if msg.features.requires_unknown_bits() { - log_info!(self.logger, "Peer global features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }); - } - if msg.features.requires_unknown_bits() { - log_info!(self.logger, "Peer local features required unknown version bits"); - return Err(PeerHandleError{ no_connection_possible: true }); - } - if peer.their_features.is_some() { - return Err(PeerHandleError{ no_connection_possible: false }); - } - - log_info!(self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unkown local flags: {}, unknown global flags: {}", - if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"}, - if msg.features.initial_routing_sync() { "requested" } else { "not requested" }, - if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"}, - if msg.features.supports_static_remote_key() { "supported" } else { "not supported"}, - if msg.features.supports_unknown_bits() { "present" } else { "none" }, - if msg.features.supports_unknown_bits() { "present" } else { "none" }); - - if msg.features.initial_routing_sync() { - peer.sync_status = InitSyncTracker::ChannelsSyncing(0); - peers.peers_needing_send.insert(peer_descriptor.clone()); - } - if !msg.features.supports_static_remote_key() { - log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap())); - return Err(PeerHandleError{ no_connection_possible: true }); - } - - if !peer.outbound { - let mut features = InitFeatures::known(); - if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) { - features.clear_initial_routing_sync(); - } - - let resp = msgs::Init { features }; - encode_and_send_msg!(resp); - } - - self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); - peer.their_features = Some(msg.features); - }, - wire::Message::Error(msg) => { - let mut data_is_printable = true; - for b in msg.data.bytes() { - if b < 32 || b > 126 { - data_is_printable = false; - break; - } - } - - if data_is_printable { - log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); - } else { - log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); - } - self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); - if msg.channel_id == [0; 32] { - return Err(PeerHandleError{ no_connection_possible: true }); - } - }, - - wire::Message::Ping(msg) => { - if msg.ponglen < 65532 { - let resp = msgs::Pong { byteslen: msg.ponglen }; - encode_and_send_msg!(resp); - } - }, - wire::Message::Pong(_msg) => { - peer.awaiting_pong = false; - }, - - // Channel messages: - wire::Message::OpenChannel(msg) => { - self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, - wire::Message::AcceptChannel(msg) => { - self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); - }, - - wire::Message::FundingCreated(msg) => { - self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingSigned(msg) => { - self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::FundingLocked(msg) => { - self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); - }, - - wire::Message::Shutdown(msg) => { - self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ClosingSigned(msg) => { - self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); - }, - - // Commitment messages: - wire::Message::UpdateAddHTLC(msg) => { - self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFulfillHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFailMalformedHTLC(msg) => { - self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); - }, - - wire::Message::CommitmentSigned(msg) => { - self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::RevokeAndACK(msg) => { - self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::UpdateFee(msg) => { - self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelReestablish(msg) => { - self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); - }, - - // Routing messages: - wire::Message::AnnouncementSignatures(msg) => { - self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); - }, - wire::Message::ChannelAnnouncement(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg)); - - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - wire::Message::NodeAnnouncement(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg)); - - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - wire::Message::ChannelUpdate(msg) => { - let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg)); - - if should_forward { - // TODO: forward msg along to all our other peers! - } - }, - - // Unknown messages: - wire::Message::Unknown(msg_type) if msg_type.is_even() => { - log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); - // Fail the channel if message is an even, unknown type as per BOLT #1. - return Err(PeerHandleError{ no_connection_possible: true }); - }, - wire::Message::Unknown(msg_type) => { - log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); - }, + if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){ + match handling_error { + MessageHandlingError::PeerHandleError(e) => { return Err(e) }, + MessageHandlingError::LightningError(e) => { + try_potential_handleerror!(Err(e)); + }, + } } } } @@ -804,6 +820,198 @@ impl PeerManager, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> { + log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + + // Need an Init as first message + if let wire::Message::Init(_) = message { + } else if peer.their_features.is_none() { + log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } + + match message { + // Setup and Control messages: + wire::Message::Init(msg) => { + if msg.features.requires_unknown_bits() { + log_info!(self.logger, "Peer features required unknown version bits"); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + if peer.their_features.is_some() { + return Err(PeerHandleError{ no_connection_possible: false }.into()); + } + + log_info!( + self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}", + if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"}, + if msg.features.initial_routing_sync() { "requested" } else { "not requested" }, + if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"}, + if msg.features.supports_gossip_queries() { "supported" } else { "not supported" }, + if msg.features.supports_static_remote_key() { "supported" } else { "not supported"}, + if msg.features.supports_unknown_bits() { "present" } else { "none" } + ); + + if msg.features.initial_routing_sync() { + peer.sync_status = InitSyncTracker::ChannelsSyncing(0); + peers_needing_send.insert(peer_descriptor.clone()); + } + if !msg.features.supports_static_remote_key() { + log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap())); + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + + self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg); + + self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); + peer.their_features = Some(msg.features); + }, + wire::Message::Error(msg) => { + let mut data_is_printable = true; + for b in msg.data.bytes() { + if b < 32 || b > 126 { + data_is_printable = false; + break; + } + } + + if data_is_printable { + log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data); + } else { + log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap())); + } + self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg); + if msg.channel_id == [0; 32] { + return Err(PeerHandleError{ no_connection_possible: true }.into()); + } + }, + + wire::Message::Ping(msg) => { + if msg.ponglen < 65532 { + let resp = msgs::Pong { byteslen: msg.ponglen }; + self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp); + } + }, + wire::Message::Pong(_msg) => { + peer.awaiting_pong = false; + }, + + // Channel messages: + wire::Message::OpenChannel(msg) => { + self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, + wire::Message::AcceptChannel(msg) => { + self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); + }, + + wire::Message::FundingCreated(msg) => { + self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingSigned(msg) => { + self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::FundingLocked(msg) => { + self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); + }, + + wire::Message::Shutdown(msg) => { + self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), peer.their_features.as_ref().unwrap(), &msg); + }, + wire::Message::ClosingSigned(msg) => { + self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); + }, + + // Commitment messages: + wire::Message::UpdateAddHTLC(msg) => { + self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFulfillHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFailMalformedHTLC(msg) => { + self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); + }, + + wire::Message::CommitmentSigned(msg) => { + self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::RevokeAndACK(msg) => { + self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::UpdateFee(msg) => { + self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelReestablish(msg) => { + self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); + }, + + // Routing messages: + wire::Message::AnnouncementSignatures(msg) => { + self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); + }, + wire::Message::ChannelAnnouncement(msg) => { + let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) { + Ok(v) => v, + Err(e) => { return Err(e.into()); }, + }; + + if should_forward { + // TODO: forward msg along to all our other peers! + } + }, + wire::Message::NodeAnnouncement(msg) => { + let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) { + Ok(v) => v, + Err(e) => { return Err(e.into()); }, + }; + + if should_forward { + // TODO: forward msg along to all our other peers! + } + }, + wire::Message::ChannelUpdate(msg) => { + self.message_handler.chan_handler.handle_channel_update(&peer.their_node_id.unwrap(), &msg); + let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) { + Ok(v) => v, + Err(e) => { return Err(e.into()); }, + }; + + if should_forward { + // TODO: forward msg along to all our other peers! + } + }, + wire::Message::QueryShortChannelIds(msg) => { + self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyShortChannelIdsEnd(msg) => { + self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::QueryChannelRange(msg) => { + self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::ReplyChannelRange(msg) => { + self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?; + }, + wire::Message::GossipTimestampFilter(_msg) => { + // TODO: handle message + }, + + // Unknown messages: + wire::Message::Unknown(msg_type) if msg_type.is_even() => { + log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type); + // Fail the channel if message is an even, unknown type as per BOLT #1. + return Err(PeerHandleError{ no_connection_possible: true }.into()); + }, + wire::Message::Unknown(msg_type) => { + log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type); + } + }; + Ok(()) + } + /// Checks for any events generated by our handlers and processes them. Includes sending most /// response messages as well as messages generated by calls to handler functions directly (eg /// functions like ChannelManager::process_pending_htlc_forward or send_payment). @@ -813,8 +1021,9 @@ impl PeerManager PeerManager { + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.do_attempt_write_data(&mut descriptor, peer); + }, + MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => { + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.do_attempt_write_data(&mut descriptor, peer); + } + MessageSendEvent::SendReplyChannelRange { ref node_id, ref msg } => { + log_trace!(self.logger, "Handling SendReplyChannelRange event in peer_handler for node {} with num_scids={} first_blocknum={} number_of_blocks={}, sync_complete={}", + log_pubkey!(node_id), + msg.short_channel_ids.len(), + msg.first_blocknum, + msg.number_of_blocks, + msg.sync_complete); + let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {}); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); + self.do_attempt_write_data(&mut descriptor, peer); } } } @@ -1108,11 +1338,29 @@ impl PeerManager(&self, hasher: &mut H) { + impl core::hash::Hash for FileDescriptor { + fn hash(&self, hasher: &mut H) { self.fd.hash(hasher) } } @@ -1230,18 +1476,11 @@ mod tests { fn create_network<'a>(peer_count: usize, cfgs: &'a Vec) -> Vec> { let mut peers = Vec::new(); - let mut rng = thread_rng(); - let mut ephemeral_bytes = [0; 32]; - rng.fill_bytes(&mut ephemeral_bytes); - for i in 0..peer_count { - let node_id = { - let mut key_slice = [0;32]; - rng.fill_bytes(&mut key_slice); - SecretKey::from_slice(&key_slice).unwrap() - }; + let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap(); + let ephemeral_bytes = [i as u8; 32]; let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler }; - let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger); + let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger); peers.push(peer); } @@ -1261,13 +1500,6 @@ mod tests { (fd_a.clone(), fd_b.clone()) } - fn establish_connection_and_read_events<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { - let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b); - assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); - assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); - (fd_a.clone(), fd_b.clone()) - } - #[test] fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and @@ -1301,11 +1533,11 @@ mod tests { assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); // peers[0] awaiting_pong is set to true, but the Peer is still connected - peers[0].timer_tick_occured(); + peers[0].timer_tick_occurred(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); - // Since timer_tick_occured() is called again when awaiting_pong is true, all Peers are disconnected - peers[0].timer_tick_occured(); + // Since timer_tick_occurred() is called again when awaiting_pong is true, all Peers are disconnected + peers[0].timer_tick_occurred(); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0); } @@ -1336,41 +1568,4 @@ mod tests { assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100); assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50); } - - #[test] - fn limit_initial_routing_sync_requests() { - // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not. - { - let cfgs = create_peermgr_cfgs(2); - cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release); - let peers = create_network(2, &cfgs); - let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); - - let peer_0 = peers[0].peers.lock().unwrap(); - let peer_1 = peers[1].peers.lock().unwrap(); - - let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref(); - let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref(); - - assert!(peer_0_features.unwrap().initial_routing_sync()); - assert!(!peer_1_features.unwrap().initial_routing_sync()); - } - - // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not. - { - let cfgs = create_peermgr_cfgs(2); - cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release); - let peers = create_network(2, &cfgs); - let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); - - let peer_0 = peers[0].peers.lock().unwrap(); - let peer_1 = peers[1].peers.lock().unwrap(); - - let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref(); - let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref(); - - assert!(!peer_0_features.unwrap().initial_routing_sync()); - assert!(peer_1_features.unwrap().initial_routing_sync()); - } - } }