Merge pull request #1748 from valentinewallace/2022-10-custom-oms
authorJeffrey Czyz <jkczyz@gmail.com>
Wed, 19 Oct 2022 22:15:33 +0000 (17:15 -0500)
committerGitHub <noreply@github.com>
Wed, 19 Oct 2022 22:15:33 +0000 (17:15 -0500)
Support custom onion messages

1  2 
lightning/src/ln/peer_handler.rs
lightning/src/onion_message/messenger.rs
lightning/src/onion_message/packet.rs
lightning/src/util/ser.rs

index d1a9744ed332db85c73aefe118a432eeb8032b02,8a7b4632fd54c8b9ef8dec4283a776c38c5e0d1d..80db0dc0b07e269233afaaad6058ae36aaef288a
@@@ -21,11 -21,11 +21,11 @@@ use ln::features::{InitFeatures, NodeFe
  use ln::msgs;
  use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler};
  use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
- use util::ser::{VecWriter, Writeable, Writer};
+ use util::ser::{MaybeReadableArgs, VecWriter, Writeable, Writer};
  use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
  use ln::wire;
  use ln::wire::Encode;
- use onion_message::{SimpleArcOnionMessenger, SimpleRefOnionMessenger};
+ use onion_message::{CustomOnionMessageContents, CustomOnionMessageHandler, SimpleArcOnionMessenger, SimpleRefOnionMessenger};
  use routing::gossip::{NetworkGraph, P2PGossipSync};
  use util::atomic_counter::AtomicCounter;
  use util::crypto::sign;
@@@ -36,7 -36,7 +36,7 @@@ use prelude::*
  use io;
  use alloc::collections::LinkedList;
  use sync::{Arc, Mutex, MutexGuard, FairRwLock};
 -use core::sync::atomic::{AtomicBool, AtomicU64, Ordering};
 +use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
  use core::{cmp, hash, fmt, mem};
  use core::ops::Deref;
  use core::convert::Infallible;
@@@ -95,6 -95,23 +95,23 @@@ impl OnionMessageHandler for IgnoringMe
                InitFeatures::empty()
        }
  }
