From 7de9f5278c75e4da224c175a7c332f4e28471bc1 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Tue, 21 Jan 2020 15:26:21 -0800 Subject: [PATCH] Encapsulate message wire encoding into a module Lightning messages are identified by a 2-byte type when encoded on the wire. Rather than expecting callers to know message types when sending messages to peers, have each message implement a trait defining the message type. Provide an interface for reading and writing messages as well as a Message enum for matching the decoded message, including unknown messages. --- lightning/src/ln/mod.rs | 1 + lightning/src/ln/peer_handler.rs | 191 ++++++++-------- lightning/src/ln/wire.rs | 375 +++++++++++++++++++++++++++++++ 3 files changed, 464 insertions(+), 103 deletions(-) create mode 100644 lightning/src/ln/wire.rs diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs index 93043111e..9a2a90fd4 100644 --- a/lightning/src/ln/mod.rs +++ b/lightning/src/ln/mod.rs @@ -24,6 +24,7 @@ pub(crate) mod peer_channel_encryptor; mod channel; mod onion_utils; +mod wire; #[cfg(test)] #[macro_use] mod functional_test_utils; diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 5838b782f..30e208155 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -12,11 +12,13 @@ use ln::features::InitFeatures; use ln::msgs; use ln::msgs::ChannelMessageHandler; use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager}; -use util::ser::{Writeable, Writer, Readable}; use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep}; +use ln::wire; +use ln::wire::Encode; use util::byte_utils; use util::events::{MessageSendEvent, MessageSendEventsProvider}; use util::logger::Logger; +use util::ser::Writer; use std::collections::{HashMap,hash_map,HashSet,LinkedList}; use std::sync::{Arc, Mutex}; @@ -203,11 +205,10 @@ impl Writer for VecWriter { } macro_rules! encode_msg { - ($msg: expr, $msg_code: expr) => {{ - let mut msg = VecWriter(Vec::new()); - ($msg_code as u16).write(&mut msg).unwrap(); - $msg.write(&mut msg).unwrap(); - msg.0 + ($msg: expr) => {{ + let mut buffer = VecWriter(Vec::new()); + wire::write($msg, &mut buffer).unwrap(); + buffer.0 }} } @@ -344,10 +345,10 @@ impl PeerManager where fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) { macro_rules! encode_and_send_msg { - ($msg: expr, $msg_code: expr) => { + ($msg: expr) => { { - log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + log_trace!(self, "Encoding and sending sync update message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg)[..])); } } } @@ -360,9 +361,9 @@ impl PeerManager where let steps = ((MSG_BUFF_SIZE - peer.pending_outbound_buffer.len() + 2) / 3) as u8; let all_messages = self.message_handler.route_handler.get_next_channel_announcements(0, steps); for &(ref announce, ref update_a, ref update_b) in all_messages.iter() { - encode_and_send_msg!(announce, 256); - encode_and_send_msg!(update_a, 258); - encode_and_send_msg!(update_b, 258); + encode_and_send_msg!(announce); + encode_and_send_msg!(update_a); + encode_and_send_msg!(update_b); peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -373,7 +374,7 @@ impl PeerManager where let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps); for msg in all_messages.iter() { - encode_and_send_msg!(msg, 256); + encode_and_send_msg!(msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -385,7 +386,7 @@ impl PeerManager where let steps = (MSG_BUFF_SIZE - peer.pending_outbound_buffer.len()) as u8; let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps); for msg in all_messages.iter() { - encode_and_send_msg!(msg, 256); + encode_and_send_msg!(msg); peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id); } if all_messages.is_empty() || all_messages.len() != steps as usize { @@ -481,10 +482,10 @@ impl PeerManager where peer.pending_read_buffer_pos = 0; macro_rules! encode_and_send_msg { - ($msg: expr, $msg_code: expr) => { + ($msg: expr) => { { - log_trace!(self, "Encoding and sending message of type {} to {}", $msg_code, log_pubkey!(peer.their_node_id.unwrap())); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!($msg, $msg_code)[..])); + log_trace!(self, "Encoding and sending message of type {} to {}", $msg.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(&$msg)[..])); peers.peers_needing_send.insert(peer_descriptor.clone()); } } @@ -507,7 +508,7 @@ impl PeerManager where }, msgs::ErrorAction::SendErrorMessage { msg } => { log_trace!(self, "Got Err handling message, sending Error message because {}", e.err); - encode_and_send_msg!(msg, 17); + encode_and_send_msg!(msg); continue; }, } @@ -583,9 +584,9 @@ impl PeerManager where self.initial_syncs_sent.fetch_add(1, Ordering::AcqRel); features.set_initial_routing_sync(); } - encode_and_send_msg!(msgs::Init { - features, - }, 16); + + let resp = msgs::Init { features }; + encode_and_send_msg!(resp); }, NextNoiseStep::ActThree => { let their_node_id = try_potential_handleerror!(peer.channel_encryptor.process_act_three(&peer.pending_read_buffer[..])); @@ -611,18 +612,20 @@ impl PeerManager where peer.pending_read_buffer = [0; 18].to_vec(); peer.pending_read_is_header = true; - let msg_type = byte_utils::slice_to_be16(&msg_data[0..2]); - log_trace!(self, "Received message of type {} from {}", msg_type, log_pubkey!(peer.their_node_id.unwrap())); - if msg_type != 16 && peer.their_features.is_none() { - // Need an init message as first message + let mut reader = ::std::io::Cursor::new(&msg_data[..]); + let message = try_potential_decodeerror!(wire::read(&mut reader)); + log_trace!(self, "Received message of type {} from {}", message.type_id(), log_pubkey!(peer.their_node_id.unwrap())); + + // Need an Init as first message + if let wire::Message::Init(_) = message { + } else if peer.their_features.is_none() { log_trace!(self, "Peer {} sent non-Init first message", log_pubkey!(peer.their_node_id.unwrap())); return Err(PeerHandleError{ no_connection_possible: false }); } - let mut reader = ::std::io::Cursor::new(&msg_data[2..]); - match msg_type { - // Connection control: - 16 => { - let msg = try_potential_decodeerror!(msgs::Init::read(&mut reader)); + + match message { + // Setup and Control messages: + wire::Message::Init(msg) => { if msg.features.requires_unknown_bits() { log_info!(self, "Peer global features required unknown version bits"); return Err(PeerHandleError{ no_connection_possible: true }); @@ -654,16 +657,14 @@ impl PeerManager where features.set_initial_routing_sync(); } - encode_and_send_msg!(msgs::Init { - features, - }, 16); + let resp = msgs::Init { features }; + encode_and_send_msg!(resp); } self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg); peer.their_features = Some(msg.features); }, - 17 => { - let msg = try_potential_decodeerror!(msgs::ErrorMessage::read(&mut reader)); + wire::Message::Error(msg) => { let mut data_is_printable = true; for b in msg.data.bytes() { if b < 32 || b > 126 { @@ -683,113 +684,97 @@ impl PeerManager where } }, - 18 => { - let msg = try_potential_decodeerror!(msgs::Ping::read(&mut reader)); + wire::Message::Ping(msg) => { if msg.ponglen < 65532 { let resp = msgs::Pong { byteslen: msg.ponglen }; - encode_and_send_msg!(resp, 19); + encode_and_send_msg!(resp); } }, - 19 => { + wire::Message::Pong(_msg) => { peer.awaiting_pong = false; - try_potential_decodeerror!(msgs::Pong::read(&mut reader)); }, - // Channel control: - 32 => { - let msg = try_potential_decodeerror!(msgs::OpenChannel::read(&mut reader)); + + // Channel messages: + wire::Message::OpenChannel(msg) => { self.message_handler.chan_handler.handle_open_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); }, - 33 => { - let msg = try_potential_decodeerror!(msgs::AcceptChannel::read(&mut reader)); + wire::Message::AcceptChannel(msg) => { self.message_handler.chan_handler.handle_accept_channel(&peer.their_node_id.unwrap(), peer.their_features.clone().unwrap(), &msg); }, - 34 => { - let msg = try_potential_decodeerror!(msgs::FundingCreated::read(&mut reader)); + wire::Message::FundingCreated(msg) => { self.message_handler.chan_handler.handle_funding_created(&peer.their_node_id.unwrap(), &msg); }, - 35 => { - let msg = try_potential_decodeerror!(msgs::FundingSigned::read(&mut reader)); + wire::Message::FundingSigned(msg) => { self.message_handler.chan_handler.handle_funding_signed(&peer.their_node_id.unwrap(), &msg); }, - 36 => { - let msg = try_potential_decodeerror!(msgs::FundingLocked::read(&mut reader)); + wire::Message::FundingLocked(msg) => { self.message_handler.chan_handler.handle_funding_locked(&peer.their_node_id.unwrap(), &msg); }, - 38 => { - let msg = try_potential_decodeerror!(msgs::Shutdown::read(&mut reader)); + wire::Message::Shutdown(msg) => { self.message_handler.chan_handler.handle_shutdown(&peer.their_node_id.unwrap(), &msg); }, - 39 => { - let msg = try_potential_decodeerror!(msgs::ClosingSigned::read(&mut reader)); + wire::Message::ClosingSigned(msg) => { self.message_handler.chan_handler.handle_closing_signed(&peer.their_node_id.unwrap(), &msg); }, - 128 => { - let msg = try_potential_decodeerror!(msgs::UpdateAddHTLC::read(&mut reader)); + // Commitment messages: + wire::Message::UpdateAddHTLC(msg) => { self.message_handler.chan_handler.handle_update_add_htlc(&peer.their_node_id.unwrap(), &msg); }, - 130 => { - let msg = try_potential_decodeerror!(msgs::UpdateFulfillHTLC::read(&mut reader)); + wire::Message::UpdateFulfillHTLC(msg) => { self.message_handler.chan_handler.handle_update_fulfill_htlc(&peer.their_node_id.unwrap(), &msg); }, - 131 => { - let msg = try_potential_decodeerror!(msgs::UpdateFailHTLC::read(&mut reader)); + wire::Message::UpdateFailHTLC(msg) => { self.message_handler.chan_handler.handle_update_fail_htlc(&peer.their_node_id.unwrap(), &msg); }, - 135 => { - let msg = try_potential_decodeerror!(msgs::UpdateFailMalformedHTLC::read(&mut reader)); + wire::Message::UpdateFailMalformedHTLC(msg) => { self.message_handler.chan_handler.handle_update_fail_malformed_htlc(&peer.their_node_id.unwrap(), &msg); }, - 132 => { - let msg = try_potential_decodeerror!(msgs::CommitmentSigned::read(&mut reader)); + wire::Message::CommitmentSigned(msg) => { self.message_handler.chan_handler.handle_commitment_signed(&peer.their_node_id.unwrap(), &msg); }, - 133 => { - let msg = try_potential_decodeerror!(msgs::RevokeAndACK::read(&mut reader)); + wire::Message::RevokeAndACK(msg) => { self.message_handler.chan_handler.handle_revoke_and_ack(&peer.their_node_id.unwrap(), &msg); }, - 134 => { - let msg = try_potential_decodeerror!(msgs::UpdateFee::read(&mut reader)); + wire::Message::UpdateFee(msg) => { self.message_handler.chan_handler.handle_update_fee(&peer.their_node_id.unwrap(), &msg); }, - 136 => { - let msg = try_potential_decodeerror!(msgs::ChannelReestablish::read(&mut reader)); + wire::Message::ChannelReestablish(msg) => { self.message_handler.chan_handler.handle_channel_reestablish(&peer.their_node_id.unwrap(), &msg); }, - // Routing control: - 259 => { - let msg = try_potential_decodeerror!(msgs::AnnouncementSignatures::read(&mut reader)); + // Routing messages: + wire::Message::AnnouncementSignatures(msg) => { self.message_handler.chan_handler.handle_announcement_signatures(&peer.their_node_id.unwrap(), &msg); }, - 256 => { - let msg = try_potential_decodeerror!(msgs::ChannelAnnouncement::read(&mut reader)); + wire::Message::ChannelAnnouncement(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_announcement(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - 257 => { - let msg = try_potential_decodeerror!(msgs::NodeAnnouncement::read(&mut reader)); + wire::Message::NodeAnnouncement(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_node_announcement(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - 258 => { - let msg = try_potential_decodeerror!(msgs::ChannelUpdate::read(&mut reader)); + wire::Message::ChannelUpdate(msg) => { let should_forward = try_potential_handleerror!(self.message_handler.route_handler.handle_channel_update(&msg)); if should_forward { // TODO: forward msg along to all our other peers! } }, - _ => { + + // Unknown messages: + wire::Message::Unknown(msg_type) => { + // Fail the channel if message is an even, unknown type as per BOLT #1. if (msg_type & 1) == 0 { return Err(PeerHandleError{ no_connection_possible: true }); } @@ -857,7 +842,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 33))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendOpenChannel { ref node_id, ref msg } => { @@ -867,7 +852,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Drop the pending channel? (or just let it timeout, but that sucks) }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 32))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingCreated { ref node_id, ref msg } => { @@ -879,7 +864,7 @@ impl PeerManager where //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 34))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingSigned { ref node_id, ref msg } => { @@ -890,7 +875,7 @@ impl PeerManager where //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 35))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendFundingLocked { ref node_id, ref msg } => { @@ -900,7 +885,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 36))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendAnnouncementSignatures { ref node_id, ref msg } => { @@ -911,7 +896,7 @@ impl PeerManager where //TODO: generate a DiscardFunding event indicating to the wallet that //they should just throw away this funding transaction }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 259))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::UpdateHTLCs { ref node_id, updates: msgs::CommitmentUpdate { ref update_add_htlcs, ref update_fulfill_htlcs, ref update_fail_htlcs, ref update_fail_malformed_htlcs, ref update_fee, ref commitment_signed } } => { @@ -925,21 +910,21 @@ impl PeerManager where //TODO: Do whatever we're gonna do for handling dropped messages }); for msg in update_add_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 128))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fulfill_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 130))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fail_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 131))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } for msg in update_fail_malformed_htlcs { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 135))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } if let &Some(ref msg) = update_fee { - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 134))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); } - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed, 132))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(commitment_signed))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendRevokeAndACK { ref node_id, ref msg } => { @@ -949,7 +934,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 133))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendClosingSigned { ref node_id, ref msg } => { @@ -959,7 +944,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 39))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendShutdown { ref node_id, ref msg } => { @@ -969,7 +954,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 38))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::SendChannelReestablish { ref node_id, ref msg } => { @@ -979,14 +964,14 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 136))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, MessageSendEvent::BroadcastChannelAnnouncement { ref msg, ref update_msg } => { log_trace!(self, "Handling BroadcastChannelAnnouncement event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_announcement(msg).is_ok() && self.message_handler.route_handler.handle_channel_update(update_msg).is_ok() { - let encoded_msg = encode_msg!(msg, 256); - let encoded_update_msg = encode_msg!(update_msg, 258); + let encoded_msg = encode_msg!(msg); + let encoded_update_msg = encode_msg!(update_msg); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || @@ -1010,7 +995,7 @@ impl PeerManager where MessageSendEvent::BroadcastChannelUpdate { ref msg } => { log_trace!(self, "Handling BroadcastChannelUpdate event in peer_handler for short channel id {}", msg.contents.short_channel_id); if self.message_handler.route_handler.handle_channel_update(msg).is_ok() { - let encoded_msg = encode_msg!(msg, 258); + let encoded_msg = encode_msg!(msg); for (ref descriptor, ref mut peer) in peers.peers.iter_mut() { if !peer.channel_encryptor.is_ready_for_encryption() || peer.their_features.is_none() || @@ -1035,7 +1020,7 @@ impl PeerManager where log_trace!(self, "Handling DisconnectPeer HandleError event in peer_handler for node {} with message {}", log_pubkey!(node_id), msg.data); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); // This isn't guaranteed to work, but if there is enough free // room in the send buffer, put the error message there... self.do_attempt_write_data(&mut descriptor, &mut peer); @@ -1055,7 +1040,7 @@ impl PeerManager where let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, { //TODO: Do whatever we're gonna do for handling dropped messages }); - peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg, 17))); + peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg))); self.do_attempt_write_data(&mut descriptor, peer); }, } @@ -1128,7 +1113,7 @@ impl PeerManager where ponglen: 0, byteslen: 64, }; - peer.pending_outbound_buffer.push_back(encode_msg!(ping, 18)); + peer.pending_outbound_buffer.push_back(encode_msg!(&ping)); let mut descriptor_clone = descriptor.clone(); self.do_attempt_write_data(&mut descriptor_clone, peer); diff --git a/lightning/src/ln/wire.rs b/lightning/src/ln/wire.rs new file mode 100644 index 000000000..963eaeb54 --- /dev/null +++ b/lightning/src/ln/wire.rs @@ -0,0 +1,375 @@ +//! Wire encoding/decoding for Lightning messages according to [BOLT #1]. +//! +//! Messages known by this module can be read from the wire using [`read`]. +//! The [`Message`] enum returned by [`read`] wraps the decoded message or the message type (if +//! unknown) to use with pattern matching. +//! +//! Messages implementing the [`Encode`] trait define a message type and can be sent over the wire +//! using [`write`]. +//! +//! [BOLT #1]: https://github.com/lightningnetwork/lightning-rfc/blob/master/01-messaging.md +//! [`read`]: fn.read.html +//! [`write`]: fn.write.html +//! [`Encode`]: trait.Encode.html +//! [`Message`]: enum.Message.html + +use ln::msgs; +use util::ser::{Readable, Writeable, Writer}; + +/// A Lightning message returned by [`read`] when decoding bytes received over the wire. Each +/// variant contains a message from [`ln::msgs`] or otherwise the message type if unknown. +/// +/// [`read`]: fn.read.html +/// [`ln::msgs`]: ../msgs/index.html +#[allow(missing_docs)] +pub enum Message { + Init(msgs::Init), + Error(msgs::ErrorMessage), + Ping(msgs::Ping), + Pong(msgs::Pong), + OpenChannel(msgs::OpenChannel), + AcceptChannel(msgs::AcceptChannel), + FundingCreated(msgs::FundingCreated), + FundingSigned(msgs::FundingSigned), + FundingLocked(msgs::FundingLocked), + Shutdown(msgs::Shutdown), + ClosingSigned(msgs::ClosingSigned), + UpdateAddHTLC(msgs::UpdateAddHTLC), + UpdateFulfillHTLC(msgs::UpdateFulfillHTLC), + UpdateFailHTLC(msgs::UpdateFailHTLC), + UpdateFailMalformedHTLC(msgs::UpdateFailMalformedHTLC), + CommitmentSigned(msgs::CommitmentSigned), + RevokeAndACK(msgs::RevokeAndACK), + UpdateFee(msgs::UpdateFee), + ChannelReestablish(msgs::ChannelReestablish), + AnnouncementSignatures(msgs::AnnouncementSignatures), + ChannelAnnouncement(msgs::ChannelAnnouncement), + NodeAnnouncement(msgs::NodeAnnouncement), + ChannelUpdate(msgs::ChannelUpdate), + /// A message that could not be decoded because its type is unknown. + Unknown(u16), +} + +impl Message { + /// Returns the type that was used to decode the message payload. + pub fn type_id(&self) -> u16 { + match self { + &Message::Init(ref msg) => msg.type_id(), + &Message::Error(ref msg) => msg.type_id(), + &Message::Ping(ref msg) => msg.type_id(), + &Message::Pong(ref msg) => msg.type_id(), + &Message::OpenChannel(ref msg) => msg.type_id(), + &Message::AcceptChannel(ref msg) => msg.type_id(), + &Message::FundingCreated(ref msg) => msg.type_id(), + &Message::FundingSigned(ref msg) => msg.type_id(), + &Message::FundingLocked(ref msg) => msg.type_id(), + &Message::Shutdown(ref msg) => msg.type_id(), + &Message::ClosingSigned(ref msg) => msg.type_id(), + &Message::UpdateAddHTLC(ref msg) => msg.type_id(), + &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(), + &Message::UpdateFailHTLC(ref msg) => msg.type_id(), + &Message::UpdateFailMalformedHTLC(ref msg) => msg.type_id(), + &Message::CommitmentSigned(ref msg) => msg.type_id(), + &Message::RevokeAndACK(ref msg) => msg.type_id(), + &Message::UpdateFee(ref msg) => msg.type_id(), + &Message::ChannelReestablish(ref msg) => msg.type_id(), + &Message::AnnouncementSignatures(ref msg) => msg.type_id(), + &Message::ChannelAnnouncement(ref msg) => msg.type_id(), + &Message::NodeAnnouncement(ref msg) => msg.type_id(), + &Message::ChannelUpdate(ref msg) => msg.type_id(), + &Message::Unknown(type_id) => type_id, + } + } +} + +/// Reads a message from the data buffer consisting of a 2-byte big-endian type and a +/// variable-length payload conforming to the type. +/// +/// # Errors +/// +/// Returns an error if the message payload code not be decoded as the specified type. +pub fn read(buffer: &mut R) -> Result { + let message_type = >::read(buffer)?; + match message_type { + msgs::Init::TYPE => { + Ok(Message::Init(Readable::read(buffer)?)) + }, + msgs::ErrorMessage::TYPE => { + Ok(Message::Error(Readable::read(buffer)?)) + }, + msgs::Ping::TYPE => { + Ok(Message::Ping(Readable::read(buffer)?)) + }, + msgs::Pong::TYPE => { + Ok(Message::Pong(Readable::read(buffer)?)) + }, + msgs::OpenChannel::TYPE => { + Ok(Message::OpenChannel(Readable::read(buffer)?)) + }, + msgs::AcceptChannel::TYPE => { + Ok(Message::AcceptChannel(Readable::read(buffer)?)) + }, + msgs::FundingCreated::TYPE => { + Ok(Message::FundingCreated(Readable::read(buffer)?)) + }, + msgs::FundingSigned::TYPE => { + Ok(Message::FundingSigned(Readable::read(buffer)?)) + }, + msgs::FundingLocked::TYPE => { + Ok(Message::FundingLocked(Readable::read(buffer)?)) + }, + msgs::Shutdown::TYPE => { + Ok(Message::Shutdown(Readable::read(buffer)?)) + }, + msgs::ClosingSigned::TYPE => { + Ok(Message::ClosingSigned(Readable::read(buffer)?)) + }, + msgs::UpdateAddHTLC::TYPE => { + Ok(Message::UpdateAddHTLC(Readable::read(buffer)?)) + }, + msgs::UpdateFulfillHTLC::TYPE => { + Ok(Message::UpdateFulfillHTLC(Readable::read(buffer)?)) + }, + msgs::UpdateFailHTLC::TYPE => { + Ok(Message::UpdateFailHTLC(Readable::read(buffer)?)) + }, + msgs::UpdateFailMalformedHTLC::TYPE => { + Ok(Message::UpdateFailMalformedHTLC(Readable::read(buffer)?)) + }, + msgs::CommitmentSigned::TYPE => { + Ok(Message::CommitmentSigned(Readable::read(buffer)?)) + }, + msgs::RevokeAndACK::TYPE => { + Ok(Message::RevokeAndACK(Readable::read(buffer)?)) + }, + msgs::UpdateFee::TYPE => { + Ok(Message::UpdateFee(Readable::read(buffer)?)) + }, + msgs::ChannelReestablish::TYPE => { + Ok(Message::ChannelReestablish(Readable::read(buffer)?)) + }, + msgs::AnnouncementSignatures::TYPE => { + Ok(Message::AnnouncementSignatures(Readable::read(buffer)?)) + }, + msgs::ChannelAnnouncement::TYPE => { + Ok(Message::ChannelAnnouncement(Readable::read(buffer)?)) + }, + msgs::NodeAnnouncement::TYPE => { + Ok(Message::NodeAnnouncement(Readable::read(buffer)?)) + }, + msgs::ChannelUpdate::TYPE => { + Ok(Message::ChannelUpdate(Readable::read(buffer)?)) + }, + _ => { + Ok(Message::Unknown(message_type)) + }, + } +} + +/// Writes a message to the data buffer encoded as a 2-byte big-endian type and a variable-length +/// payload. +/// +/// # Errors +/// +/// Returns an I/O error if the write could not be completed. +pub fn write(message: &M, buffer: &mut W) -> Result<(), ::std::io::Error> { + M::TYPE.write(buffer)?; + message.write(buffer) +} + +/// Defines a type-identified encoding for sending messages over the wire. +/// +/// Messages implementing this trait specify a type and must be [`Writeable`] to use with [`write`]. +/// +/// [`Writeable`]: ../../util/ser/trait.Writeable.html +/// [`write`]: fn.write.html +pub trait Encode { + /// The type identifying the message payload. + const TYPE: u16; + + /// Returns the type identifying the message payload. Convenience method for accessing + /// [`TYPE`](TYPE). + fn type_id(&self) -> u16 { + Self::TYPE + } +} + +impl Encode for msgs::Init { + const TYPE: u16 = 16; +} + +impl Encode for msgs::ErrorMessage { + const TYPE: u16 = 17; +} + +impl Encode for msgs::Ping { + const TYPE: u16 = 18; +} + +impl Encode for msgs::Pong { + const TYPE: u16 = 19; +} + +impl Encode for msgs::OpenChannel { + const TYPE: u16 = 32; +} + +impl Encode for msgs::AcceptChannel { + const TYPE: u16 = 33; +} + +impl Encode for msgs::FundingCreated { + const TYPE: u16 = 34; +} + +impl Encode for msgs::FundingSigned { + const TYPE: u16 = 35; +} + +impl Encode for msgs::FundingLocked { + const TYPE: u16 = 36; +} + +impl Encode for msgs::Shutdown { + const TYPE: u16 = 38; +} + +impl Encode for msgs::ClosingSigned { + const TYPE: u16 = 39; +} + +impl Encode for msgs::UpdateAddHTLC { + const TYPE: u16 = 128; +} + +impl Encode for msgs::UpdateFulfillHTLC { + const TYPE: u16 = 130; +} + +impl Encode for msgs::UpdateFailHTLC { + const TYPE: u16 = 131; +} + +impl Encode for msgs::UpdateFailMalformedHTLC { + const TYPE: u16 = 135; +} + +impl Encode for msgs::CommitmentSigned { + const TYPE: u16 = 132; +} + +impl Encode for msgs::RevokeAndACK { + const TYPE: u16 = 133; +} + +impl Encode for msgs::UpdateFee { + const TYPE: u16 = 134; +} + +impl Encode for msgs::ChannelReestablish { + const TYPE: u16 = 136; +} + +impl Encode for msgs::AnnouncementSignatures { + const TYPE: u16 = 259; +} + +impl Encode for msgs::ChannelAnnouncement { + const TYPE: u16 = 256; +} + +impl Encode for msgs::NodeAnnouncement { + const TYPE: u16 = 257; +} + +impl Encode for msgs::ChannelUpdate { + const TYPE: u16 = 258; +} + +#[cfg(test)] +mod tests { + use super::*; + use util::byte_utils; + + // Big-endian wire encoding of Pong message (type = 19, byteslen = 2). + const ENCODED_PONG: [u8; 6] = [0u8, 19u8, 0u8, 2u8, 0u8, 0u8]; + + #[test] + fn read_empty_buffer() { + let buffer = []; + let mut reader = ::std::io::Cursor::new(buffer); + assert!(read(&mut reader).is_err()); + } + + #[test] + fn read_incomplete_type() { + let buffer = &ENCODED_PONG[..1]; + let mut reader = ::std::io::Cursor::new(buffer); + assert!(read(&mut reader).is_err()); + } + + #[test] + fn read_empty_payload() { + let buffer = &ENCODED_PONG[..2]; + let mut reader = ::std::io::Cursor::new(buffer); + assert!(read(&mut reader).is_err()); + } + + #[test] + fn read_invalid_message() { + let buffer = &ENCODED_PONG[..4]; + let mut reader = ::std::io::Cursor::new(buffer); + assert!(read(&mut reader).is_err()); + } + + #[test] + fn read_known_message() { + let buffer = &ENCODED_PONG[..]; + let mut reader = ::std::io::Cursor::new(buffer); + let message = read(&mut reader).unwrap(); + match message { + Message::Pong(_) => (), + _ => panic!("Expected pong message; found message type: {}", message.type_id()), + } + } + + #[test] + fn read_unknown_message() { + let buffer = &byte_utils::be16_to_array(::std::u16::MAX); + let mut reader = ::std::io::Cursor::new(buffer); + let message = read(&mut reader).unwrap(); + match message { + Message::Unknown(::std::u16::MAX) => (), + _ => panic!("Expected message type {}; found: {}", ::std::u16::MAX, message.type_id()), + } + } + + #[test] + fn write_message_with_type() { + let message = msgs::Pong { byteslen: 2u16 }; + let mut buffer = Vec::new(); + assert!(write(&message, &mut buffer).is_ok()); + + let type_length = ::std::mem::size_of::(); + let (type_bytes, payload_bytes) = buffer.split_at(type_length); + assert_eq!(byte_utils::slice_to_be16(type_bytes), msgs::Pong::TYPE); + assert_eq!(payload_bytes, &ENCODED_PONG[type_length..]); + } + + #[test] + fn read_message_encoded_with_write() { + let message = msgs::Pong { byteslen: 2u16 }; + let mut buffer = Vec::new(); + assert!(write(&message, &mut buffer).is_ok()); + + let mut reader = ::std::io::Cursor::new(buffer); + let decoded_message = read(&mut reader).unwrap(); + match decoded_message { + Message::Pong(msgs::Pong { byteslen: 2u16 }) => (), + Message::Pong(msgs::Pong { byteslen }) => { + panic!("Expected byteslen {}; found: {}", message.byteslen, byteslen); + }, + _ => panic!("Expected pong message; found message type: {}", decoded_message.type_id()), + } + } +} -- 2.39.5