Extract wire message handling into a method.
authorArik Sosman <git@arik.io>
Fri, 22 May 2020 20:03:49 +0000 (13:03 -0700)
committerArik Sosman <git@arik.io>
Sun, 7 Jun 2020 05:55:12 +0000 (22:55 -0700)
This is a response to splitting #585 into smaller components. This extraction should allow the subsequent creation of a trait for all message handling, thereby enabling more flexibility in the state machine, particularly for bindings.

lightning/src/ln/peer_handler.rs

index 12d7284661dee0c68c2fc377d9d6451141232a00..e5a973bca36c52626134fc33d74c76371f9fb4e2 100644 (file)
@@ -10,7 +10,7 @@ use bitcoin::secp256k1::key::{SecretKey,PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, RoutingMessageHandler};
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::{VecWriter, Writeable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
@@ -209,6 +209,23 @@ pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: De
        logger: L,
 }
 
+enum MessageHandlingError {
+       PeerHandleError(PeerHandleError),
+       LightningError(LightningError),
+}
+
+impl From<PeerHandleError> for MessageHandlingError {
+       fn from(error: PeerHandleError) -> Self {
+               MessageHandlingError::PeerHandleError(error)
+       }
+}
+
+impl From<LightningError> for MessageHandlingError {
+       fn from(error: LightningError) -> Self {
+               MessageHandlingError::LightningError(error)
+       }
+}
+
 macro_rules! encode_msg {
        ($msg: expr) => {{
                let mut buffer = VecWriter(Vec::new());
@@ -615,177 +632,13 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                                                                        }
                                                                                };
 
-                                                                               log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
-
-                                                                               // Need an Init as first message
-                                                                               if let wire::Message::Init(_) = message {
-                                                                               } else if peer.their_features.is_none() {
-                                                                                       log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
-                                                                                       return Err(PeerHandleError{ no_connection_possible: false });
-                                                                               }
-
-                                                                               match message {
-                                                                                       // Setup and Control messages:
-                                                                                       wire::Message::Init(msg) => {
-                                                                                               if msg.features.requires_unknown_bits() {
-                                                                                                       log_info!(self.logger, "Peer global features required unknown version bits");
-                                                                                                       return Err(PeerHandleError{ no_connection_possible: true });
-                                                                                               }
-                                                                                               if msg.features.requires_unknown_bits() {
-                                                                                                       log_info!(self.logger, "Peer local features required unknown version bits");
-                                                                                                       return Err(PeerHandleError{ no_connection_possible: true });
-                                                                                               }
-                                                                                               if peer.their_features.is_some() {
-                                                                                                       return Err(PeerHandleError{ no_connection_possible: false });
-                                                                                               }
-
-                                                                                               log_info!(self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unkown local flags: {}, unknown global flags: {}",
-                                                                                                       if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
-                                                                                                       if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
-                                                                                                       if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
-                                                                                                       if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
-                                                                                                       if msg.features.supports_unknown_bits() { "present" } else { "none" },
-                                                                                                       if msg.features.supports_unknown_bits() { "present" } else { "none" });
-
-                                                                                               if msg.features.initial_routing_sync() {
-                                                                                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
-                                                                                                       peers.peers_needing_send.insert(peer_descriptor.clone());
-                                                                                               }
-                                                                                               if !msg.features.supports_static_remote_key() {
-                                                                                                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
-                                                                                                       return Err(PeerHandleError{ no_connection_possible: true });
-                                                                                               }
-
-                                                                                               if !peer.outbound {
-                                                                                                       let mut features = InitFeatures::known();
-                                                                                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
-                                                                                                               features.clear_initial_routing_sync();
-                                                                                                       }
-
-                                                                                                       let resp = msgs::Init { features };
-                                                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
-                                                                                               }
-
-                                                                                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
-                                                                                               peer.their_features = Some(msg.features);
-                                                                                       },
-                                                                                       wire::Message::Error(msg) => {
-                                                                                               let mut data_is_printable = true;
-                                                                                               for b in msg.data.bytes() {
-                                                                                                       if b < 32 || b > 126 {
-                                                                                                               data_is_printable = false;
-                                                                                                               break;
-                                                                                                       }
-                                                                                               }
-
-                                                                                               if data_is_printable {
-                                                                                                       log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
-                                                                                               } else {
-                                                                                                       log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
-                                                                                               }
-                                                                                               self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
-                                                                                               if msg.channel_id == [0; 32] {
-                                                                                                       return Err(PeerHandleError{ no_connection_possible: true });
-                                                                                               }
-                                                                                       },
-
-                                                                                       wire::Message::Ping(msg) => {
-                                                                                               if msg.ponglen < 65532 {
-                                                                                                       let resp = msgs::Pong { byteslen: msg.ponglen };
-                                                                                                       self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
-                                                                                               }
-                                                                                       },
-                                                                                       wire::Message::Pong(_msg) => {
-                                                                                               peer.awaiting_pong = false;
-                                                                                       },
-
-                                                                                       // Channel messages:
-                                                                                       wire::Message::OpenChannel(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::AcceptChannel(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
-                                                                                       },
-
-                                                                                       wire::Message::FundingCreated(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::FundingSigned(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::FundingLocked(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-
-                                                                                       wire::Message::Shutdown(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::ClosingSigned(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-
-                                                                                       // Commitment messages:
-                                                                                       wire::Message::UpdateAddHTLC(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::UpdateFulfillHTLC(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::UpdateFailHTLC(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::UpdateFailMalformedHTLC(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-
-                                                                                       wire::Message::CommitmentSigned(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::RevokeAndACK(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::UpdateFee(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::ChannelReestablish(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-
-                                                                                       // Routing messages:
-                                                                                       wire::Message::AnnouncementSignatures(msg) => {
-                                                                                               self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
-                                                                                       },
-                                                                                       wire::Message::ChannelAnnouncement(msg) => {
-                                                                                               let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg));
-
-                                                                                               if should_forward {
-                                                                                                       // TODO: forward msg along to all our other peers!
-                                                                                               }
-                                                                                       },
-                                                                                       wire::Message::NodeAnnouncement(msg) => {
-                                                                                               let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg));
-
-                                                                                               if should_forward {
-                                                                                                       // TODO: forward msg along to all our other peers!
-                                                                                               }
-                                                                                       },
-                                                                                       wire::Message::ChannelUpdate(msg) => {
-                                                                                               let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg));
-
-                                                                                               if should_forward {
-                                                                                                       // TODO: forward msg along to all our other peers!
-                                                                                               }
-                                                                                       },
-
-                                                                                       // Unknown messages:
-                                                                                       wire::Message::Unknown(msg_type) if msg_type.is_even() => {
-                                                                                               log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
-                                                                                               // Fail the channel if message is an even, unknown type as per BOLT #1.
-                                                                                               return Err(PeerHandleError{ no_connection_possible: true });
-                                                                                       },
-                                                                                       wire::Message::Unknown(msg_type) => {
-                                                                                               log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
-                                                                                       },
+                                                                               if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
+                                                                                       match handling_error {
+                                                                                               MessageHandlingError::PeerHandleError(e) => { return Err(e) },
+                                                                                               MessageHandlingError::LightningError(e) => {
+                                                                                                       try_potential_handleerror!(Err(e));
+                                                                                               },
+                                                                                       }
                                                                                }
                                                                        }
                                                                }
@@ -805,6 +658,193 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                Ok(pause_read)
        }
 
+       /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
+       fn handle_message(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> {
+               log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+
+               // Need an Init as first message
+               if let wire::Message::Init(_) = message {
+               } else if peer.their_features.is_none() {
+                       log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
+                       return Err(PeerHandleError{ no_connection_possible: false }.into());
+               }
+
+               match message {
+                       // Setup and Control messages:
+                       wire::Message::Init(msg) => {
+                               if msg.features.requires_unknown_bits() {
+                                       log_info!(self.logger, "Peer global features required unknown version bits");
+                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               }
+                               if msg.features.requires_unknown_bits() {
+                                       log_info!(self.logger, "Peer local features required unknown version bits");
+                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               }
+                               if peer.their_features.is_some() {
+                                       return Err(PeerHandleError{ no_connection_possible: false }.into());
+                               }
+
+                               log_info!(
+                                       self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+                                       if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
+                                       if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
+                                       if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+                                       if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
+                                       if msg.features.supports_unknown_bits() { "present" } else { "none" }
+                               );
+
+                               if msg.features.initial_routing_sync() {
+                                       peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+                                       peers_needing_send.insert(peer_descriptor.clone());
+                               }
+                               if !msg.features.supports_static_remote_key() {
+                                       log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
+                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               }
+
+                               if !peer.outbound {
+                                       let mut features = InitFeatures::known();
+                                       if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
+                                               features.clear_initial_routing_sync();
+                                       }
+
+                                       let resp = msgs::Init { features };
+                                       self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+                               }
+
+                               self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
+                               peer.their_features = Some(msg.features);
+                       },
+                       wire::Message::Error(msg) => {
+                               let mut data_is_printable = true;
+                               for b in msg.data.bytes() {
+                                       if b < 32 || b > 126 {
+                                               data_is_printable = false;
+                                               break;
+                                       }
+                               }
+
+                               if data_is_printable {
+                                       log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
+                               } else {
+                                       log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
+                               }
+                               self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
+                               if msg.channel_id == [0; 32] {
+                                       return Err(PeerHandleError{ no_connection_possible: true }.into());
+                               }
+                       },
+
+                       wire::Message::Ping(msg) => {
+                               if msg.ponglen < 65532 {
+                                       let resp = msgs::Pong { byteslen: msg.ponglen };
+                                       self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+                               }
+                       },
+                       wire::Message::Pong(_msg) => {
+                               peer.awaiting_pong = false;
+                       },
+
+                       // Channel messages:
+                       wire::Message::OpenChannel(msg) => {
+                               self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+                       },
+                       wire::Message::AcceptChannel(msg) => {
+                               self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+                       },
+
+                       wire::Message::FundingCreated(msg) => {
+                               self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::FundingSigned(msg) => {
+                               self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::FundingLocked(msg) => {
+                               self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
+                       },
+
+                       wire::Message::Shutdown(msg) => {
+                               self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::ClosingSigned(msg) => {
+                               self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
+                       },
+
+                       // Commitment messages:
+                       wire::Message::UpdateAddHTLC(msg) => {
+                               self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::UpdateFulfillHTLC(msg) => {
+                               self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::UpdateFailHTLC(msg) => {
+                               self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::UpdateFailMalformedHTLC(msg) => {
+                               self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
+                       },
+
+                       wire::Message::CommitmentSigned(msg) => {
+                               self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::RevokeAndACK(msg) => {
+                               self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::UpdateFee(msg) => {
+                               self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::ChannelReestablish(msg) => {
+                               self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
+                       },
+
+                       // Routing messages:
+                       wire::Message::AnnouncementSignatures(msg) => {
+                               self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
+                       },
+                       wire::Message::ChannelAnnouncement(msg) => {
+                               let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+                                       Ok(v) => v,
+                                       Err(e) => { return Err(e.into()); },
+                               };
+
+                               if should_forward {
+                                       // TODO: forward msg along to all our other peers!
+                               }
+                       },
+                       wire::Message::NodeAnnouncement(msg) => {
+                               let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) {
+                                       Ok(v) => v,
+                                       Err(e) => { return Err(e.into()); },
+                               };
+
+                               if should_forward {
+                                       // TODO: forward msg along to all our other peers!
+                               }
+                       },
+                       wire::Message::ChannelUpdate(msg) => {
+                               let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
+                                       Ok(v) => v,
+                                       Err(e) => { return Err(e.into()); },
+                               };
+
+                               if should_forward {
+                                       // TODO: forward msg along to all our other peers!
+                               }
+                       },
+
+                       // Unknown messages:
+                       wire::Message::Unknown(msg_type) if msg_type.is_even() => {
+                               log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
+                               // Fail the channel if message is an even, unknown type as per BOLT #1.
+                               return Err(PeerHandleError{ no_connection_possible: true }.into());
+                       },
+                       wire::Message::Unknown(msg_type) => {
+                               log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
+                       }
+               };
+               Ok(())
+       }
+
        /// Checks for any events generated by our handlers and processes them. Includes sending most
        /// response messages as well as messages generated by calls to handler functions directly (eg
        /// functions like ChannelManager::process_pending_htlc_forward or send_payment).