+ impl CustomOnionMessageHandler for IgnoringMessageHandler {
+       type CustomMessage = Infallible;
+       fn handle_custom_message(&self, _msg: Self::CustomMessage) {
+               // Since we always return `None` in the read the handle method should never be called.
+               unreachable!();
+       }
+ }
+ impl MaybeReadableArgs<u64> for Infallible {
+       fn read<R: io::Read>(_buffer: &mut R, _msg_type: u64) -> Result<Option<Self>, msgs::DecodeError> where Self: Sized {
+               Ok(None)
+       }
+ }
+ impl CustomOnionMessageContents for Infallible {
+       fn tlv_type(&self) -> u64 { unreachable!(); }
+ }
  impl Deref for IgnoringMessageHandler {
        type Target = IgnoringMessageHandler;
        fn deref(&self) -> &Self { self }
@@@ -540,7 -557,7 +557,7 @@@ pub struct PeerManager<Descriptor: Sock
  
        /// Used to track the last value sent in a node_announcement "timestamp" field. We ensure this
        /// value increases strictly since we don't assume access to a time source.
 -      last_node_announcement_serial: AtomicU64,
 +      last_node_announcement_serial: AtomicU32,
  
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
@@@ -594,7 -611,7 +611,7 @@@ impl<Descriptor: SocketDescriptor, CM: 
        /// minute should suffice.
        ///
        /// (C-not exported) as we can't export a PeerManager with a dummy route handler
 -      pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, current_time: u64, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
 +      pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                Self::new(MessageHandler {
                        chan_handler: channel_message_handler,
                        route_handler: IgnoringMessageHandler{},
@@@ -620,7 -637,7 +637,7 @@@ impl<Descriptor: SocketDescriptor, RM: 
        /// cryptographically secure random bytes.
        ///
        /// (C-not exported) as we can't export a PeerManager with a dummy channel handler
 -      pub fn new_routing_only(routing_message_handler: RM, our_node_secret: SecretKey, current_time: u64, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
 +      pub fn new_routing_only(routing_message_handler: RM, our_node_secret: SecretKey, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                Self::new(MessageHandler {
                        chan_handler: ErroringMessageHandler::new(),
                        route_handler: routing_message_handler,
@@@ -684,7 -701,7 +701,7 @@@ impl<Descriptor: SocketDescriptor, CM: 
        /// incremented irregularly internally. In general it is best to simply use the current UNIX
        /// timestamp, however if it is not available a persistent counter that increases once per
        /// minute should suffice.
 -      pub fn new(message_handler: MessageHandler<CM, RM, OM>, our_node_secret: SecretKey, current_time: u64, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
 +      pub fn new(message_handler: MessageHandler<CM, RM, OM>, our_node_secret: SecretKey, current_time: u32, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
  
                        our_node_secret,
                        ephemeral_key_midstate,
                        peer_counter: AtomicCounter::new(),
 -                      last_node_announcement_serial: AtomicU64::new(current_time),
 +                      last_node_announcement_serial: AtomicU32::new(current_time),
                        logger,
                        custom_message_handler,
                        secp_ctx,
                        .or(self.message_handler.onion_message_handler.provided_node_features());
                let announcement = msgs::UnsignedNodeAnnouncement {
                        features,
 -                      timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel) as u32,
 +                      timestamp: self.last_node_announcement_serial.fetch_add(1, Ordering::AcqRel),
                        node_id: PublicKey::from_secret_key(&self.secp_ctx, &self.our_node_secret),
                        rgb, alias, addresses,
                        excess_address_data: Vec::new(),
index 3677efda420cc8663914512ea38c732216de8f59,5cf2e1e0dceba6e10ed990c82257c91d0062cb6f..83e0311872b9c4ed562585d75c43396809913a60
@@@ -19,7 -19,9 +19,9 @@@ use chain::keysinterface::{InMemorySign
  use ln::features::{InitFeatures, NodeFeatures};
  use ln::msgs::{self, OnionMessageHandler};
  use ln::onion_utils;
+ use ln::peer_handler::IgnoringMessageHandler;
  use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
+ pub use super::packet::{CustomOnionMessageContents, OnionMessageContents};
  use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
  use super::utils;
  use util::events::OnionMessageProvider;
@@@ -32,7 -34,7 +34,7 @@@ use prelude::*
  
  /// A sender, receiver and forwarder of onion messages. In upcoming releases, this object will be
  /// used to retrieve invoices and fulfill invoice requests from [offers]. Currently, only sending
- /// and receiving empty onion messages is supported.
+ /// and receiving custom onion messages is supported.
  ///
  /// # Example
  ///
  /// # use bitcoin::hashes::_export::_core::time::Duration;
  /// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
  /// # use lightning::chain::keysinterface::{InMemorySigner, KeysManager, KeysInterface};
- /// # use lightning::onion_message::{BlindedRoute, Destination, OnionMessenger};
+ /// # use lightning::ln::msgs::DecodeError;
+ /// # use lightning::ln::peer_handler::IgnoringMessageHandler;
+ /// # use lightning::onion_message::{BlindedRoute, CustomOnionMessageContents, Destination, OnionMessageContents, OnionMessenger};
  /// # use lightning::util::logger::{Logger, Record};
+ /// # use lightning::util::ser::{MaybeReadableArgs, Writeable, Writer};
+ /// # use lightning::io;
  /// # use std::sync::Arc;
  /// # struct FakeLogger {};
  /// # impl Logger for FakeLogger {
  /// # let node_secret = SecretKey::from_slice(&hex::decode("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap();
  /// # let secp_ctx = Secp256k1::new();
  /// # let hop_node_id1 = PublicKey::from_secret_key(&secp_ctx, &node_secret);
- /// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1,
- /// hop_node_id1);
+ /// # let (hop_node_id2, hop_node_id3, hop_node_id4) = (hop_node_id1, hop_node_id1, hop_node_id1);
  /// # let destination_node_id = hop_node_id1;
- /// #
+ /// # let your_custom_message_handler = IgnoringMessageHandler {};
  /// // Create the onion messenger. This must use the same `keys_manager` as is passed to your
  /// // ChannelManager.
- /// let onion_messenger = OnionMessenger::new(&keys_manager, logger);
+ /// let onion_messenger = OnionMessenger::new(&keys_manager, logger, your_custom_message_handler);
  ///
- /// // Send an empty onion message to a node id.
+ /// # struct YourCustomMessage {}
+ /// impl Writeable for YourCustomMessage {
+ ///   fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+ ///           # Ok(())
+ ///           // Write your custom onion message to `w`
+ ///   }
+ /// }
+ /// impl CustomOnionMessageContents for YourCustomMessage {
+ ///   fn tlv_type(&self) -> u64 {
+ ///           # let your_custom_message_type = 42;
+ ///           your_custom_message_type
+ ///   }
+ /// }
+ /// impl MaybeReadableArgs<u64> for YourCustomMessage {
+ ///   fn read<R: io::Read>(r: &mut R, message_type: u64) -> Result<Option<Self>, DecodeError> {
+ ///           # unreachable!()
+ ///           // Read your custom onion message of type `message_type` from `r`, or return `None`
+ ///           // if the message type is unknown
+ ///   }
+ /// }
+ /// // Send a custom onion message to a node id.
  /// let intermediate_hops = [hop_node_id1, hop_node_id2];
  /// let reply_path = None;
- /// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id), reply_path);
+ /// # let your_custom_message = YourCustomMessage {};
+ /// let message = OnionMessageContents::Custom(your_custom_message);
+ /// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id), message, reply_path);
  ///
  /// // Create a blinded route to yourself, for someone to send an onion message to.
  /// # let your_node_id = hop_node_id1;
  /// let hops = [hop_node_id3, hop_node_id4, your_node_id];
  /// let blinded_route = BlindedRoute::new(&hops, &keys_manager, &secp_ctx).unwrap();
  ///
- /// // Send an empty onion message to a blinded route.
+ /// // Send a custom onion message to a blinded route.
  /// # let intermediate_hops = [hop_node_id1, hop_node_id2];
  /// let reply_path = None;
- /// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route), reply_path);
+ /// # let your_custom_message = YourCustomMessage {};
+ /// let message = OnionMessageContents::Custom(your_custom_message);
+ /// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route), message, reply_path);
  /// ```
  ///
  /// [offers]: <https://github.com/lightning/bolts/pull/798>
  /// [`OnionMessenger`]: crate::onion_message::OnionMessenger
- pub struct OnionMessenger<Signer: Sign, K: Deref, L: Deref>
+ pub struct OnionMessenger<Signer: Sign, K: Deref, L: Deref, CMH: Deref>
        where K::Target: KeysInterface<Signer = Signer>,
              L::Target: Logger,
+             CMH:: Target: CustomOnionMessageHandler,
  {
        keys_manager: K,
        logger: L,
        pending_messages: Mutex<HashMap<PublicKey, VecDeque<msgs::OnionMessage>>>,
        secp_ctx: Secp256k1<secp256k1::All>,
+       custom_handler: CMH,
        // Coming soon:
        // invoice_handler: InvoiceHandler,
-       // custom_handler: CustomHandler, // handles custom onion messages
  }
  
  /// The destination of an onion message.
@@@ -114,7 -144,7 +144,7 @@@ impl Destination 
  /// Errors that may occur when [sending an onion message].
  ///
  /// [sending an onion message]: OnionMessenger::send_onion_message
 -#[derive(Debug, PartialEq)]
 +#[derive(Debug, PartialEq, Eq)]
  pub enum SendError {
        /// Errored computing onion message packet keys.
        Secp256k1(secp256k1::Error),
        TooFewBlindedHops,
        /// Our next-hop peer was offline or does not support onion message forwarding.
        InvalidFirstHop,
+       /// Onion message contents must have a TLV type >= 64.
+       InvalidMessage,
        /// Our next-hop peer's buffer was full or our total outbound buffer was full.
        BufferFull,
  }
  
- impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
+ /// Handler for custom onion messages. If you are using [`SimpleArcOnionMessenger`],
+ /// [`SimpleRefOnionMessenger`], or prefer to ignore inbound custom onion messages,
+ /// [`IgnoringMessageHandler`] must be provided to [`OnionMessenger::new`]. Otherwise, a custom
+ /// implementation of this trait must be provided, with [`CustomMessage`] specifying the supported
+ /// message types.
+ ///
+ /// See [`OnionMessenger`] for example usage.
+ ///
+ /// [`IgnoringMessageHandler`]: crate::ln::peer_handler::IgnoringMessageHandler
+ /// [`CustomMessage`]: Self::CustomMessage
+ pub trait CustomOnionMessageHandler {
+       /// The message known to the handler. To support multiple message types, you may want to make this
+       /// an enum with a variant for each supported message.
+       type CustomMessage: CustomOnionMessageContents;
+       /// Called with the custom message that was received.
+       fn handle_custom_message(&self, msg: Self::CustomMessage);
+ }
+ impl<Signer: Sign, K: Deref, L: Deref, CMH: Deref> OnionMessenger<Signer, K, L, CMH>
        where K::Target: KeysInterface<Signer = Signer>,
              L::Target: Logger,
+             CMH::Target: CustomOnionMessageHandler,
  {
        /// Constructs a new `OnionMessenger` to send, forward, and delegate received onion messages to
        /// their respective handlers.
-       pub fn new(keys_manager: K, logger: L) -> Self {
+       pub fn new(keys_manager: K, logger: L, custom_handler: CMH) -> Self {
                let mut secp_ctx = Secp256k1::new();
                secp_ctx.seeded_randomize(&keys_manager.get_secure_random_bytes());
                OnionMessenger {
                        pending_messages: Mutex::new(HashMap::new()),
                        secp_ctx,
                        logger,
+                       custom_handler,
                }
        }
  
-       /// Send an empty onion message to `destination`, routing it through `intermediate_nodes`.
+       /// Send an onion message with contents `message` to `destination`, routing it through `intermediate_nodes`.
        /// See [`OnionMessenger`] for example usage.
-       pub fn send_onion_message(&self, intermediate_nodes: &[PublicKey], destination: Destination, reply_path: Option<BlindedRoute>) -> Result<(), SendError> {
+       pub fn send_onion_message<T: CustomOnionMessageContents>(&self, intermediate_nodes: &[PublicKey], destination: Destination, message: OnionMessageContents<T>, reply_path: Option<BlindedRoute>) -> Result<(), SendError> {
                if let Destination::BlindedRoute(BlindedRoute { ref blinded_hops, .. }) = destination {
                        if blinded_hops.len() < 2 {
                                return Err(SendError::TooFewBlindedHops);
                        }
                }
+               let OnionMessageContents::Custom(ref msg) = message;
+               if msg.tlv_type() < 64 { return Err(SendError::InvalidMessage) }
                let blinding_secret_bytes = self.keys_manager.get_secure_random_bytes();
                let blinding_secret = SecretKey::from_slice(&blinding_secret_bytes[..]).expect("RNG is busted");
                let (introduction_node_id, blinding_point) = if intermediate_nodes.len() != 0 {
                        }
                };
                let (packet_payloads, packet_keys) = packet_payloads_and_keys(
-                       &self.secp_ctx, intermediate_nodes, destination, reply_path, &blinding_secret)
+                       &self.secp_ctx, intermediate_nodes, destination, message, reply_path, &blinding_secret)
                        .map_err(|e| SendError::Secp256k1(e))?;
  
                let prng_seed = self.keys_manager.get_secure_random_bytes();
@@@ -221,9 -276,10 +276,10 @@@ fn outbound_buffer_full(peer_node_id: &
        false
  }
  
- impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
+ impl<Signer: Sign, K: Deref, L: Deref, CMH: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L, CMH>
        where K::Target: KeysInterface<Signer = Signer>,
              L::Target: Logger,
+             CMH::Target: CustomOnionMessageHandler,
  {
        /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but
        /// soon we'll delegate the onion message to a handler that can generate invoices or send
                match onion_utils::decode_next_hop(onion_decode_ss, &msg.onion_routing_packet.hop_data[..],
                        msg.onion_routing_packet.hmac, control_tlvs_ss)
                {
-                       Ok((Payload::Receive {
-                               control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id }), reply_path,
+                       Ok((Payload::Receive::<<<CMH as Deref>::Target as CustomOnionMessageHandler>::CustomMessage> {
+                               message, control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id }), reply_path,
                        }, None)) => {
                                log_info!(self.logger,
-                                       "Received an onion message with path_id: {:02x?} and {}reply_path",
-                                               path_id, if reply_path.is_some() { "" } else { "no " });
+                                       "Received an onion message with path_id {:02x?} and {} reply_path",
+                                               path_id, if reply_path.is_some() { "a" } else { "no" });
+                               match message {
+                                       OnionMessageContents::Custom(msg) => self.custom_handler.handle_custom_message(msg),
+                               }
                        },
                        Ok((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
                                next_node_id, next_blinding_override
        }
  }
  
- impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
+ impl<Signer: Sign, K: Deref, L: Deref, CMH: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L, CMH>
        where K::Target: KeysInterface<Signer = Signer>,
              L::Target: Logger,
+             CMH::Target: CustomOnionMessageHandler,
  {
        fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
                let mut pending_msgs = self.pending_messages.lock().unwrap();
  ///
  /// [`SimpleArcChannelManager`]: crate::ln::channelmanager::SimpleArcChannelManager
  /// [`SimpleArcPeerManager`]: crate::ln::peer_handler::SimpleArcPeerManager
- pub type SimpleArcOnionMessenger<L> = OnionMessenger<InMemorySigner, Arc<KeysManager>, Arc<L>>;
+ pub type SimpleArcOnionMessenger<L> = OnionMessenger<InMemorySigner, Arc<KeysManager>, Arc<L>, IgnoringMessageHandler>;
  /// Useful for simplifying the parameters of [`SimpleRefChannelManager`] and
  /// [`SimpleRefPeerManager`]. See their docs for more details.
  ///
  ///
  /// [`SimpleRefChannelManager`]: crate::ln::channelmanager::SimpleRefChannelManager
  /// [`SimpleRefPeerManager`]: crate::ln::peer_handler::SimpleRefPeerManager
- pub type SimpleRefOnionMessenger<'a, 'b, L> = OnionMessenger<InMemorySigner, &'a KeysManager, &'b L>;
+ pub type SimpleRefOnionMessenger<'a, 'b, L> = OnionMessenger<InMemorySigner, &'a KeysManager, &'b L, IgnoringMessageHandler>;
  
  /// Construct onion packet payloads and keys for sending an onion message along the given
  /// `unblinded_path` to the given `destination`.
- fn packet_payloads_and_keys<T: secp256k1::Signing + secp256k1::Verification>(
-       secp_ctx: &Secp256k1<T>, unblinded_path: &[PublicKey], destination: Destination, mut reply_path:
-       Option<BlindedRoute>, session_priv: &SecretKey
- ) -> Result<(Vec<(Payload, [u8; 32])>, Vec<onion_utils::OnionKeys>), secp256k1::Error> {
+ fn packet_payloads_and_keys<T: CustomOnionMessageContents, S: secp256k1::Signing + secp256k1::Verification>(
+       secp_ctx: &Secp256k1<S>, unblinded_path: &[PublicKey], destination: Destination,
+       message: OnionMessageContents<T>, mut reply_path: Option<BlindedRoute>, session_priv: &SecretKey
+ ) -> Result<(Vec<(Payload<T>, [u8; 32])>, Vec<onion_utils::OnionKeys>), secp256k1::Error> {
        let num_hops = unblinded_path.len() + destination.num_hops();
        let mut payloads = Vec::with_capacity(num_hops);
        let mut onion_packet_keys = Vec::with_capacity(num_hops);
        let mut unblinded_path_idx = 0;
        let mut blinded_path_idx = 0;
        let mut prev_control_tlvs_ss = None;
+       let mut final_control_tlvs = None;
        utils::construct_keys_callback(secp_ctx, unblinded_path, Some(destination), session_priv, |_, onion_packet_ss, ephemeral_pubkey, control_tlvs_ss, unblinded_pk_opt, enc_payload_opt| {
                if num_unblinded_hops != 0 && unblinded_path_idx < num_unblinded_hops {
                        if let Some(ss) = prev_control_tlvs_ss.take() {
                                control_tlvs_ss));
                        blinded_path_idx += 1;
                } else if let Some(encrypted_payload) = enc_payload_opt {
-                       payloads.push((Payload::Receive {
-                               control_tlvs: ReceiveControlTlvs::Blinded(encrypted_payload),
-                               reply_path: reply_path.take(),
-                       }, control_tlvs_ss));
+                       final_control_tlvs = Some(ReceiveControlTlvs::Blinded(encrypted_payload));
+                       prev_control_tlvs_ss = Some(control_tlvs_ss);
                }
  
                let (rho, mu) = onion_utils::gen_rho_mu_from_shared_secret(onion_packet_ss.as_ref());
                });
        })?;
  
-       if let Some(control_tlvs_ss) = prev_control_tlvs_ss {
+       if let Some(control_tlvs) = final_control_tlvs {
+               payloads.push((Payload::Receive {
+                       control_tlvs,
+                       reply_path: reply_path.take(),
+                       message,
+               }, prev_control_tlvs_ss.unwrap()));
+       } else {
                payloads.push((Payload::Receive {
                        control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id: None, }),
                        reply_path: reply_path.take(),
-               }, control_tlvs_ss));
+                       message,
+               }, prev_control_tlvs_ss.unwrap()));
        }
  
        Ok((payloads, onion_packet_keys))
  }
  
  /// Errors if the serialized payload size exceeds onion_message::BIG_PACKET_HOP_DATA_LEN
- fn construct_onion_message_packet(payloads: Vec<(Payload, [u8; 32])>, onion_keys: Vec<onion_utils::OnionKeys>, prng_seed: [u8; 32]) -> Result<Packet, ()> {
+ fn construct_onion_message_packet<T: CustomOnionMessageContents>(payloads: Vec<(Payload<T>, [u8; 32])>, onion_keys: Vec<onion_utils::OnionKeys>, prng_seed: [u8; 32]) -> Result<Packet, ()> {
        // Spec rationale:
        // "`len` allows larger messages to be sent than the standard 1300 bytes allowed for an HTLC
        // onion, but this should be used sparingly as it is reduces anonymity set, hence the
index 20b1fb0b82fdf5cf0c0b16096f524110d20ba6da,74d253bf5744158befd927ac03665b9e339f89b6..41b8a1634f6bcf68147e15e301c72515a543004f
@@@ -16,7 -16,7 +16,7 @@@ use ln::msgs::DecodeError
  use ln::onion_utils;
  use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
  use util::chacha20poly1305rfc::{ChaChaPolyReadAdapter, ChaChaPolyWriteAdapter};
- use util::ser::{BigSize, FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer};
+ use util::ser::{BigSize, FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, MaybeReadableArgs, Readable, ReadableArgs, Writeable, Writer};
  
  use core::cmp;
  use io::{self, Read};
@@@ -27,7 -27,7 +27,7 @@@ use prelude::*
  pub(super) const SMALL_PACKET_HOP_DATA_LEN: usize = 1300;
  pub(super) const BIG_PACKET_HOP_DATA_LEN: usize = 32768;
  
 -#[derive(Clone, Debug, PartialEq)]
 +#[derive(Clone, Debug, PartialEq, Eq)]
  pub(crate) struct Packet {
        pub(super) version: u8,
        pub(super) public_key: PublicKey,
@@@ -92,25 -92,52 +92,52 @@@ impl LengthReadable for Packet 
  /// Onion message payloads contain "control" TLVs and "data" TLVs. Control TLVs are used to route
  /// the onion message from hop to hop and for path verification, whereas data TLVs contain the onion
  /// message content itself, such as an invoice request.
- pub(super) enum Payload {
+ pub(super) enum Payload<T: CustomOnionMessageContents> {
        /// This payload is for an intermediate hop.
        Forward(ForwardControlTlvs),
        /// This payload is for the final hop.
        Receive {
                control_tlvs: ReceiveControlTlvs,
                reply_path: Option<BlindedRoute>,
-               // Coming soon:
-               // message: Message,
+               message: OnionMessageContents<T>,
        }
  }
  
- // Coming soon:
- // enum Message {
- //    InvoiceRequest(InvoiceRequest),
- //    Invoice(Invoice),
- //    InvoiceError(InvoiceError),
- //    CustomMessage<T>,
- // }
+ #[derive(Debug)]
+ /// The contents of an onion message. In the context of offers, this would be the invoice, invoice
+ /// request, or invoice error.
+ pub enum OnionMessageContents<T> where T: CustomOnionMessageContents {
+       // Coming soon:
+       // Invoice,
+       // InvoiceRequest,
+       // InvoiceError,
+       /// A custom onion message specified by the user.
+       Custom(T),
+ }
+ impl<T> OnionMessageContents<T> where T: CustomOnionMessageContents {
+       /// Returns the type that was used to decode the message payload.
+       pub fn tlv_type(&self) -> u64 {
+               match self {
+                       &OnionMessageContents::Custom(ref msg) => msg.tlv_type(),
+               }
+       }
+ }
+ impl<T: CustomOnionMessageContents> Writeable for OnionMessageContents<T> {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               match self {
+                       OnionMessageContents::Custom(msg) => Ok(msg.write(w)?),
+               }
+       }
+ }
+ /// The contents of a custom onion message. Must implement `MaybeReadableArgs<u64>` where the `u64`
+ /// is the custom TLV type attempting to be read, and return `Ok(None)` if the TLV type is unknown.
+ pub trait CustomOnionMessageContents: Writeable + MaybeReadableArgs<u64> {
+       /// Returns the TLV type identifying the message contents. MUST be >= 64.
+       fn tlv_type(&self) -> u64;
+ }
  
  /// Forward control TLVs in their blinded and unblinded form.
  pub(super) enum ForwardControlTlvs {
@@@ -132,7 -159,7 +159,7 @@@ pub(super) enum ReceiveControlTlvs 
  }
  
  // Uses the provided secret to simultaneously encode and encrypt the unblinded control TLVs.
- impl Writeable for (Payload, [u8; 32]) {
+ impl<T: CustomOnionMessageContents> Writeable for (Payload<T>, [u8; 32]) {
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
                match &self.0 {
                        Payload::Forward(ForwardControlTlvs::Blinded(encrypted_bytes)) => {
                                })
                        },
                        Payload::Receive {
-                               control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes), reply_path
+                               control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes), reply_path, message,
                        } => {
                                encode_varint_length_prefixed_tlv!(w, {
                                        (2, reply_path, option),
-                                       (4, encrypted_bytes, vec_type)
+                                       (4, encrypted_bytes, vec_type),
+                                       (message.tlv_type(), message, required)
                                })
                        },
                        Payload::Forward(ForwardControlTlvs::Unblinded(control_tlvs)) => {
                                })
                        },
                        Payload::Receive {
-                               control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs), reply_path,
+                               control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs), reply_path, message,
                        } => {
                                let write_adapter = ChaChaPolyWriteAdapter::new(self.1, &control_tlvs);
                                encode_varint_length_prefixed_tlv!(w, {
                                        (2, reply_path, option),
-                                       (4, write_adapter, required)
+                                       (4, write_adapter, required),
+                                       (message.tlv_type(), message, required)
                                })
                        },
                }
        }
  }
  
- // Uses the provided secret to simultaneously decode and decrypt the control TLVs.
- impl ReadableArgs<SharedSecret> for Payload {
+ // Uses the provided secret to simultaneously decode and decrypt the control TLVs and data TLV.
+ impl<T: CustomOnionMessageContents> ReadableArgs<SharedSecret> for Payload<T> {
        fn read<R: Read>(r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
                let v: BigSize = Readable::read(r)?;
                let mut rd = FixedLengthReader::new(r, v.0);
                let mut reply_path: Option<BlindedRoute> = None;
                let mut read_adapter: Option<ChaChaPolyReadAdapter<ControlTlvs>> = None;
                let rho = onion_utils::gen_rho_from_shared_secret(&encrypted_tlvs_ss.secret_bytes());
+               let mut message_type: Option<u64> = None;
+               let mut message = None;
                decode_tlv_stream!(&mut rd, {
                        (2, reply_path, option),
-                       (4, read_adapter, (option: LengthReadableArgs, rho))
+                       (4, read_adapter, (option: LengthReadableArgs, rho)),
+               }, |msg_type, msg_reader| {
+                       if msg_type < 64 { return Ok(false) }
+                       // Don't allow reading more than one data TLV from an onion message.
+                       if message_type.is_some() { return Err(DecodeError::InvalidValue) }
+                       message_type = Some(msg_type);
+                       match T::read(msg_reader, msg_type) {
+                               Ok(Some(msg)) => {
+                                       message = Some(msg);
+                                       Ok(true)
+                               },
+                               Ok(None) => Ok(false),
+                               Err(e) => Err(e),
+                       }
                });
                rd.eat_remaining().map_err(|_| DecodeError::ShortRead)?;
  
                match read_adapter {
                        None => return Err(DecodeError::InvalidValue),
                        Some(ChaChaPolyReadAdapter { readable: ControlTlvs::Forward(tlvs)}) => {
+                               if message_type.is_some() {
+                                       return Err(DecodeError::InvalidValue)
+                               }
                                Ok(Payload::Forward(ForwardControlTlvs::Unblinded(tlvs)))
                        },
                        Some(ChaChaPolyReadAdapter { readable: ControlTlvs::Receive(tlvs)}) => {
-                               Ok(Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(tlvs), reply_path })
-                       },
+                               if message.is_none() { return Err(DecodeError::InvalidValue) }
+                               Ok(Payload::Receive {
+                                       control_tlvs: ReceiveControlTlvs::Unblinded(tlvs),
+                                       reply_path,
+                                       message: OnionMessageContents::Custom(message.unwrap()),
+                               })
+                       }
                }
        }
  }
index 3de03ea5a0d5efd93b4c3a22e4d88845656dbf23,6b17ec7ed6b10344f69028001e028d69b6cc99d4..3b82da8446da033d5d579cf7ec5a7d87e85225f8
@@@ -269,6 -269,15 +269,15 @@@ impl<T: Readable> MaybeReadable for T 
        }
  }
  
