Encapsulate message wire encoding into a module
authorJeffrey Czyz <jkczyz@gmail.com>
Tue, 21 Jan 2020 23:26:21 +0000 (15:26 -0800)
committerJeffrey Czyz <jkczyz@gmail.com>
Wed, 5 Feb 2020 20:11:48 +0000 (12:11 -0800)
Lightning messages are identified by a 2-byte type when encoded on the
wire. Rather than expecting callers to know message types when sending
messages to peers, have each message implement a trait defining the
message type. Provide an interface for reading and writing messages
as well as a Message enum for matching the decoded message, including
unknown messages.

lightning/src/ln/mod.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/wire.rs [new file with mode: 0644]

index 93043111eef27c92d1939bd13227a253f4f20dc8..9a2a90fd4fd6e9444bf3a5cbcaa8f68b97be9407 100644 (file)
@@ -24,6 +24,7 @@ pub(crate) mod peer_channel_encryptor;
 
 mod channel;
 mod onion_utils;
+mod wire;
 
 #[cfg(test)]
 #[macro_use] mod functional_test_utils;
index 5838b782f4d4124a906d07f7d8e4c0cfad65eb8d..30e208155c5699596ee92988b37790f10ff904a2 100644 (file)
@@ -12,11 +12,13 @@ use ln::features::InitFeatures;
 use ln::msgs;
 use ln::msgs::ChannelMessageHandler;
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
-use util::ser::{Writeable, Writer, Readable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
+use ln::wire;
+use ln::wire::Encode;
 use util::byte_utils;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
+use util::ser::Writer;
 
 use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
@@ -203,11 +205,10 @@ impl Writer for VecWriter {
 }
 
 macro_rules! encode_msg {
-       ($msg: expr, $msg_code: expr) => {{
-               let mut msg = VecWriter(Vec::new());
-               ($msg_code as u16).write(&mut msg).unwrap();
-               $msg.write(&mut msg).unwrap();
-               msg.0
+       ($msg: expr) => {{
+               let mut buffer = VecWriter(Vec::new());
+               wire::write($msg, &mut buffer).unwrap();
+               buffer.0
        }}
 }
 
@@ -344,10 +345,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                macro_rules! encode_and_send_msg {
-                       ($msg: expr, $msg_code: expr) => {
+                       ($msg: expr) => {
                                {
-                                       log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
-                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                                       log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..]));
                                }
                        }
                }
@@ -360,9 +361,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps);
                                                for &(ref announce, ref update_a, ref update_b) in all_messages.iter() {
-                                                       encode_and_send_msg!(announce, 256);
-                                                       encode_and_send_msg!(update_a, 258);
-                                                       encode_and_send_msg!(update_b, 258);
+                                                       encode_and_send_msg!(announce);
+                                                       encode_and_send_msg!(update_a);
+                                                       encode_and_send_msg!(update_b);
                                                        peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
                                                }
                                                if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -373,7 +374,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
                                                for msg in all_messages.iter() {
-                                                       encode_and_send_msg!(msg, 256);
+                                                       encode_and_send_msg!(msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
                                                }
                                                if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -385,7 +386,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8;
                                                let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
                                                for msg in all_messages.iter() {
-                                                       encode_and_send_msg!(msg, 256);
+                                                       encode_and_send_msg!(msg);
                                                        peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
                                                }
                                                if all_messages.is_empty() || all_messages.len() != steps as usize {
@@ -481,10 +482,10 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                        peer.pending_read_buffer_pos = 0;
 
                                                        macro_rules! encode_and_send_msg {
-                                                               ($msg: expr, $msg_code: expr) => {
+                                                               ($msg: expr) => {
                                                                        {
-                                                                               log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap()));
-                                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..]));
+                                                                               log_trace!(self, "Encoding and sending message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+                                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&$msg)[..]));
                                                                                peers.peers_needing_send.insert(peer_descriptor.clone());
                                                                        }
                                                                }
@@ -507,7 +508,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                                },
                                                                                                msgs::ErrorAction::SendErrorMessage { msg } => {
                                                                                                        log_trace!(self, "Got Err handling message, sending Error message because {}", e.err);
-                                                                                                       encode_and_send_msg!(msg, 17);
+                                                                                                       encode_and_send_msg!(msg);
                                                                                                        continue;
                                                                                                },
                                                                                        }
