use ln::features::InitFeatures;
use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, RoutingMessageHandler};
use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use util::ser::{VecWriter, Writeable};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
logger: L,
}
+enum MessageHandlingError {
+ PeerHandleError(PeerHandleError),
+ LightningError(LightningError),
+}
+
+impl From<PeerHandleError> for MessageHandlingError {
+ fn from(error: PeerHandleError) -> Self {
+ MessageHandlingError::PeerHandleError(error)
+ }
+}
+
+impl From<LightningError> for MessageHandlingError {
+ fn from(error: LightningError) -> Self {
+ MessageHandlingError::LightningError(error)
+ }
+}
+
macro_rules! encode_msg {
($msg: expr) => {{
let mut buffer = VecWriter(Vec::new());
}
};
- log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
-
- // Need an Init as first message
- if let wire::Message::Init(_) = message {
- } else if peer.their_features.is_none() {
- log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
- return Err(PeerHandleError{ no_connection_possible: false });
- }
-
- match message {
- // Setup and Control messages:
- wire::Message::Init(msg) => {
- if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer global features required unknown version bits");
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- if msg.features.requires_unknown_bits() {
- log_info!(self.logger, "Peer local features required unknown version bits");
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- if peer.their_features.is_some() {
- return Err(PeerHandleError{ no_connection_possible: false });
- }
-
- log_info!(self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unkown local flags: {}, unknown global flags: {}",
- if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
- if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
- if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
- if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
- if msg.features.supports_unknown_bits() { "present" } else { "none" },
- if msg.features.supports_unknown_bits() { "present" } else { "none" });
-
- if msg.features.initial_routing_sync() {
- peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
- peers.peers_needing_send.insert(peer_descriptor.clone());
- }
- if !msg.features.supports_static_remote_key() {
- log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
- return Err(PeerHandleError{ no_connection_possible: true });
- }
-
- if !peer.outbound {
- let mut features = InitFeatures::known();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
- let resp = msgs::Init { features };
- self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
- }
-
- self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
- peer.their_features = Some(msg.features);
- },
- wire::Message::Error(msg) => {
- let mut data_is_printable = true;
- for b in msg.data.bytes() {
- if b < 32 || b > 126 {
- data_is_printable = false;
- break;
- }
- }
-
- if data_is_printable {
- log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
- } else {
- log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
- }
- self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
- if msg.channel_id == [0; 32] {
- return Err(PeerHandleError{ no_connection_possible: true });
- }
- },
-
- wire::Message::Ping(msg) => {
- if msg.ponglen < 65532 {
- let resp = msgs::Pong { byteslen: msg.ponglen };
- self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
- }
- },
- wire::Message::Pong(_msg) => {
- peer.awaiting_pong = false;
- },
-
- // Channel messages:
- wire::Message::OpenChannel(msg) => {
- self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
- },
- wire::Message::AcceptChannel(msg) => {
- self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
- },
-
- wire::Message::FundingCreated(msg) => {
- self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::FundingSigned(msg) => {
- self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::FundingLocked(msg) => {
- self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
- },
-
- wire::Message::Shutdown(msg) => {
- self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ClosingSigned(msg) => {
- self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
- },
-
- // Commitment messages:
- wire::Message::UpdateAddHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFulfillHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFailHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFailMalformedHTLC(msg) => {
- self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
- },
-
- wire::Message::CommitmentSigned(msg) => {
- self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::RevokeAndACK(msg) => {
- self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::UpdateFee(msg) => {
- self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ChannelReestablish(msg) => {
- self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
- },
-
- // Routing messages:
- wire::Message::AnnouncementSignatures(msg) => {
- self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
- },
- wire::Message::ChannelAnnouncement(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
- wire::Message::NodeAnnouncement(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
- wire::Message::ChannelUpdate(msg) => {
- let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg));
-
- if should_forward {
- // TODO: forward msg along to all our other peers!
- }
- },
-
- // Unknown messages:
- wire::Message::Unknown(msg_type) if msg_type.is_even() => {
- log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
- // Fail the channel if message is an even, unknown type as per BOLT #1.
- return Err(PeerHandleError{ no_connection_possible: true });
- },
- wire::Message::Unknown(msg_type) => {
- log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
- },
+ if let Err(handling_error) = self.handle_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), message){
+ match handling_error {
+ MessageHandlingError::PeerHandleError(e) => { return Err(e) },
+ MessageHandlingError::LightningError(e) => {
+ try_potential_handleerror!(Err(e));
+ },
+ }
}
}
}
Ok(pause_read)
}
+ /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
+ fn handle_message(&self, peers_needing_send: &mut HashSet<Descriptor>, peer: &mut Peer, peer_descriptor: Descriptor, message: wire::Message) -> Result<(), MessageHandlingError> {
+ log_trace!(self.logger, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+
+ // Need an Init as first message
+ if let wire::Message::Init(_) = message {
+ } else if peer.their_features.is_none() {
+ log_trace!(self.logger, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ match message {
+ // Setup and Control messages:
+ wire::Message::Init(msg) => {
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer global features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if msg.features.requires_unknown_bits() {
+ log_info!(self.logger, "Peer local features required unknown version bits");
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ if peer.their_features.is_some() {
+ return Err(PeerHandleError{ no_connection_possible: false }.into());
+ }
+
+ log_info!(
+ self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+ if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
+ if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
+ if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+ if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
+ if msg.features.supports_unknown_bits() { "present" } else { "none" }
+ );
+
+ if msg.features.initial_routing_sync() {
+ peer.sync_status = InitSyncTracker::ChannelsSyncing(0);
+ peers_needing_send.insert(peer_descriptor.clone());
+ }
+ if !msg.features.supports_static_remote_key() {
+ log_debug!(self.logger, "Peer {} does not support static remote key, disconnecting with no_connection_possible", log_pubkey!(peer.their_node_id.unwrap()));
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+
+ if !peer.outbound {
+ let mut features = InitFeatures::known();
+ if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
+ features.clear_initial_routing_sync();
+ }
+
+ let resp = msgs::Init { features };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+
+ self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
+ peer.their_features = Some(msg.features);
+ },
+ wire::Message::Error(msg) => {
+ let mut data_is_printable = true;
+ for b in msg.data.bytes() {
+ if b < 32 || b > 126 {
+ data_is_printable = false;
+ break;
+ }
+ }
+
+ if data_is_printable {
+ log_debug!(self.logger, "Got Err message from {}: {}", log_pubkey!(peer.their_node_id.unwrap()), msg.data);
+ } else {
+ log_debug!(self.logger, "Got Err message from {} with non-ASCII error message", log_pubkey!(peer.their_node_id.unwrap()));
+ }
+ self.message_handler.chan_handler.handle_error(&peer.their_node_id.unwrap(), &msg);
+ if msg.channel_id == [0; 32] {
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ }
+ },
+
+ wire::Message::Ping(msg) => {
+ if msg.ponglen < 65532 {
+ let resp = msgs::Pong { byteslen: msg.ponglen };
+ self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
+ }
+ },
+ wire::Message::Pong(_msg) => {
+ peer.awaiting_pong = false;
+ },
+
+ // Channel messages:
+ wire::Message::OpenChannel(msg) => {
+ self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+ wire::Message::AcceptChannel(msg) => {
+ self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
+ },
+
+ wire::Message::FundingCreated(msg) => {
+ self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingSigned(msg) => {
+ self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::FundingLocked(msg) => {
+ self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::Shutdown(msg) => {
+ self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ClosingSigned(msg) => {
+ self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Commitment messages:
+ wire::Message::UpdateAddHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFulfillHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFailMalformedHTLC(msg) => {
+ self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ wire::Message::CommitmentSigned(msg) => {
+ self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::RevokeAndACK(msg) => {
+ self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::UpdateFee(msg) => {
+ self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelReestablish(msg) => {
+ self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
+ },
+
+ // Routing messages:
+ wire::Message::AnnouncementSignatures(msg) => {
+ self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
+ },
+ wire::Message::ChannelAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::NodeAnnouncement(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_node_announcement(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+ wire::Message::ChannelUpdate(msg) => {
+ let should_forward = match self.message_handler.route_handler.handle_channel_update(&msg) {
+ Ok(v) => v,
+ Err(e) => { return Err(e.into()); },
+ };
+
+ if should_forward {
+ // TODO: forward msg along to all our other peers!
+ }
+ },
+
+ // Unknown messages:
+ wire::Message::Unknown(msg_type) if msg_type.is_even() => {
+ log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", msg_type);
+ // Fail the channel if message is an even, unknown type as per BOLT #1.
+ return Err(PeerHandleError{ no_connection_possible: true }.into());
+ },
+ wire::Message::Unknown(msg_type) => {
+ log_trace!(self.logger, "Received unknown odd message of type {}, ignoring", msg_type);
+ }
+ };
+ Ok(())
+ }
+
/// Checks for any events generated by our handlers and processes them. Includes sending most
/// response messages as well as messages generated by calls to handler functions directly (eg
/// functions like ChannelManager::process_pending_htlc_forward or send_payment).