Add boilerplate for sending and receiving onion messages in PeerManager
authorValentine Wallace <vwallace@protonmail.com>
Sat, 6 Aug 2022 04:33:48 +0000 (00:33 -0400)
committerValentine Wallace <vwallace@protonmail.com>
Fri, 26 Aug 2022 23:02:59 +0000 (19:02 -0400)
Adds the boilerplate needed for PeerManager and OnionMessenger to work
together, with some corresponding docs and misc updates mostly due to the
PeerManager public API changing.

fuzz/src/full_stack.rs
fuzz/src/onion_message.rs
lightning-background-processor/src/lib.rs
lightning-net-tokio/src/lib.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/wire.rs
lightning/src/onion_message/functional_tests.rs
lightning/src/onion_message/messenger.rs
lightning/src/util/events.rs

index c1d797ea57914cc4e40fb184ccc0a97568bd99df..f506acc9fe88123ab74ea3fb6e986b73dc52efbb 100644 (file)
@@ -166,7 +166,7 @@ type ChannelMan = ChannelManager<
        EnforcingSigner,
        Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
        Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
-type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
+type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
 
 struct MoneyLossDetector<'a> {
        manager: Arc<ChannelMan>,
@@ -414,6 +414,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
        let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
                chan_handler: channelmanager.clone(),
                route_handler: gossip_sync.clone(),
+               onion_message_handler: IgnoringMessageHandler {},
        }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
 
        let mut should_forward = false;
index 57603dde110e58108c84dbdb496a8b1ec2525c08..a2fe88afc83d6af97896dbf06bb3326d0f66e659 100644 (file)
@@ -6,7 +6,7 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
 use bitcoin::secp256k1::ecdsa::RecoverableSignature;
 
 use lightning::chain::keysinterface::{Recipient, KeyMaterial, KeysInterface};
-use lightning::ln::msgs::{self, DecodeError};
+use lightning::ln::msgs::{self, DecodeError, OnionMessageHandler};
 use lightning::ln::script::ShutdownScript;
 use lightning::util::enforcing_trait_impls::EnforcingSigner;
 use lightning::util::logger::Logger;
index 8f6f0c49c1dcf0fdb3d5a6cb3468aeaafa38ef20..e95c9c3709edfe80cfce45d57bdd003a928725f7 100644 (file)
@@ -19,7 +19,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::chain::keysinterface::{Sign, KeysInterface};
 use lightning::ln::channelmanager::ChannelManager;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::scoring::WriteableScore;