@@ -583,9 +584,9 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel);
                                                                                features.set_initial_routing_sync();
                                                                        }
-                                                                       encode_and_send_msg!(msgs::Init {
-                                                                               features,
-                                                                       }, 16);
+
+                                                                       let resp = msgs::Init { features };
+                                                                       encode_and_send_msg!(resp);
                                                                },
                                                                NextNoiseStep::ActThree => {
                                                                        let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..]));
@@ -611,18 +612,20 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                peer.pending_read_buffer = [0; 18].to_vec();
                                                                                peer.pending_read_is_header = true;
 
-                                                                               let msg_type = byte_utils::slice_to_be16(&msg_data[0..2]);
-                                                                               log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap()));
-                                                                               if msg_type != 16 && peer.their_features.is_none() {
-                                                                                       // Need an init message as first message
+                                                                               let mut reader = ::std::io::Cursor::new(&msg_data[..]);
+                                                                               let message = try_potential_decodeerror!(wire::read(&mut reader));
+                                                                               log_trace!(self, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap()));
+
+                                                                               // Need an Init as first message
+                                                                               if let wire::Message::Init(_) = message {
+                                                                               } else if peer.their_features.is_none() {
                                                                                        log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap()));
                                                                                        return Err(PeerHandleError{ no_connection_possible: false });
                                                                                }
-                                                                               let mut reader = ::std::io::Cursor::new(&msg_data[2..]);
-                                                                               match msg_type {
-                                                                                       // Connection control:
-                                                                                       16 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::Init::read(&mut reader));
+
+                                                                               match message {
+                                                                                       // Setup and Control messages:
+                                                                                       wire::Message::Init(msg) => {
                                                                                                if msg.features.requires_unknown_bits() {
                                                                                                        log_info!(self, "Peer global features required unknown version bits");
                                                                                                        return Err(PeerHandleError{ no_connection_possible: true });
@@ -654,16 +657,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                                                features.set_initial_routing_sync();
                                                                                                        }
 
-                                                                                                       encode_and_send_msg!(msgs::Init {
-                                                                                                               features,
-                                                                                                       }, 16);
+                                                                                                       let resp = msgs::Init { features };
+                                                                                                       encode_and_send_msg!(resp);
                                                                                                }
 
                                                                                                self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
                                                                                                peer.their_features = Some(msg.features);
                                                                                        },
-                                                                                       17 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader));
+                                                                                       wire::Message::Error(msg) => {
                                                                                                let mut data_is_printable = true;
                                                                                                for b in msg.data.bytes() {
                                                                                                        if b < 32 || b > 126 {
@@ -683,113 +684,97 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                                }
                                                                                        },
 
-                                                                                       18 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::Ping::read(&mut reader));
+                                                                                       wire::Message::Ping(msg) => {
                                                                                                if msg.ponglen < 65532 {
                                                                                                        let resp = msgs::Pong { byteslen: msg.ponglen };
-                                                                                                       encode_and_send_msg!(resp, 19);
+                                                                                                       encode_and_send_msg!(resp);
                                                                                                }
                                                                                        },
-                                                                                       19 => {
+                                                                                       wire::Message::Pong(_msg) => {
                                                                                                peer.awaiting_pong = false;
-                                                                                               try_potential_decodeerror!(msgs::Pong::read(&mut reader));
                                                                                        },
-                                                                                       // Channel control:
-                                                                                       32 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader));
+
+                                                                                       // Channel messages:
+                                                                                       wire::Message::OpenChannel(msg) => {
                                                                                                self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
                                                                                        },
-                                                                                       33 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader));
+                                                                                       wire::Message::AcceptChannel(msg) => {
                                                                                                self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg);
                                                                                        },
 
-                                                                                       34 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader));
+                                                                                       wire::Message::FundingCreated(msg) => {
                                                                                                self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       35 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader));
+                                                                                       wire::Message::FundingSigned(msg) => {
                                                                                                self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       36 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader));
+                                                                                       wire::Message::FundingLocked(msg) => {
                                                                                                self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
 
-                                                                                       38 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader));
+                                                                                       wire::Message::Shutdown(msg) => {
                                                                                                self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       39 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader));
