+ /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
+ fn handle_message(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> {
+ log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+
+ // Need an Init as first message
+ if let wire::Message::Init(_) = message {
+ } else if peer.their_features.is_none() {
+ log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ match message {
+ // Setup and Control messages:
+ wire::Message::Init(msg) => {
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer global features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer local features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if peer.their_features.is_some() {
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ log_info!(
+ self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
+ if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
+ if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
+ if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+ if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
+ if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
+ if msg.features.supports_unknown_bits() { "present" } else { "none" }
+ );
+
+ if msg.features.initial_routing_sync() {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+ peers_needing_send.insert(peer_descriptor.clone());
+ }
+ if !msg.features.supports_static_remote_key() {
+ log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+
+ if !peer.outbound {
+ let mut features = InitFeatures::known().clear_gossip_queries();
+ if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
+ features.clear_initial_routing_sync();
+ }
+
+ let resp = msgs::Init { features };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+
+ self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
+ peer.their_features = Some(msg.features);
+ },
+ wire::Message::Error(msg) => {
+ let mut data_is_printable = true;
+ for b in msg.data.bytes() {
+ if b < 32 || b > 126 {
+ data_is_printable = false;
+ break;
+ }
+ }
+
+ if data_is_printable {
+ log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
+ } else {
+ log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
+ }
+ self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
+ if msg.channel_id == [0; 32] {
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ },
+
+ wire::Message::Ping(msg) => {
+ if msg.ponglen < 65532 {
+ let resp = msgs::Pong { byteslen: msg.ponglen };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+ },
+ wire::Message::Pong(_msg) => {
+ peer.awaiting_pong = false;
+ },
+
+ // Channel messages:
+ wire::Message::OpenChannel(msg) => {
+ self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+ wire::Message::AcceptChannel(msg) => {
+ self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+
+ wire::Message::FundingCreated(msg) => {
+ self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingSigned(msg) => {
+ self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingLocked(msg) => {
+ self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::Shutdown(msg) => {
+ self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ClosingSigned(msg) => {
+ self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Commitment messages:
+ wire::Message::UpdateAddHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFulfillHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailMalformedHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::CommitmentSigned(msg) => {
+ self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::RevokeAndACK(msg) => {
+ self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFee(msg) => {
+ self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelReestablish(msg) => {
+ self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Routing messages:
+ wire::Message::AnnouncementSignatures(msg) => {
+ self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::NodeAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::ChannelUpdate(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::QueryShortChannelIds(msg) => {
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::ReplyShortChannelIdsEnd(msg) => {
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::QueryChannelRange(msg) => {
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::ReplyChannelRange(msg) => {
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::GossipTimestampFilter(_msg) => {
+ // TODO: handle message
+ },
+
+ // Unknown messages:
+ wire::Message::Unknown(msg_type) if msg_type.is_even() => {
+ log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
+ // Fail the channel if message is an even, unknown type as per BOLT #1.
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ },
+ wire::Message::Unknown(msg_type) => {
+ log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
+ }
+ };
+ Ok(())
+ }
+