@@ -281,6 +281,7 @@ impl BackgroundProcessor {
                P: 'static + Deref + Send + Sync,
                Descriptor: 'static + SocketDescriptor + Send + Sync,
                CMH: 'static + Deref + Send + Sync,
+               OMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
@@ -289,7 +290,7 @@ impl BackgroundProcessor {
                PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                UMH: 'static + Deref + Send + Sync,
-               PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+               PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: WriteableScore<'a>,
        >(
@@ -306,6 +307,7 @@ impl BackgroundProcessor {
                L::Target: 'static + Logger,
                P::Target: 'static + Persist<Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
+               OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
                PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -544,7 +546,7 @@ mod tests {
                node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
-               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
+               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
                chain_monitor: Arc<ChainMonitor>,
                persister: Arc<FilesystemPersister>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -663,7 +665,7 @@ mod tests {
                        let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
-                       let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
+                       let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
                        let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
index ac9d4bb3bd5899a08e0dfaadebd811fb779e2c04..7dfa38e95e82f150f73c939b79bb13dc50080043 100644 (file)
@@ -83,7 +83,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::peer_handler::CustomMessageHandler;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
+use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
 use lightning::util::logger::Logger;
 
 use std::ops::Deref;
@@ -123,13 +123,15 @@ struct Connection {
        id: u64,
 }
 impl Connection {
-       async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+       async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
+                       OMH: Deref + 'static + Send + Sync,
                        L: Deref + 'static + Send + Sync,
                        UMH: Deref + 'static + Send + Sync,
                        CMH::Target: ChannelMessageHandler + Send + Sync,
                        RMH::Target: RoutingMessageHandler + Send + Sync,
+                       OMH::Target: OnionMessageHandler + Send + Sync,
                        L::Target: Logger + Send + Sync,
                        UMH::Target: CustomMessageHandler + Send + Sync,
     {
@@ -141,13 +143,15 @@ impl Connection {
                }
        }
 
-       async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+       async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
+                       OMH: Deref + 'static + Send + Sync,
                        L: Deref + 'static + Send + Sync,
                        UMH: Deref + 'static + Send + Sync,
                        CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
                        RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+                       OMH::Target: OnionMessageHandler + 'static + Send + Sync,
                        L::Target: Logger + 'static + Send + Sync,
                        UMH::Target: CustomMessageHandler + 'static + Send + Sync,
         {
@@ -268,13 +272,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -315,13 +321,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -391,13 +399,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
 /// disconnected and associated handling futures are freed, though, because all processing in said
 /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
 /// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -646,6 +656,7 @@ mod tests {
                let a_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&a_handler),
                        route_handler: Arc::clone(&a_handler),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
                let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -660,6 +671,7 @@ mod tests {
                let b_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&b_handler),
                        route_handler: Arc::clone(&b_handler),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
                // We bind on localhost, hoping the environment is properly configured with a local
@@ -711,6 +723,7 @@ mod tests {
 
                let a_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                        route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
index ffc595ccedab5bbf547f0f33d1426e0214e4e6bc..190907ce26e3a9f01edcc613d1112086692c6567 100644 (file)
@@ -40,7 +40,7 @@ use core::fmt::Debug;
 use io::{self, Read};
 use io_extras::read_to_end;
 
-use util::events::MessageSendEventsProvider;
+use util::events::{MessageSendEventsProvider, OnionMessageProvider};
 use util::logger;
 use util::ser::{BigSize, LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname};
 
@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
        fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
 }
 
+/// A trait to describe an object that can receive onion messages.
+pub trait OnionMessageHandler : OnionMessageProvider {
+       /// Handle an incoming onion_message message from the given peer.
+       fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
+}
+
 mod fuzzy_internal_msgs {
        use prelude::*;
        use ln::{PaymentPreimage, PaymentSecret};
index 573e910ab083849b74587517540ff2210be04983..58c4f11f02c7122f344ddff1fe42c9c12a3fd618 100644 (file)
@@ -19,7 +19,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler};
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::{VecWriter, Writeable, Writer};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
@@ -27,7 +27,7 @@ use ln::wire;
 use ln::wire::Encode;
 use routing::gossip::{NetworkGraph, P2PGossipSync};
 use util::atomic_counter::AtomicCounter;
-use util::events::{MessageSendEvent, MessageSendEventsProvider};
+use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
 use util::logger::Logger;
 
 use prelude::*;
@@ -76,6 +76,12 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
 }
+impl OnionMessageProvider for IgnoringMessageHandler {
+       fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
+}
+impl OnionMessageHandler for IgnoringMessageHandler {
+       fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+}
 impl Deref for IgnoringMessageHandler {
        type Target = IgnoringMessageHandler;
        fn deref(&self) -> &Self { self }
@@ -199,9 +205,11 @@ impl Deref for ErroringMessageHandler {
 }
 
 /// Provides references to trait impls which handle different types of messages.
-pub struct MessageHandler<CM: Deref, RM: Deref> where
+pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref> where
                CM::Target: ChannelMessageHandler,
-               RM::Target: RoutingMessageHandler {
+               RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
+{
        /// A message handler which handles messages specific to channels. Usually this is just a
        /// [`ChannelManager`] object or an [`ErroringMessageHandler`].
        ///
@@ -212,6 +220,10 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
        ///
        /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
        pub route_handler: RM,
+
+       /// A message handler which handles onion messages. For now, this can only be an
+       /// [`IgnoringMessageHandler`].
+       pub onion_message_handler: OM,
 }
 
 /// Provides an object which can be used to send data to and which uniquely identifies a connection
@@ -423,7 +435,7 @@ impl Peer {
 /// issues such as overly long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, IgnoringMessageHandler, Arc<L>, Arc<IgnoringMessageHandler>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -433,7 +445,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, IgnoringMessageHandler, &'f L, IgnoringMessageHandler>;
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
 /// socket events into messages which it passes on to its [`MessageHandler`].
@@ -454,12 +466,13 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L>
 /// you're using lightning-net-tokio.
 ///
 /// [`read_event`]: PeerManager::read_event
-pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> where
+pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger,
                CMH::Target: CustomMessageHandler {
-       message_handler: MessageHandler<CM, RM>,
+       message_handler: MessageHandler<CM, RM, OM>,
        /// Connection state for each connected peer - we have an outer read-write lock which is taken
        /// as read while we're doing processing for a peer and taken write when a peer is being added
        /// or removed.
@@ -518,31 +531,34 @@ macro_rules! encode_msg {
        }}
 }
 
-impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler> where
                CM::Target: ChannelMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger {
-       /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
-       /// handler is used and network graph messages are ignored.
+       /// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and
+       /// `OnionMessageHandler`. No routing message handler is used and network graph messages are
+       /// ignored.
        ///
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
        ///
        /// (C-not exported) as we can't export a PeerManager with a dummy route handler
-       pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+       pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                Self::new(MessageHandler {
                        chan_handler: channel_message_handler,
                        route_handler: IgnoringMessageHandler{},
+                       onion_message_handler,
                }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
        }
 }
 
-impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
                RM::Target: RoutingMessageHandler,
                L::Target: Logger {
-       /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
-       /// handler is used and messages related to channels will be ignored (or generate error
-       /// messages). Note that some other lightning implementations time-out connections after some
-       /// time if no channel is built with the peer.
+       /// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message
+       /// handler or onion message handler is used and onion and channel messages will be ignored (or
+       /// generate error messages). Note that some other lightning implementations time-out connections
+       /// after some time if no channel is built with the peer.
        ///
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
@@ -552,6 +568,7 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor,
                Self::new(MessageHandler {
                        chan_handler: ErroringMessageHandler::new(),
                        route_handler: routing_message_handler,
+                       onion_message_handler: IgnoringMessageHandler{},
                }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
        }
 }
@@ -597,15 +614,16 @@ fn filter_addresses(ip_address: Option<NetAddress>) -> Option<NetAddress> {
        }
 }
 
-impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
+impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger,
                CMH::Target: CustomMessageHandler {
        /// Constructs a new PeerManager with the given message handlers and node_id secret key
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
-       pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
+       pub fn new(message_handler: MessageHandler<CM, RM, OM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
 
@@ -1314,6 +1332,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?;
                        },
 
+                       // Onion message:
+                       wire::Message::OnionMessage(msg) => {
+                               self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg);
+                       },
+
                        // Unknown messages:
                        wire::Message::Unknown(type_id) if message.is_even() => {
                                log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
@@ -1930,12 +1953,12 @@ mod tests {
                cfgs
        }
 
-       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
+       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
                let mut peers = Vec::new();
                for i in 0..peer_count {
                        let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
                        let ephemeral_bytes = [i as u8; 32];
-                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
+                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} };
                        let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {});
                        peers.push(peer);
                }
@@ -1943,7 +1966,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
                let secp_ctx = Secp256k1::new();
                let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
                let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
index cbf5c77d60095d73abe235c7652eeccb8380cf04..1191a8d3d531477977cad80776bb29f6c49ed2c8 100644 (file)
@@ -9,7 +9,7 @@
 
 //! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for
 //! custom message through the [`CustomMessageReader`] trait.
-//! 
+//!
 //! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
 
 use io;
@@ -60,6 +60,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
        ChannelReady(msgs::ChannelReady),
        Shutdown(msgs::Shutdown),
        ClosingSigned(msgs::ClosingSigned),
+       OnionMessage(msgs::OnionMessage),
        UpdateAddHTLC(msgs::UpdateAddHTLC),
        UpdateFulfillHTLC(msgs::UpdateFulfillHTLC),
        UpdateFailHTLC(msgs::UpdateFailHTLC),
@@ -100,6 +101,7 @@ impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
                        &Message::ChannelReady(ref msg) => msg.type_id(),
                        &Message::Shutdown(ref msg) => msg.type_id(),
                        &Message::ClosingSigned(ref msg) => msg.type_id(),
+                       &Message::OnionMessage(ref msg) => msg.type_id(),
                        &Message::UpdateAddHTLC(ref msg) => msg.type_id(),
                        &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(),
                        &Message::UpdateFailHTLC(ref msg) => msg.type_id(),
@@ -185,6 +187,9 @@ fn do_read<R: io::Read, T, H: core::ops::Deref>(buffer: &mut R, message_type: u1
                msgs::ClosingSigned::TYPE => {
                        Ok(Message::ClosingSigned(Readable::read(buffer)?))
                },
+               msgs::OnionMessage::TYPE => {
+                       Ok(Message::OnionMessage(Readable::read(buffer)?))
+               },
                msgs::UpdateAddHTLC::TYPE => {
                        Ok(Message::UpdateAddHTLC(Readable::read(buffer)?))
                },
@@ -344,6 +349,10 @@ impl Encode for msgs::ClosingSigned {
        const TYPE: u16 = 39;
 }
 
+impl Encode for msgs::OnionMessage {
+       const TYPE: u16 = 513;
+}
+
 impl Encode for msgs::UpdateAddHTLC {
        const TYPE: u16 = 128;
 }
index ccc834434a1c09e34ccb3874d444b0801c4e87c7..5a83a0b2b213939822a09c0b29e0d50deec345ad 100644 (file)
@@ -10,6 +10,7 @@
 //! Onion message testing and test utilities live here.
 
 use chain::keysinterface::{KeysInterface, Recipient};
+use ln::msgs::OnionMessageHandler;
 use super::{BlindedRoute, Destination, OnionMessenger, SendError};
 use util::enforcing_trait_impls::EnforcingSigner;
 use util::test_utils;
index a5438afbb8ea0eda20e619dcf7563cff884b706f..2684ab8b3e8aaac5462d4a033db67e3b18e6636c 100644 (file)
@@ -16,13 +16,15 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 
 use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign};
-use ln::msgs;
+use ln::msgs::{self, OnionMessageHandler};
 use ln::onion_utils;
 use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
 use super::utils;
+use util::events::OnionMessageProvider;
 use util::logger::Logger;
 
+use core::mem;
 use core::ops::Deref;
 use sync::{Arc, Mutex};
 use prelude::*;
@@ -178,10 +180,23 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                Ok(())
        }
 
+       #[cfg(test)]
+       pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, Vec<msgs::OnionMessage>> {
+               let mut pending_msgs = self.pending_messages.lock().unwrap();
+               let mut msgs = HashMap::new();
+               core::mem::swap(&mut *pending_msgs, &mut msgs);
+               msgs
+       }
+}
+
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
+       where K::Target: KeysInterface<Signer = Signer>,
+             L::Target: Logger,
+{
        /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but
        /// soon we'll delegate the onion message to a handler that can generate invoices or send
        /// payments.
-       pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
+       fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
                let control_tlvs_ss = match self.keys_manager.ecdh(Recipient::Node, &msg.blinding_point, None) {
                        Ok(ss) => ss,
                        Err(e) =>  {
@@ -273,13 +288,14 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        },
                };
        }
+}
 
-       #[cfg(test)]
-       pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, Vec<msgs::OnionMessage>> {
-               let mut pending_msgs = self.pending_messages.lock().unwrap();
-               let mut msgs = HashMap::new();
-               core::mem::swap(&mut *pending_msgs, &mut msgs);
-               msgs
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
+       where K::Target: KeysInterface<Signer = Signer>,
+             L::Target: Logger,
+{
+       fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
+               None
        }
 }
 
index d56747598563c381ce1e554c2cccda96d0f92be0..e86eae3c813a8af7a3684480171df913ea0883bd 100644 (file)
@@ -1195,6 +1195,12 @@ pub trait MessageSendEventsProvider {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
 }
 
+/// A trait indicating an object may generate onion messages to send
+pub trait OnionMessageProvider {
+       /// Gets the next pending onion message for the peer with the given node id.
+       fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage>;
+}
+
 /// A trait indicating an object may generate events.
 ///
 /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].