+ /// A trait that various rust-lightning types implement allowing them to (maybe) be read in from a
+ /// Read, given some additional set of arguments which is required to deserialize.
+ ///
+ /// (C-not exported) as we only export serialization to/from byte arrays instead
+ pub trait MaybeReadableArgs<P> {
+       /// Reads a Self in from the given Read
+       fn read<R: Read>(reader: &mut R, params: P) -> Result<Option<Self>, DecodeError> where Self: Sized;
+ }
  pub(crate) struct OptionDeserWrapper<T: Readable>(pub Option<T>);
  impl<T: Readable> Readable for OptionDeserWrapper<T> {
        #[inline]
@@@ -399,7 -408,7 +408,7 @@@ impl Readable for BigSize 
  /// In TLV we occasionally send fields which only consist of, or potentially end with, a
  /// variable-length integer which is simply truncated by skipping high zero bytes. This type
  /// encapsulates such integers implementing Readable/Writeable for them.
 -#[cfg_attr(test, derive(PartialEq, Debug))]
 +#[cfg_attr(test, derive(PartialEq, Eq, Debug))]
  pub(crate) struct HighZeroBytesDroppedBigSize<T>(pub T);
  
  macro_rules! impl_writeable_primitive {
@@@ -523,29 -532,6 +532,29 @@@ impl_array!(PUBLIC_KEY_SIZE); // for Pu
  impl_array!(COMPACT_SIGNATURE_SIZE); // for Signature
  impl_array!(1300); // for OnionPacket.hop_data
  
 +impl Writeable for [u16; 8] {
 +      #[inline]
 +      fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
 +              for v in self.iter() {
 +                      w.write_all(&v.to_be_bytes())?
 +              }
 +              Ok(())
 +      }
 +}
 +
 +impl Readable for [u16; 8] {
 +      #[inline]
 +      fn read<R: Read>(r: &mut R) -> Result<Self, DecodeError> {
 +              let mut buf = [0u8; 16];
 +              r.read_exact(&mut buf)?;
 +              let mut res = [0u16; 8];
 +              for (idx, v) in res.iter_mut().enumerate() {
 +                      *v = (buf[idx] as u16) << 8 | (buf[idx + 1] as u16)
 +              }
 +              Ok(res)
 +      }
 +}
 +
  // HashMap
  impl<K, V> Writeable for HashMap<K, V>
        where K: Writeable + Eq + Hash,
@@@ -979,7 -965,7 +988,7 @@@ impl Readable for String 
  /// The character set consists of ASCII alphanumeric characters, hyphens, and periods.
  /// Its length is guaranteed to be representable by a single byte.
  /// This serialization is used by BOLT 7 hostnames.
 -#[derive(Clone, Debug, PartialEq)]
 +#[derive(Clone, Debug, PartialEq, Eq)]
  pub struct Hostname(String);
  impl Hostname {
        /// Returns the length of the hostname.