+                                                                                       wire::Message::ClosingSigned(msg) => {
                                                                                                self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
 
-                                                                                       128 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::UpdateAddHTLC::read(&mut reader));
+                                                                                       // Commitment messages:
+                                                                                       wire::Message::UpdateAddHTLC(msg) => {
                                                                                                self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       130 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::UpdateFulfillHTLC::read(&mut reader));
+                                                                                       wire::Message::UpdateFulfillHTLC(msg) => {
                                                                                                self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       131 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::UpdateFailHTLC::read(&mut reader));
+                                                                                       wire::Message::UpdateFailHTLC(msg) => {
                                                                                                self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       135 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::UpdateFailMalformedHTLC::read(&mut reader));
+                                                                                       wire::Message::UpdateFailMalformedHTLC(msg) => {
                                                                                                self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
 
-                                                                                       132 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader));
+                                                                                       wire::Message::CommitmentSigned(msg) => {
                                                                                                self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       133 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader));
+                                                                                       wire::Message::RevokeAndACK(msg) => {
                                                                                                self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       134 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader));
+                                                                                       wire::Message::UpdateFee(msg) => {
                                                                                                self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       136 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader));
+                                                                                       wire::Message::ChannelReestablish(msg) => {
                                                                                                self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
 
-                                                                                       // Routing control:
-                                                                                       259 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::AnnouncementSignatures::read(&mut reader));
+                                                                                       // Routing messages:
+                                                                                       wire::Message::AnnouncementSignatures(msg) => {
                                                                                                self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg);
                                                                                        },
-                                                                                       256 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::ChannelAnnouncement::read(&mut reader));
+                                                                                       wire::Message::ChannelAnnouncement(msg) => {
                                                                                                let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg));
 
                                                                                                if should_forward {
                                                                                                        // TODO: forward msg along to all our other peers!
                                                                                                }
                                                                                        },
-                                                                                       257 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::NodeAnnouncement::read(&mut reader));
+                                                                                       wire::Message::NodeAnnouncement(msg) => {
                                                                                                let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg));
 
                                                                                                if should_forward {
                                                                                                        // TODO: forward msg along to all our other peers!
                                                                                                }
                                                                                        },
-                                                                                       258 => {
-                                                                                               let msg = try_potential_decodeerror!(msgs::ChannelUpdate::read(&mut reader));
+                                                                                       wire::Message::ChannelUpdate(msg) => {
                                                                                                let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg));
 
                                                                                                if should_forward {
                                                                                                        // TODO: forward msg along to all our other peers!
                                                                                                }
                                                                                        },
-                                                                                       _ => {
+
+                                                                                       // Unknown messages:
+                                                                                       wire::Message::Unknown(msg_type) => {
+                                                                                               // Fail the channel if message is an even, unknown type as per BOLT #1.
                                                                                                if (msg_type & 1) == 0 {
                                                                                                        return Err(PeerHandleError{ no_connection_possible: true });
                                                                                                }
@@ -857,7 +842,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => {
@@ -867,7 +852,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Drop the pending channel? (or just let it timeout, but that sucks)
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => {
@@ -879,7 +864,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                //TODO: generate a DiscardFunding event indicating to the wallet that
                                                                //they should just throw away this funding transaction
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => {
@@ -890,7 +875,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                //TODO: generate a DiscardFunding event indicating to the wallet that
                                                                //they should just throw away this funding transaction
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => {
@@ -900,7 +885,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => {
@@ -911,7 +896,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                //TODO: generate a DiscardFunding event indicating to the wallet that
                                                                //they should just throw away this funding transaction
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => {
@@ -925,21 +910,21 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
                                                for msg in update_add_htlcs {
-                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 128)));
+                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
                                                for msg in update_fulfill_htlcs {
-                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 130)));
+                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
                                                for msg in update_fail_htlcs {
-                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 131)));
+                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
                                                for msg in update_fail_malformed_htlcs {
-                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 135)));
+                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
                                                if let &Some(ref msg) = update_fee {
-                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134)));
+                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                }
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => {
@@ -949,7 +934,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => {
@@ -959,7 +944,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendShutdown { ref node_id, ref msg } => {
@@ -969,7 +954,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => {
@@ -979,14 +964,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                //TODO: Do whatever we're gonna do for handling dropped messages
                                                        });
-                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136)));
+                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                self.do_attempt_write_data(&mut descriptor, peer);
                                        },
                                        MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => {
                                                log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg, 256);
-                                                       let encoded_update_msg = encode_msg!(update_msg, 258);
+                                                       let encoded_msg = encode_msg!(msg);
+                                                       let encoded_update_msg = encode_msg!(update_msg);
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
                                                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
@@ -1010,7 +995,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                        MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
                                                log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id);
                                                if self.message_handler.route_handler.handle_channel_update(msg).is_ok() {
-                                                       let encoded_msg = encode_msg!(msg, 258);
+                                                       let encoded_msg = encode_msg!(msg);
 
                                                        for (ref descriptor, ref mut peer) in peers.peers.iter_mut() {
                                                                if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() ||
@@ -1035,7 +1020,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                                        log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}",
                                                                                                        log_pubkey!(node_id),
                                                                                                        msg.data);
-                                                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
+                                                                                       peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                                                        // This isn't guaranteed to work, but if there is enough free
                                                                                        // room in the send buffer, put the error message there...
                                                                                        self.do_attempt_write_data(&mut descriptor, &mut peer);
@@ -1055,7 +1040,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                                                let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {
                                                                        //TODO: Do whatever we're gonna do for handling dropped messages
                                                                });
-                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17)));
+                                                               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
                                                                self.do_attempt_write_data(&mut descriptor, peer);
                                                        },
                                                }
@@ -1128,7 +1113,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref> PeerManager<Descriptor, CM> where
                                        ponglen: 0,
                                        byteslen: 64,
                                };
-                               peer.pending_outbound_buffer.push_back(encode_msg!(ping, 18));
+                               peer.pending_outbound_buffer.push_back(encode_msg!(&ping));
                                let mut descriptor_clone = descriptor.clone();
                                self.do_attempt_write_data(&mut descriptor_clone, peer);
 
diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs
new file mode 100644 (file)
index 0000000..963eaeb
--- /dev/null
@@ -0,0 +1,375 @@
+//! Wire encoding/decoding for Lightning messages according to [BOLT #1].
+//!
+//! Messages known by this module can be read from the wire using [`read`].
+//! The [`Message`] enum returned by [`read`] wraps the decoded message or the message type (if
+//! unknown) to use with pattern matching.
+//!
+//! Messages implementing the [`Encode`] trait define a message type and can be sent over the wire
+//! using [`write`].
+//!
+//! [BOLT #1]: https://github.com/lightningnetwork/lightning-rfc/blob/master/01-messaging.md
+//! [`read`]: fn.read.html
+//! [`write`]: fn.write.html
+//! [`Encode`]: trait.Encode.html
+//! [`Message`]: enum.Message.html
+
+use ln::msgs;
+use util::ser::{Readable, Writeable, Writer};
+
+/// A Lightning message returned by [`read`] when decoding bytes received over the wire. Each
+/// variant contains a message from [`ln::msgs`] or otherwise the message type if unknown.
+///
+/// [`read`]: fn.read.html
+/// [`ln::msgs`]: ../msgs/index.html
+#[allow(missing_docs)]
+pub enum Message {
+       Init(msgs::Init),
+       Error(msgs::ErrorMessage),
+       Ping(msgs::Ping),
+       Pong(msgs::Pong),
+       OpenChannel(msgs::OpenChannel),
+       AcceptChannel(msgs::AcceptChannel),
+       FundingCreated(msgs::FundingCreated),
+       FundingSigned(msgs::FundingSigned),
+       FundingLocked(msgs::FundingLocked),
+       Shutdown(msgs::Shutdown),
+       ClosingSigned(msgs::ClosingSigned),
+       UpdateAddHTLC(msgs::UpdateAddHTLC),
+       UpdateFulfillHTLC(msgs::UpdateFulfillHTLC),
+       UpdateFailHTLC(msgs::UpdateFailHTLC),
+       UpdateFailMalformedHTLC(msgs::UpdateFailMalformedHTLC),
+       CommitmentSigned(msgs::CommitmentSigned),
+       RevokeAndACK(msgs::RevokeAndACK),
+       UpdateFee(msgs::UpdateFee),
+       ChannelReestablish(msgs::ChannelReestablish),
+       AnnouncementSignatures(msgs::AnnouncementSignatures),
+       ChannelAnnouncement(msgs::ChannelAnnouncement),
+       NodeAnnouncement(msgs::NodeAnnouncement),
+       ChannelUpdate(msgs::ChannelUpdate),
+       /// A message that could not be decoded because its type is unknown.
+       Unknown(u16),
+}
+
+impl Message {
+       /// Returns the type that was used to decode the message payload.
+       pub fn type_id(&self) -> u16 {
+               match self {
+                       &Message::Init(ref msg) => msg.type_id(),
+                       &Message::Error(ref msg) => msg.type_id(),
+                       &Message::Ping(ref msg) => msg.type_id(),
+                       &Message::Pong(ref msg) => msg.type_id(),
+                       &Message::OpenChannel(ref msg) => msg.type_id(),
+                       &Message::AcceptChannel(ref msg) => msg.type_id(),
+                       &Message::FundingCreated(ref msg) => msg.type_id(),
+                       &Message::FundingSigned(ref msg) => msg.type_id(),
+                       &Message::FundingLocked(ref msg) => msg.type_id(),
+                       &Message::Shutdown(ref msg) => msg.type_id(),
+                       &Message::ClosingSigned(ref msg) => msg.type_id(),
+                       &Message::UpdateAddHTLC(ref msg) => msg.type_id(),
+                       &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(),
+                       &Message::UpdateFailHTLC(ref msg) => msg.type_id(),
+                       &Message::UpdateFailMalformedHTLC(ref msg) => msg.type_id(),
+                       &Message::CommitmentSigned(ref msg) => msg.type_id(),
+                       &Message::RevokeAndACK(ref msg) => msg.type_id(),
+                       &Message::UpdateFee(ref msg) => msg.type_id(),
+                       &Message::ChannelReestablish(ref msg) => msg.type_id(),
+                       &Message::AnnouncementSignatures(ref msg) => msg.type_id(),
+                       &Message::ChannelAnnouncement(ref msg) => msg.type_id(),
+                       &Message::NodeAnnouncement(ref msg) => msg.type_id(),
+                       &Message::ChannelUpdate(ref msg) => msg.type_id(),
+                       &Message::Unknown(type_id) => type_id,
+               }
+       }
+}
+
+/// Reads a message from the data buffer consisting of a 2-byte big-endian type and a
+/// variable-length payload conforming to the type.
+///
+/// # Errors
+///
+/// Returns an error if the message payload code not be decoded as the specified type.
+pub fn read<R: ::std::io::Read>(buffer: &mut R) -> Result<Message, msgs::DecodeError> {
+       let message_type = <u16 as Readable<R>>::read(buffer)?;
+       match message_type {
+               msgs::Init::TYPE => {
+                       Ok(Message::Init(Readable::read(buffer)?))
+               },
+               msgs::ErrorMessage::TYPE => {
+                       Ok(Message::Error(Readable::read(buffer)?))
+               },
+               msgs::Ping::TYPE => {
+                       Ok(Message::Ping(Readable::read(buffer)?))
+               },
+               msgs::Pong::TYPE => {
+                       Ok(Message::Pong(Readable::read(buffer)?))
+               },
+               msgs::OpenChannel::TYPE => {
+                       Ok(Message::OpenChannel(Readable::read(buffer)?))
+               },
+               msgs::AcceptChannel::TYPE => {
+                       Ok(Message::AcceptChannel(Readable::read(buffer)?))
+               },
+               msgs::FundingCreated::TYPE => {
+                       Ok(Message::FundingCreated(Readable::read(buffer)?))
+               },
+               msgs::FundingSigned::TYPE => {
+                       Ok(Message::FundingSigned(Readable::read(buffer)?))
+               },
+               msgs::FundingLocked::TYPE => {
+                       Ok(Message::FundingLocked(Readable::read(buffer)?))
+               },
+               msgs::Shutdown::TYPE => {
+                       Ok(Message::Shutdown(Readable::read(buffer)?))
+               },
+               msgs::ClosingSigned::TYPE => {
+                       Ok(Message::ClosingSigned(Readable::read(buffer)?))
+               },
+               msgs::UpdateAddHTLC::TYPE => {
+                       Ok(Message::UpdateAddHTLC(Readable::read(buffer)?))
+               },
+               msgs::UpdateFulfillHTLC::TYPE => {
+                       Ok(Message::UpdateFulfillHTLC(Readable::read(buffer)?))
+               },
+               msgs::UpdateFailHTLC::TYPE => {
+                       Ok(Message::UpdateFailHTLC(Readable::read(buffer)?))
+               },
+               msgs::UpdateFailMalformedHTLC::TYPE => {
+                       Ok(Message::UpdateFailMalformedHTLC(Readable::read(buffer)?))
+               },
+               msgs::CommitmentSigned::TYPE => {
+                       Ok(Message::CommitmentSigned(Readable::read(buffer)?))
+               },
+               msgs::RevokeAndACK::TYPE => {
+                       Ok(Message::RevokeAndACK(Readable::read(buffer)?))
+               },
+               msgs::UpdateFee::TYPE => {
+                       Ok(Message::UpdateFee(Readable::read(buffer)?))
+               },
+               msgs::ChannelReestablish::TYPE => {
+                       Ok(Message::ChannelReestablish(Readable::read(buffer)?))
+               },
+               msgs::AnnouncementSignatures::TYPE => {
+                       Ok(Message::AnnouncementSignatures(Readable::read(buffer)?))
+               },
+               msgs::ChannelAnnouncement::TYPE => {
+                       Ok(Message::ChannelAnnouncement(Readable::read(buffer)?))
+               },
+               msgs::NodeAnnouncement::TYPE => {
+                       Ok(Message::NodeAnnouncement(Readable::read(buffer)?))
+               },
+               msgs::ChannelUpdate::TYPE => {
+                       Ok(Message::ChannelUpdate(Readable::read(buffer)?))
+               },
+               _ => {
+                       Ok(Message::Unknown(message_type))
+               },
+       }
+}
+
+/// Writes a message to the data buffer encoded as a 2-byte big-endian type and a variable-length
+/// payload.
+///
+/// # Errors
+///
+/// Returns an I/O error if the write could not be completed.
+pub fn write<M: Encode + Writeable, W: Writer>(message: &M, buffer: &mut W) -> Result<(), ::std::io::Error> {
+       M::TYPE.write(buffer)?;
+       message.write(buffer)
+}
+
+/// Defines a type-identified encoding for sending messages over the wire.
+///
+/// Messages implementing this trait specify a type and must be [`Writeable`] to use with [`write`].
+///
+/// [`Writeable`]: ../../util/ser/trait.Writeable.html
+/// [`write`]: fn.write.html
+pub trait Encode {
+       /// The type identifying the message payload.
+       const TYPE: u16;
+
+       /// Returns the type identifying the message payload. Convenience method for accessing
+       /// [`TYPE`](TYPE).
+       fn type_id(&self) -> u16 {
+               Self::TYPE
+       }
+}
+
+impl Encode for msgs::Init {
+       const TYPE: u16 = 16;
+}
+
+impl Encode for msgs::ErrorMessage {
+       const TYPE: u16 = 17;
+}
+
+impl Encode for msgs::Ping {
+       const TYPE: u16 = 18;
+}
+
+impl Encode for msgs::Pong {
+       const TYPE: u16 = 19;
+}
+
+impl Encode for msgs::OpenChannel {
+       const TYPE: u16 = 32;
+}
+
+impl Encode for msgs::AcceptChannel {
+       const TYPE: u16 = 33;
+}
+
+impl Encode for msgs::FundingCreated {
+       const TYPE: u16 = 34;
+}
+
+impl Encode for msgs::FundingSigned {
+       const TYPE: u16 = 35;
+}
+
+impl Encode for msgs::FundingLocked {
+       const TYPE: u16 = 36;
+}
+
+impl Encode for msgs::Shutdown {
+       const TYPE: u16 = 38;
+}
+
+impl Encode for msgs::ClosingSigned {
+       const TYPE: u16 = 39;
+}
+
+impl Encode for msgs::UpdateAddHTLC {
+       const TYPE: u16 = 128;
+}
+
+impl Encode for msgs::UpdateFulfillHTLC {
+       const TYPE: u16 = 130;
+}
+
+impl Encode for msgs::UpdateFailHTLC {
+       const TYPE: u16 = 131;
+}
+
+impl Encode for msgs::UpdateFailMalformedHTLC {
+       const TYPE: u16 = 135;
+}
+
+impl Encode for msgs::CommitmentSigned {
+       const TYPE: u16 = 132;
+}
+
+impl Encode for msgs::RevokeAndACK {
+       const TYPE: u16 = 133;
+}
+
+impl Encode for msgs::UpdateFee {
+       const TYPE: u16 = 134;
+}
+
+impl Encode for msgs::ChannelReestablish {
+       const TYPE: u16 = 136;
+}
+
+impl Encode for msgs::AnnouncementSignatures {
+       const TYPE: u16 = 259;
+}
+
+impl Encode for msgs::ChannelAnnouncement {
+       const TYPE: u16 = 256;
+}
+
+impl Encode for msgs::NodeAnnouncement {
+       const TYPE: u16 = 257;
+}
+
+impl Encode for msgs::ChannelUpdate {
+       const TYPE: u16 = 258;
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use util::byte_utils;
+
+       // Big-endian wire encoding of Pong message (type = 19, byteslen = 2).
+       const ENCODED_PONG: [u8; 6] = [0u8, 19u8, 0u8, 2u8, 0u8, 0u8];
+
+       #[test]
+       fn read_empty_buffer() {
+               let buffer = [];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               assert!(read(&mut reader).is_err());
+       }
+
+       #[test]
+       fn read_incomplete_type() {
+               let buffer = &ENCODED_PONG[..1];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               assert!(read(&mut reader).is_err());
+       }
+
+       #[test]
+       fn read_empty_payload() {
+               let buffer = &ENCODED_PONG[..2];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               assert!(read(&mut reader).is_err());
+       }
+
+       #[test]
+       fn read_invalid_message() {
+               let buffer = &ENCODED_PONG[..4];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               assert!(read(&mut reader).is_err());
+       }
+
+       #[test]
+       fn read_known_message() {
+               let buffer = &ENCODED_PONG[..];
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let message = read(&mut reader).unwrap();
+               match message {
+                       Message::Pong(_) => (),
+                       _ => panic!("Expected pong message; found message type: {}", message.type_id()),
+               }
+       }
+
+       #[test]
+       fn read_unknown_message() {
+               let buffer = &byte_utils::be16_to_array(::std::u16::MAX);
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let message = read(&mut reader).unwrap();
+               match message {
+                       Message::Unknown(::std::u16::MAX) => (),
+                       _ => panic!("Expected message type {}; found: {}", ::std::u16::MAX, message.type_id()),
+               }
+       }
+
+       #[test]
+       fn write_message_with_type() {
+               let message = msgs::Pong { byteslen: 2u16 };
+               let mut buffer = Vec::new();
+               assert!(write(&message, &mut buffer).is_ok());
+
+               let type_length = ::std::mem::size_of::<u16>();
+               let (type_bytes, payload_bytes) = buffer.split_at(type_length);
+               assert_eq!(byte_utils::slice_to_be16(type_bytes), msgs::Pong::TYPE);
+               assert_eq!(payload_bytes, &ENCODED_PONG[type_length..]);
+       }
+
+       #[test]
+       fn read_message_encoded_with_write() {
+               let message = msgs::Pong { byteslen: 2u16 };
+               let mut buffer = Vec::new();
+               assert!(write(&message, &mut buffer).is_ok());
+
+               let mut reader = ::std::io::Cursor::new(buffer);
+               let decoded_message = read(&mut reader).unwrap();
+               match decoded_message {
+                       Message::Pong(msgs::Pong { byteslen: 2u16 }) => (),
+                       Message::Pong(msgs::Pong { byteslen }) => {
+                               panic!("Expected byteslen {}; found: {}", message.byteslen, byteslen);
+                       },
+                       _ => panic!("Expected pong message; found message type: {}", decoded_message.type_id()),
+               }
+       }
+}