Merge pull request #1643 from jurvis/jurvis/2022-07-inflights-htlcs-across-payments
authorJeffrey Czyz <jkczyz@gmail.com>
Thu, 1 Sep 2022 20:01:30 +0000 (15:01 -0500)
committerGitHub <noreply@github.com>
Thu, 1 Sep 2022 20:01:30 +0000 (15:01 -0500)
Track in-flight HTLCs across payments when routing

19 files changed:
fuzz/Cargo.toml
fuzz/ci-fuzz.sh
fuzz/src/full_stack.rs
fuzz/src/onion_message.rs
lightning-background-processor/src/lib.rs
lightning-invoice/fuzz/Cargo.toml
lightning-invoice/fuzz/ci-fuzz.sh
lightning-net-tokio/src/lib.rs
lightning/src/chain/channelmonitor.rs
lightning/src/ln/monitor_tests.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/ln/wire.rs
lightning/src/onion_message/blinded_route.rs
lightning/src/onion_message/functional_tests.rs
lightning/src/onion_message/messenger.rs
lightning/src/onion_message/packet.rs
lightning/src/util/events.rs
lightning/src/util/macro_logger.rs

index be37fb83cdddbbe48034c6bfb990762c299aa9b3..95308e1bf5c3ae2f81db847b08abcc2690796c7c 100644 (file)
@@ -22,7 +22,7 @@ lightning = { path = "../lightning", features = ["regex"] }
 lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" }
 bitcoin = { version = "0.29.0", features = ["secp-lowmemory"] }
 hex = "0.3"
-honggfuzz = { version = "0.5", optional = true }
+honggfuzz = { version = "0.5", optional = true, default-features = false }
 libfuzzer-sys = { version = "0.4", optional = true }
 
 [build-dependencies]
index 6f0074b6a1a0d44ea889d8759fd3d7fde037e950..969505ca88d1bf1e227fd1eeee73c26c13dea741 100755 (executable)
@@ -13,7 +13,7 @@ rm *_target.rs
 [ "$(git diff)" != "" ] && exit 1
 popd
 
-cargo install --color always --force honggfuzz
+cargo install --color always --force honggfuzz --no-default-features
 sed -i 's/lto = true//' Cargo.toml
 HFUZZ_BUILD_ARGS="--features honggfuzz_fuzz" cargo --color always hfuzz build
 for TARGET in src/bin/*.rs; do
index c1d797ea57914cc4e40fb184ccc0a97568bd99df..f506acc9fe88123ab74ea3fb6e986b73dc52efbb 100644 (file)
@@ -166,7 +166,7 @@ type ChannelMan = ChannelManager<
        EnforcingSigner,
        Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
        Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
-type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
+type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
 
 struct MoneyLossDetector<'a> {
        manager: Arc<ChannelMan>,
@@ -414,6 +414,7 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
        let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
                chan_handler: channelmanager.clone(),
                route_handler: gossip_sync.clone(),
+               onion_message_handler: IgnoringMessageHandler {},
        }, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
 
        let mut should_forward = false;
index 7ab2bd63a9f0fa6556ba8f1f0e99543f948e4b2b..a2fe88afc83d6af97896dbf06bb3326d0f66e659 100644 (file)
@@ -6,7 +6,7 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
 use bitcoin::secp256k1::ecdsa::RecoverableSignature;
 
 use lightning::chain::keysinterface::{Recipient, KeyMaterial, KeysInterface};
-use lightning::ln::msgs::{self, DecodeError};
+use lightning::ln::msgs::{self, DecodeError, OnionMessageHandler};
 use lightning::ln::script::ShutdownScript;
 use lightning::util::enforcing_trait_impls::EnforcingSigner;
 use lightning::util::logger::Logger;
@@ -122,7 +122,7 @@ mod tests {
                super::do_test(&::hex::decode(one_hop_om).unwrap(), &logger);
                {
                        let log_entries = logger.lines.lock().unwrap();
-                       assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), "Received an onion message with path_id: None".to_string())), Some(&1));
+                       assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), "Received an onion message with path_id: None and no reply_path".to_string())), Some(&1));
                }
 
                let two_unblinded_hops_om = "020000000000000000000000000000000000000000000000000000000000000e01055600020000000000000000000000000000000000000000000000000000000000000e0135043304210200000000000000000000000000000000000000000000000000000000000000039500000000000000000000000000000058000000000000000000000000000000000000000000000000000000000000001204105e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000b300000000000000000000000000000000000000000000000000000000000000";
index 8f6f0c49c1dcf0fdb3d5a6cb3468aeaafa38ef20..e95c9c3709edfe80cfce45d57bdd003a928725f7 100644 (file)
@@ -19,7 +19,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::chain::keysinterface::{Sign, KeysInterface};
 use lightning::ln::channelmanager::ChannelManager;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::scoring::WriteableScore;
@@ -281,6 +281,7 @@ impl BackgroundProcessor {
                P: 'static + Deref + Send + Sync,
                Descriptor: 'static + SocketDescriptor + Send + Sync,
                CMH: 'static + Deref + Send + Sync,
+               OMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
@@ -289,7 +290,7 @@ impl BackgroundProcessor {
                PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                UMH: 'static + Deref + Send + Sync,
-               PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+               PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: WriteableScore<'a>,
        >(
@@ -306,6 +307,7 @@ impl BackgroundProcessor {
                L::Target: 'static + Logger,
                P::Target: 'static + Persist<Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
+               OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
                PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
@@ -544,7 +546,7 @@ mod tests {
                node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
-               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
+               peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
                chain_monitor: Arc<ChainMonitor>,
                persister: Arc<FilesystemPersister>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
@@ -663,7 +665,7 @@ mod tests {
                        let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
                        let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
-                       let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
+                       let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
                        let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
index 833606c1ad83eea553af28d9d780639093e249a9..6f79757c22bb810d574a0438003b70ee76444e6b 100644 (file)
@@ -12,7 +12,7 @@ afl_fuzz = ["afl"]
 honggfuzz_fuzz = ["honggfuzz"]
 
 [dependencies]
-honggfuzz = { version = "0.5", optional = true }
+honggfuzz = { version = "0.5", optional = true, default-features = false }
 afl = { version = "0.4", optional = true }
 lightning-invoice = { path = ".." }
 lightning = { path = "../../lightning", features = ["regex"] }
index ae85ea91301283a5a6765949f361deaba9986304..db1b9eb388c5231d58dbb0e9d03e338f40b2c8e6 100755 (executable)
@@ -1,6 +1,6 @@
 #!/bin/bash
 set -e
-cargo install --force honggfuzz
+cargo install --force honggfuzz --no-default-features
 for TARGET in fuzz_targets/*; do
     FILENAME=$(basename $TARGET)
        FILE="${FILENAME%.*}"
index ac9d4bb3bd5899a08e0dfaadebd811fb779e2c04..7dfa38e95e82f150f73c939b79bb13dc50080043 100644 (file)
@@ -83,7 +83,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
 use lightning::ln::peer_handler::CustomMessageHandler;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
+use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
 use lightning::util::logger::Logger;
 
 use std::ops::Deref;
@@ -123,13 +123,15 @@ struct Connection {
        id: u64,
 }
 impl Connection {
-       async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+       async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
+                       OMH: Deref + 'static + Send + Sync,
                        L: Deref + 'static + Send + Sync,
                        UMH: Deref + 'static + Send + Sync,
                        CMH::Target: ChannelMessageHandler + Send + Sync,
                        RMH::Target: RoutingMessageHandler + Send + Sync,
+                       OMH::Target: OnionMessageHandler + Send + Sync,
                        L::Target: Logger + Send + Sync,
                        UMH::Target: CustomMessageHandler + Send + Sync,
     {
@@ -141,13 +143,15 @@ impl Connection {
                }
        }
 
-       async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+       async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
                        CMH: Deref + 'static + Send + Sync,
                        RMH: Deref + 'static + Send + Sync,
+                       OMH: Deref + 'static + Send + Sync,
                        L: Deref + 'static + Send + Sync,
                        UMH: Deref + 'static + Send + Sync,
                        CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
                        RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+                       OMH::Target: OnionMessageHandler + 'static + Send + Sync,
                        L::Target: Logger + 'static + Send + Sync,
                        UMH::Target: CustomMessageHandler + 'static + Send + Sync,
         {
@@ -268,13 +272,15 @@ fn get_addr_from_stream(stream: &StdTcpStream) -> Option<NetAddress> {
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -315,13 +321,15 @@ pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManag
 /// The returned future will complete when the peer is disconnected and associated handling
 /// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
 /// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -391,13 +399,15 @@ pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerMana
 /// disconnected and associated handling futures are freed, though, because all processing in said
 /// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
 /// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
                CMH: Deref + 'static + Send + Sync,
                RMH: Deref + 'static + Send + Sync,
+               OMH: Deref + 'static + Send + Sync,
                L: Deref + 'static + Send + Sync,
                UMH: Deref + 'static + Send + Sync,
                CMH::Target: ChannelMessageHandler + Send + Sync,
                RMH::Target: RoutingMessageHandler + Send + Sync,
+               OMH::Target: OnionMessageHandler + Send + Sync,
                L::Target: Logger + Send + Sync,
                UMH::Target: CustomMessageHandler + Send + Sync,
 {
@@ -646,6 +656,7 @@ mod tests {
                let a_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&a_handler),
                        route_handler: Arc::clone(&a_handler),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
                let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -660,6 +671,7 @@ mod tests {
                let b_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&b_handler),
                        route_handler: Arc::clone(&b_handler),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
                // We bind on localhost, hoping the environment is properly configured with a local
@@ -711,6 +723,7 @@ mod tests {
 
                let a_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
+                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                        route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
                }, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
 
index 573f144e5c04b0e6ba6342515d32efc05b31a4e8..52cfe9972f7289388caa38d8b7244c7ec6b905bc 100644 (file)
@@ -583,14 +583,25 @@ pub enum Balance {
        /// HTLCs which we sent to our counterparty which are claimable after a timeout (less on-chain
        /// fees) if the counterparty does not know the preimage for the HTLCs. These are somewhat
        /// likely to be claimed by our counterparty before we do.
-       MaybeClaimableHTLCAwaitingTimeout {
-               /// The amount available to claim, in satoshis, excluding the on-chain fees which will be
-               /// required to do so.
+       MaybeTimeoutClaimableHTLC {
+               /// The amount potentially available to claim, in satoshis, excluding the on-chain fees
+               /// which will be required to do so.
                claimable_amount_satoshis: u64,
                /// The height at which we will be able to claim the balance if our counterparty has not
                /// done so.
                claimable_height: u32,
        },
+       /// HTLCs which we received from our counterparty which are claimable with a preimage which we
+       /// do not currently have. This will only be claimable if we receive the preimage from the node
+       /// to which we forwarded this HTLC before the timeout.
+       MaybePreimageClaimableHTLC {
+               /// The amount potentially available to claim, in satoshis, excluding the on-chain fees
+               /// which will be required to do so.
+               claimable_amount_satoshis: u64,
+               /// The height at which our counterparty will be able to claim the balance if we have not
+               /// yet received the preimage and claimed it ourselves.
+               expiry_height: u32,
+       },
        /// The channel has been closed, and our counterparty broadcasted a revoked commitment
        /// transaction.
        ///
@@ -1547,7 +1558,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                        confirmation_height: conf_thresh,
                                });
                        } else {
-                               return Some(Balance::MaybeClaimableHTLCAwaitingTimeout {
+                               return Some(Balance::MaybeTimeoutClaimableHTLC {
                                        claimable_amount_satoshis: htlc.amount_msat / 1000,
                                        claimable_height: htlc.cltv_expiry,
                                });
@@ -1570,6 +1581,11 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
                                        timeout_height: htlc.cltv_expiry,
                                });
                        }
+               } else if htlc_resolved.is_none() {
+                       return Some(Balance::MaybePreimageClaimableHTLC {
+                               claimable_amount_satoshis: htlc.amount_msat / 1000,
+                               expiry_height: htlc.cltv_expiry,
+                       });
                }
                None
        }
@@ -1727,12 +1743,19 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
                        for (htlc, _, _) in us.current_holder_commitment_tx.htlc_outputs.iter() {
                                if htlc.transaction_output_index.is_none() { continue; }
                                if htlc.offered {
-                                       res.push(Balance::MaybeClaimableHTLCAwaitingTimeout {
+                                       res.push(Balance::MaybeTimeoutClaimableHTLC {
                                                claimable_amount_satoshis: htlc.amount_msat / 1000,
                                                claimable_height: htlc.cltv_expiry,
                                        });
                                } else if us.payment_preimages.get(&htlc.payment_hash).is_some() {
                                        claimable_inbound_htlc_value_sat += htlc.amount_msat / 1000;
+                               } else {
+                                       // As long as the HTLC is still in our latest commitment state, treat
+                                       // it as potentially claimable, even if it has long-since expired.
+                                       res.push(Balance::MaybePreimageClaimableHTLC {
+                                               claimable_amount_satoshis: htlc.amount_msat / 1000,
+                                               expiry_height: htlc.cltv_expiry,
+                                       });
                                }
                        }
                        res.push(Balance::ClaimableOnChannelClose {
index 67ea07f2abdb0507fd6dfd0704e45504a1c8f29c..a2fccbbc3883d6bb1e4d7f97ff89021b979483be 100644 (file)
@@ -280,18 +280,24 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
        assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
                        claimable_amount_satoshis: 1_000_000 - 3_000 - 4_000 - 1_000 - 3 - chan_feerate *
                                (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
                sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
-       assert_eq!(vec![Balance::ClaimableOnChannelClose {
+       assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
                        claimable_amount_satoshis: 1_000,
-               }],
-               nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 3_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 4_000,
+                       expiry_height: htlc_cltv_timeout,
+               }]),
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
 
        nodes[1].node.claim_funds(payment_preimage);
        check_added_monitors!(nodes[1], 1);
@@ -335,12 +341,12 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
                                chan_feerate * (channel::commitment_tx_base_weight(opt_anchors) +
                                                                if prev_commitment_tx { 1 } else { 2 } *
                                                                channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }];
        if !prev_commitment_tx {
-               a_expected_balances.push(Balance::MaybeClaimableHTLCAwaitingTimeout {
+               a_expected_balances.push(Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
                });
@@ -397,10 +403,10 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
                        claimable_amount_satoshis: 1_000_000 - 3_000 - 4_000 - 1_000 - 3 - chan_feerate *
                                (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
                        confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -428,10 +434,10 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
 
        // After ANTI_REORG_DELAY, A will consider its balance fully spendable and generate a
        // `SpendableOutputs` event. However, B still has to wait for the CSV delay.
-       assert_eq!(sorted_vec(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+       assert_eq!(sorted_vec(vec![Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -459,16 +465,16 @@ fn do_test_claim_value_force_close(prev_commitment_tx: bool) {
        } else {
                expect_payment_sent!(nodes[0], payment_preimage);
        }
-       assert_eq!(sorted_vec(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+       assert_eq!(sorted_vec(vec![Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
                sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
        connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
-       assert_eq!(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+       assert_eq!(vec![Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
                }],
@@ -640,10 +646,10 @@ fn test_balances_on_local_commitment_htlcs() {
                        claimable_amount_satoshis: 1_000_000 - 10_000 - 20_000 - chan_feerate *
                                (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
                        confirmation_height: node_a_commitment_claimable,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 10_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 20_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -667,10 +673,10 @@ fn test_balances_on_local_commitment_htlcs() {
                        claimable_amount_satoshis: 1_000_000 - 10_000 - 20_000 - chan_feerate *
                                (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
                        confirmation_height: node_a_commitment_claimable,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 10_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 20_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -691,7 +697,7 @@ fn test_balances_on_local_commitment_htlcs() {
                }, Balance::ClaimableAwaitingConfirmations {
                        claimable_amount_satoshis: 10_000,
                        confirmation_height: node_a_htlc_claimable,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 20_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -708,7 +714,7 @@ fn test_balances_on_local_commitment_htlcs() {
                }, Balance::ClaimableAwaitingConfirmations {
                        claimable_amount_satoshis: 10_000,
                        confirmation_height: node_a_htlc_claimable,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 20_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
@@ -747,6 +753,238 @@ fn test_balances_on_local_commitment_htlcs() {
        test_spendable_output(&nodes[0], &as_txn[1]);
 }
 
+#[test]
+fn test_no_preimage_inbound_htlc_balances() {
+       // Tests that MaybePreimageClaimableHTLC are generated for inbound HTLCs for which we do not
+       // have a preimage.
+       let chanmon_cfgs = create_chanmon_cfgs(2);
+       let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+       let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+       let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+       let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known());
+       let funding_outpoint = OutPoint { txid: funding_tx.txid(), index: 0 };
+
+       // Send two HTLCs, one from A to B, and one from B to A.
+       let to_b_failed_payment_hash = route_payment(&nodes[0], &[&nodes[1]], 10_000_000).1;
+       let to_a_failed_payment_hash = route_payment(&nodes[1], &[&nodes[0]], 20_000_000).1;
+       let htlc_cltv_timeout = nodes[0].best_block_info().1 + TEST_FINAL_CLTV + 1; // Note ChannelManager adds one to CLTV timeouts for safety
+
+       let chan_feerate = get_feerate!(nodes[0], chan_id) as u64;
+       let opt_anchors = get_opt_anchors!(nodes[0], chan_id);
+
+       // Both A and B will have an HTLC that's claimable on timeout and one that's claimable if they
+       // receive the preimage. These will remain the same through the channel closure and until the
+       // HTLC output is spent.
+
+       assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
+                       claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+                               (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::MaybeTimeoutClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       claimable_height: htlc_cltv_timeout,
+               }]),
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
+                       claimable_amount_satoshis: 500_000 - 20_000,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::MaybeTimeoutClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       claimable_height: htlc_cltv_timeout,
+               }]),
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       // Get nodes[0]'s commitment transaction and HTLC-Timeout transaction
+       let as_txn = get_local_commitment_txn!(nodes[0], chan_id);
+       assert_eq!(as_txn.len(), 2);
+       check_spends!(as_txn[1], as_txn[0]);
+       check_spends!(as_txn[0], funding_tx);
+
+       // Now close the channel by confirming A's commitment transaction on both nodes, checking the
+       // claimable balances remain the same except for the non-HTLC balance changing variant.
+       let node_a_commitment_claimable = nodes[0].best_block_info().1 + BREAKDOWN_TIMEOUT as u32;
+       let as_pre_spend_claims = sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+                               (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+                       confirmation_height: node_a_commitment_claimable,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::MaybeTimeoutClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       claimable_height: htlc_cltv_timeout,
+               }]);
+
+       mine_transaction(&nodes[0], &as_txn[0]);
+       nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear();
+       check_added_monitors!(nodes[0], 1);
+       check_closed_broadcast!(nodes[0], true);
+       check_closed_event!(nodes[0], 1, ClosureReason::CommitmentTxConfirmed);
+
+       assert_eq!(as_pre_spend_claims,
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       mine_transaction(&nodes[1], &as_txn[0]);
+       check_added_monitors!(nodes[1], 1);
+       check_closed_broadcast!(nodes[1], true);
+       check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
+
+       let node_b_commitment_claimable = nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1;
+       let mut bs_pre_spend_claims = sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 500_000 - 20_000,
+                       confirmation_height: node_b_commitment_claimable,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::MaybeTimeoutClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       claimable_height: htlc_cltv_timeout,
+               }]);
+       assert_eq!(bs_pre_spend_claims,
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       // We'll broadcast the HTLC-Timeout transaction one block prior to the htlc's expiration (as it
+       // is confirmable in the next block), but will still include the same claimable balances as no
+       // HTLC has been spent, even after the HTLC expires. We'll also fail the inbound HTLC, but it
+       // won't do anything as the channel is already closed.
+
+       connect_blocks(&nodes[0], TEST_FINAL_CLTV - 1);
+       let as_htlc_timeout_claim = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+       assert_eq!(as_htlc_timeout_claim.len(), 1);
+       check_spends!(as_htlc_timeout_claim[0], as_txn[0]);
+       expect_pending_htlcs_forwardable_conditions!(nodes[0],
+               [HTLCDestination::FailedPayment { payment_hash: to_a_failed_payment_hash }]);
+
+       assert_eq!(as_pre_spend_claims,
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       connect_blocks(&nodes[0], 1);
+       assert_eq!(as_pre_spend_claims,
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       // For node B, we'll get the non-HTLC funds claimable after ANTI_REORG_DELAY confirmations
+       connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
+       test_spendable_output(&nodes[1], &as_txn[0]);
+       bs_pre_spend_claims.retain(|e| if let Balance::ClaimableAwaitingConfirmations { .. } = e { false } else { true });
+
+       // The next few blocks for B look the same as for A, though for the opposite HTLC
+       nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clear();
+       connect_blocks(&nodes[1], TEST_FINAL_CLTV - (ANTI_REORG_DELAY - 1) - 1);
+       expect_pending_htlcs_forwardable_conditions!(nodes[1],
+               [HTLCDestination::FailedPayment { payment_hash: to_b_failed_payment_hash }]);
+       let bs_htlc_timeout_claim = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+       assert_eq!(bs_htlc_timeout_claim.len(), 1);
+       check_spends!(bs_htlc_timeout_claim[0], as_txn[0]);
+
+       assert_eq!(bs_pre_spend_claims,
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       connect_blocks(&nodes[1], 1);
+       assert_eq!(bs_pre_spend_claims,
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       // Now confirm the two HTLC timeout transactions for A, checking that the inbound HTLC resolves
+       // after ANTI_REORG_DELAY confirmations and the other takes BREAKDOWN_TIMEOUT confirmations.
+       mine_transaction(&nodes[0], &as_htlc_timeout_claim[0]);
+       let as_timeout_claimable_height = nodes[0].best_block_info().1 + (BREAKDOWN_TIMEOUT as u32) - 1;
+       assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+                               (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+                       confirmation_height: node_a_commitment_claimable,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 10_000,
+                       confirmation_height: as_timeout_claimable_height,
+               }]),
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       mine_transaction(&nodes[0], &bs_htlc_timeout_claim[0]);
+       assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+                               (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+                       confirmation_height: node_a_commitment_claimable,
+               }, Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 20_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 10_000,
+                       confirmation_height: as_timeout_claimable_height,
+               }]),
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       // Once as_htlc_timeout_claim[0] reaches ANTI_REORG_DELAY confirmations, we should get a
+       // payment failure event.
+       connect_blocks(&nodes[0], ANTI_REORG_DELAY - 2);
+       expect_payment_failed!(nodes[0], to_b_failed_payment_hash, true);
+
+       connect_blocks(&nodes[0], 1);
+       assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+                               (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+                       confirmation_height: node_a_commitment_claimable,
+               }, Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 10_000,
+                       confirmation_height: core::cmp::max(as_timeout_claimable_height, htlc_cltv_timeout),
+               }]),
+               sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       connect_blocks(&nodes[0], node_a_commitment_claimable - nodes[0].best_block_info().1);
+       assert_eq!(vec![Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 10_000,
+                       confirmation_height: core::cmp::max(as_timeout_claimable_height, htlc_cltv_timeout),
+               }],
+               nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+       test_spendable_output(&nodes[0], &as_txn[0]);
+
+       connect_blocks(&nodes[0], as_timeout_claimable_height - nodes[0].best_block_info().1);
+       assert!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
+       test_spendable_output(&nodes[0], &as_htlc_timeout_claim[0]);
+
+       // The process for B should be completely identical as well, noting that the non-HTLC-balance
+       // was already claimed.
+       mine_transaction(&nodes[1], &bs_htlc_timeout_claim[0]);
+       let bs_timeout_claimable_height = nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1;
+       assert_eq!(sorted_vec(vec![Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 20_000,
+                       confirmation_height: bs_timeout_claimable_height,
+               }]),
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       mine_transaction(&nodes[1], &as_htlc_timeout_claim[0]);
+       assert_eq!(sorted_vec(vec![Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       expiry_height: htlc_cltv_timeout,
+               }, Balance::ClaimableAwaitingConfirmations {
+                       claimable_amount_satoshis: 20_000,
+                       confirmation_height: bs_timeout_claimable_height,
+               }]),
+               sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+       connect_blocks(&nodes[1], ANTI_REORG_DELAY - 2);
+       expect_payment_failed!(nodes[1], to_a_failed_payment_hash, true);
+
+       assert_eq!(vec![Balance::MaybePreimageClaimableHTLC {
+                       claimable_amount_satoshis: 10_000,
+                       expiry_height: htlc_cltv_timeout,
+               }],
+               nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+       test_spendable_output(&nodes[1], &bs_htlc_timeout_claim[0]);
+
+       connect_blocks(&nodes[1], 1);
+       assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
+}
+
 fn sorted_vec_with_additions<T: Ord + Clone>(v_orig: &Vec<T>, extra_ts: &[&T]) -> Vec<T> {
        let mut v = v_orig.clone();
        for t in extra_ts {
@@ -851,13 +1089,13 @@ fn do_test_revoked_counterparty_commitment_balances(confirm_htlc_spend_first: bo
        // lists the two on-chain timeout-able HTLCs as claimable balances.
        assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
                        claimable_amount_satoshis: 100_000 - 5_000 - 4_000 - 3 - 2_000 + 3_000,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 2_000,
                        claimable_height: missing_htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 5_000,
                        claimable_height: live_htlc_cltv_timeout,
                }]),
@@ -1263,10 +1501,10 @@ fn test_revoked_counterparty_aggregated_claims() {
 
        assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
                        claimable_amount_satoshis: 100_000 - 4_000 - 3_000,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 4_000,
                        claimable_height: htlc_cltv_timeout,
-               }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+               }, Balance::MaybeTimeoutClaimableHTLC {
                        claimable_amount_satoshis: 3_000,
                        claimable_height: htlc_cltv_timeout,
                }]),
index ffc595ccedab5bbf547f0f33d1426e0214e4e6bc..190907ce26e3a9f01edcc613d1112086692c6567 100644 (file)
@@ -40,7 +40,7 @@ use core::fmt::Debug;
 use io::{self, Read};
 use io_extras::read_to_end;
 
-use util::events::MessageSendEventsProvider;
+use util::events::{MessageSendEventsProvider, OnionMessageProvider};
 use util::logger;
 use util::ser::{BigSize, LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname};
 
@@ -945,6 +945,12 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
        fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
 }
 
+/// A trait to describe an object that can receive onion messages.
+pub trait OnionMessageHandler : OnionMessageProvider {
+       /// Handle an incoming onion_message message from the given peer.
+       fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
+}
+
 mod fuzzy_internal_msgs {
        use prelude::*;
        use ln::{PaymentPreimage, PaymentSecret};
index 623aa969dfd4262da60773428c6d03aff681a4ab..1258026e17ec123f9e28b9047dbf65b225a8f3db 100644 (file)
@@ -19,7 +19,7 @@ use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler};
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::{VecWriter, Writeable, Writer};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
@@ -27,7 +27,7 @@ use ln::wire;
 use ln::wire::Encode;
 use routing::gossip::{NetworkGraph, P2PGossipSync};
 use util::atomic_counter::AtomicCounter;
-use util::events::{MessageSendEvent, MessageSendEventsProvider};
+use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
 use util::logger::Logger;
 
 use prelude::*;
@@ -76,6 +76,12 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
        fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
 }
+impl OnionMessageProvider for IgnoringMessageHandler {
+       fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
+}
+impl OnionMessageHandler for IgnoringMessageHandler {
+       fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+}
 impl Deref for IgnoringMessageHandler {
        type Target = IgnoringMessageHandler;
        fn deref(&self) -> &Self { self }
@@ -199,9 +205,11 @@ impl Deref for ErroringMessageHandler {
 }
 
 /// Provides references to trait impls which handle different types of messages.
-pub struct MessageHandler<CM: Deref, RM: Deref> where
+pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref> where
                CM::Target: ChannelMessageHandler,
-               RM::Target: RoutingMessageHandler {
+               RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
+{
        /// A message handler which handles messages specific to channels. Usually this is just a
        /// [`ChannelManager`] object or an [`ErroringMessageHandler`].
        ///
@@ -212,6 +220,10 @@ pub struct MessageHandler<CM: Deref, RM: Deref> where
        ///
        /// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
        pub route_handler: RM,
+
+       /// A message handler which handles onion messages. For now, this can only be an
+       /// [`IgnoringMessageHandler`].
+       pub onion_message_handler: OM,
 }
 
 /// Provides an object which can be used to send data to and which uniquely identifies a connection
@@ -298,7 +310,7 @@ const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
 /// refilled as we send bytes.
-const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
+const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
 /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
 /// the peer.
 const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
@@ -337,6 +349,9 @@ struct Peer {
 
        pending_outbound_buffer: LinkedList<Vec<u8>>,
        pending_outbound_buffer_first_msg_offset: usize,
+       // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
+       // channel messages over them.
+       gossip_broadcast_buffer: LinkedList<Vec<u8>>,
        awaiting_write_event: bool,
 
        pending_read_buffer: Vec<u8>,
@@ -389,21 +404,34 @@ impl Peer {
                self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
        }
 
-       /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
-       /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
-       /// been drained.
+       /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
+       /// outbound buffer. This is checked every time the peer's buffer may have been drained.
        fn should_buffer_gossip_backfill(&self) -> bool {
-               self.pending_outbound_buffer.is_empty() &&
-                       self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+               self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
+                       && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
        }
 
-       /// Returns whether this peer's buffer is full and we should drop gossip messages.
-       fn buffer_full_drop_gossip(&self) -> bool {
-               if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-                       || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
-                               return false
-               }
-               true
+       /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked
+       /// every time the peer's buffer may have been drained.
+       fn should_buffer_onion_message(&self) -> bool {
+               self.pending_outbound_buffer.is_empty()
+                       && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
+       /// buffer. This is checked every time the peer's buffer may have been drained.
+       fn should_buffer_gossip_broadcast(&self) -> bool {
+               self.pending_outbound_buffer.is_empty()
+                       && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+       }
+
+       /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
+       fn buffer_full_drop_gossip_broadcast(&self) -> bool {
+               let total_outbound_buffered =
+                       self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
+
+               total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
+                       self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
        }
 }
 
@@ -414,7 +442,7 @@ impl Peer {
 /// issues such as overly long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, IgnoringMessageHandler, Arc<L>, Arc<IgnoringMessageHandler>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -424,7 +452,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArc
 /// helps with issues such as long function definitions.
 ///
 /// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, IgnoringMessageHandler, &'f L, IgnoringMessageHandler>;
 
 /// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
 /// socket events into messages which it passes on to its [`MessageHandler`].
@@ -445,12 +473,13 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L>
 /// you're using lightning-net-tokio.
 ///
 /// [`read_event`]: PeerManager::read_event
-pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> where
+pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger,
                CMH::Target: CustomMessageHandler {
-       message_handler: MessageHandler<CM, RM>,
+       message_handler: MessageHandler<CM, RM, OM>,
        /// Connection state for each connected peer - we have an outer read-write lock which is taken
        /// as read while we're doing processing for a peer and taken write when a peer is being added
        /// or removed.
@@ -509,31 +538,34 @@ macro_rules! encode_msg {
        }}
 }
 
-impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler> where
                CM::Target: ChannelMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger {
-       /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
-       /// handler is used and network graph messages are ignored.
+       /// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and
+       /// `OnionMessageHandler`. No routing message handler is used and network graph messages are
+       /// ignored.
        ///
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
        ///
        /// (C-not exported) as we can't export a PeerManager with a dummy route handler
-       pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+       pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                Self::new(MessageHandler {
                        chan_handler: channel_message_handler,
                        route_handler: IgnoringMessageHandler{},
+                       onion_message_handler,
                }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
        }
 }
 
-impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
                RM::Target: RoutingMessageHandler,
                L::Target: Logger {
-       /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
-       /// handler is used and messages related to channels will be ignored (or generate error
-       /// messages). Note that some other lightning implementations time-out connections after some
-       /// time if no channel is built with the peer.
+       /// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message
+       /// handler or onion message handler is used and onion and channel messages will be ignored (or
+       /// generate error messages). Note that some other lightning implementations time-out connections
+       /// after some time if no channel is built with the peer.
        ///
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
@@ -543,6 +575,7 @@ impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor,
                Self::new(MessageHandler {
                        chan_handler: ErroringMessageHandler::new(),
                        route_handler: routing_message_handler,
+                       onion_message_handler: IgnoringMessageHandler{},
                }, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
        }
 }
@@ -588,15 +621,16 @@ fn filter_addresses(ip_address: Option<NetAddress>) -> Option<NetAddress> {
        }
 }
 
-impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
+impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
                CM::Target: ChannelMessageHandler,
                RM::Target: RoutingMessageHandler,
+               OM::Target: OnionMessageHandler,
                L::Target: Logger,
                CMH::Target: CustomMessageHandler {
        /// Constructs a new PeerManager with the given message handlers and node_id secret key
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
-       pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
+       pub fn new(message_handler: MessageHandler<CM, RM, OM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
 
@@ -671,6 +705,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        pending_outbound_buffer: LinkedList::new(),
                        pending_outbound_buffer_first_msg_offset: 0,
+                       gossip_broadcast_buffer: LinkedList::new(),
                        awaiting_write_event: false,
 
                        pending_read_buffer,
@@ -717,6 +752,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
                        pending_outbound_buffer: LinkedList::new(),
                        pending_outbound_buffer_first_msg_offset: 0,
+                       gossip_broadcast_buffer: LinkedList::new(),
                        awaiting_write_event: false,
 
                        pending_read_buffer,
@@ -737,6 +773,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
        fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
                while !peer.awaiting_write_event {
+                       if peer.should_buffer_onion_message() {
+                               if let Some(peer_node_id) = peer.their_node_id {
+                                       if let Some(next_onion_message) =
+                                               self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) {
+                                                       self.enqueue_message(peer, &next_onion_message);
+                                       }
+                               }
+                       }
+                       if peer.should_buffer_gossip_broadcast() {
+                               if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
+                                       peer.pending_outbound_buffer.push_back(msg);
+                               }
+                       }
                        if peer.should_buffer_gossip_backfill() {
                                match peer.sync_status {
                                        InitSyncTracker::NoSyncRequested => {},
@@ -851,12 +900,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                }
        }
 
-       /// Append a message to a peer's pending outbound/write buffer
-       fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
-               peer.msgs_sent_since_pong += 1;
-               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
-       }
-
        /// Append a message to a peer's pending outbound/write buffer
        fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
                let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -867,7 +910,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                } else {
                        log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
                }
-               self.enqueue_encoded_message(peer, &buffer.0);
+               peer.msgs_sent_since_pong += 1;
+               peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
+       }
+
+       /// Append a message to a peer's pending outbound/write gossip broadcast buffer
+       fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+               peer.msgs_sent_since_pong += 1;
+               peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
        }
 
        fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
@@ -1297,6 +1347,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?;
                        },
 
+                       // Onion message:
+                       wire::Message::OnionMessage(msg) => {
+                               self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg);
+                       },
+
                        // Unknown messages:
                        wire::Message::Unknown(type_id) if message.is_even() => {
                                log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
@@ -1325,7 +1380,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1336,7 +1391,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        wire::Message::NodeAnnouncement(ref msg) => {
@@ -1349,7 +1404,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_node_announcement(msg.contents.node_id) {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
@@ -1359,7 +1414,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        wire::Message::ChannelUpdate(ref msg) => {
@@ -1372,14 +1427,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
                                                        !peer.should_forward_channel_announcement(msg.contents.short_channel_id)  {
                                                continue
                                        }
-                                       if peer.buffer_full_drop_gossip() {
+                                       if peer.buffer_full_drop_gossip_broadcast() {
                                                log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
                                                continue;
                                        }
                                        if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
                                                continue;
                                        }
-                                       self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+                                       self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
                                }
                        },
                        _ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
@@ -1913,12 +1968,12 @@ mod tests {
                cfgs
        }
 
-       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
+       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
                let mut peers = Vec::new();
                for i in 0..peer_count {
                        let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
                        let ephemeral_bytes = [i as u8; 32];
-                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
+                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} };
                        let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {});
                        peers.push(peer);
                }
@@ -1926,7 +1981,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
                let secp_ctx = Secp256k1::new();
                let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
                let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
index cbf5c77d60095d73abe235c7652eeccb8380cf04..1191a8d3d531477977cad80776bb29f6c49ed2c8 100644 (file)
@@ -9,7 +9,7 @@
 
 //! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for
 //! custom message through the [`CustomMessageReader`] trait.
-//! 
+//!
 //! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
 
 use io;
@@ -60,6 +60,7 @@ pub(crate) enum Message<T> where T: core::fmt::Debug + Type + TestEq {
        ChannelReady(msgs::ChannelReady),
        Shutdown(msgs::Shutdown),
        ClosingSigned(msgs::ClosingSigned),
+       OnionMessage(msgs::OnionMessage),
        UpdateAddHTLC(msgs::UpdateAddHTLC),
        UpdateFulfillHTLC(msgs::UpdateFulfillHTLC),
        UpdateFailHTLC(msgs::UpdateFailHTLC),
@@ -100,6 +101,7 @@ impl<T> Message<T> where T: core::fmt::Debug + Type + TestEq {
                        &Message::ChannelReady(ref msg) => msg.type_id(),
                        &Message::Shutdown(ref msg) => msg.type_id(),
                        &Message::ClosingSigned(ref msg) => msg.type_id(),
+                       &Message::OnionMessage(ref msg) => msg.type_id(),
                        &Message::UpdateAddHTLC(ref msg) => msg.type_id(),
                        &Message::UpdateFulfillHTLC(ref msg) => msg.type_id(),
                        &Message::UpdateFailHTLC(ref msg) => msg.type_id(),
@@ -185,6 +187,9 @@ fn do_read<R: io::Read, T, H: core::ops::Deref>(buffer: &mut R, message_type: u1
                msgs::ClosingSigned::TYPE => {
                        Ok(Message::ClosingSigned(Readable::read(buffer)?))
                },
+               msgs::OnionMessage::TYPE => {
+                       Ok(Message::OnionMessage(Readable::read(buffer)?))
+               },
                msgs::UpdateAddHTLC::TYPE => {
                        Ok(Message::UpdateAddHTLC(Readable::read(buffer)?))
                },
@@ -344,6 +349,10 @@ impl Encode for msgs::ClosingSigned {
        const TYPE: u16 = 39;
 }
 
+impl Encode for msgs::OnionMessage {
+       const TYPE: u16 = 513;
+}
+
 impl Encode for msgs::UpdateAddHTLC {
        const TYPE: u16 = 128;
 }
index d18372e3b009bb09d82a5d7b6eb62ebcdda7d4ca..9f1d8db46dd84d3477600e1d5a19015b3cb9e6aa 100644 (file)
@@ -13,10 +13,10 @@ use bitcoin::secp256k1::{self, PublicKey, Secp256k1, SecretKey};
 
 use chain::keysinterface::{KeysInterface, Sign};
 use super::utils;
+use ln::msgs::DecodeError;
 use util::chacha20poly1305rfc::ChaChaPolyWriteAdapter;
-use util::ser::{VecWriter, Writeable, Writer};
+use util::ser::{Readable, VecWriter, Writeable, Writer};
 
-use core::iter::FromIterator;
 use io;
 use prelude::*;
 
@@ -113,6 +113,41 @@ fn encrypt_payload<P: Writeable>(payload: P, encrypted_tlvs_ss: [u8; 32]) -> Vec
        writer.0
 }
 
+impl Writeable for BlindedRoute {
+       fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+               self.introduction_node_id.write(w)?;
+               self.blinding_point.write(w)?;
+               (self.blinded_hops.len() as u8).write(w)?;
+               for hop in &self.blinded_hops {
+                       hop.write(w)?;
+               }
+               Ok(())
+       }
+}
+
+impl Readable for BlindedRoute {
+       fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
+               let introduction_node_id = Readable::read(r)?;
+               let blinding_point = Readable::read(r)?;
+               let num_hops: u8 = Readable::read(r)?;
+               if num_hops == 0 { return Err(DecodeError::InvalidValue) }
+               let mut blinded_hops: Vec<BlindedHop> = Vec::with_capacity(num_hops.into());
+               for _ in 0..num_hops {
+                       blinded_hops.push(Readable::read(r)?);
+               }
+               Ok(BlindedRoute {
+                       introduction_node_id,
+                       blinding_point,
+                       blinded_hops,
+               })
+       }
+}
+
+impl_writeable!(BlindedHop, {
+       blinded_node_id,
+       encrypted_payload
+});
+
 /// TLVs to encode in an intermediate onion message packet's hop data. When provided in a blinded
 /// route, they are encoded into [`BlindedHop::encrypted_payload`].
 pub(crate) struct ForwardTlvs {
index 695064e467c49dc935728a117472ed1ac1a1e880..5a83a0b2b213939822a09c0b29e0d50deec345ad 100644 (file)
@@ -10,6 +10,7 @@
 //! Onion message testing and test utilities live here.
 
 use chain::keysinterface::{KeysInterface, Recipient};
+use ln::msgs::OnionMessageHandler;
 use super::{BlindedRoute, Destination, OnionMessenger, SendError};
 use util::enforcing_trait_impls::EnforcingSigner;
 use util::test_utils;
@@ -47,10 +48,10 @@ fn create_nodes(num_messengers: u8) -> Vec<MessengerNode> {
        res
 }
 
-fn pass_along_path(mut path: Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
-       let mut prev_node = path.remove(0);
+fn pass_along_path(path: &Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
+       let mut prev_node = &path[0];
        let num_nodes = path.len();
-       for (idx, node) in path.into_iter().enumerate() {
+       for (idx, node) in path.into_iter().skip(1).enumerate() {
                let events = prev_node.messenger.release_pending_msgs();
                assert_eq!(events.len(), 1);
                let onion_msg =  {
@@ -72,16 +73,16 @@ fn pass_along_path(mut path: Vec<MessengerNode>, expected_path_id: Option<[u8; 3
 fn one_hop() {
        let nodes = create_nodes(2);
 
-       nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk())).unwrap();
-       pass_along_path(nodes, None);
+       nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+       pass_along_path(&nodes, None);
 }
 
 #[test]
 fn two_unblinded_hops() {
        let nodes = create_nodes(3);
 
-       nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk()], Destination::Node(nodes[2].get_node_pk())).unwrap();
-       pass_along_path(nodes, None);
+       nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk()], Destination::Node(nodes[2].get_node_pk()), None).unwrap();
+       pass_along_path(&nodes, None);
 }
 
 #[test]
@@ -91,8 +92,8 @@ fn two_unblinded_two_blinded() {
        let secp_ctx = Secp256k1::new();
        let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[3].get_node_pk(), nodes[4].get_node_pk()], &*nodes[4].keys_manager, &secp_ctx).unwrap();
 
-       nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::BlindedRoute(blinded_route)).unwrap();
-       pass_along_path(nodes, None);
+       nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::BlindedRoute(blinded_route), None).unwrap();
+       pass_along_path(&nodes, None);
 }
 
 #[test]
@@ -102,8 +103,8 @@ fn three_blinded_hops() {
        let secp_ctx = Secp256k1::new();
        let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap();
 
-       nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap();
-       pass_along_path(nodes, None);
+       nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap();
+       pass_along_path(&nodes, None);
 }
 
 #[test]
@@ -116,7 +117,7 @@ fn too_big_packet_error() {
        let hop_node_id = PublicKey::from_secret_key(&secp_ctx, &hop_secret);
 
        let hops = [hop_node_id; 400];
-       let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id)).unwrap_err();
+       let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id), None).unwrap_err();
        assert_eq!(err, SendError::TooBigPacket);
 }
 
@@ -124,19 +125,43 @@ fn too_big_packet_error() {
 fn invalid_blinded_route_error() {
        // Make sure we error as expected if a provided blinded route has 0 or 1 hops.
        let mut nodes = create_nodes(3);
-       let (node1, node2, node3) = (nodes.remove(0), nodes.remove(0), nodes.remove(0));
 
        // 0 hops
        let secp_ctx = Secp256k1::new();
-       let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[node2.get_node_pk(), node3.get_node_pk()], &*node3.keys_manager, &secp_ctx).unwrap();
+       let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap();
        blinded_route.blinded_hops.clear();
-       let err = node1.messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap_err();
+       let err = nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap_err();
        assert_eq!(err, SendError::TooFewBlindedHops);
 
        // 1 hop
-       let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[node2.get_node_pk(), node3.get_node_pk()], &*node3.keys_manager, &secp_ctx).unwrap();
+       let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap();
        blinded_route.blinded_hops.remove(0);
        assert_eq!(blinded_route.blinded_hops.len(), 1);
-       let err = node1.messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap_err();
+       let err = nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap_err();
        assert_eq!(err, SendError::TooFewBlindedHops);
 }
+
+#[test]
+fn reply_path() {
+       let mut nodes = create_nodes(4);
+       let secp_ctx = Secp256k1::new();
+
+       // Destination::Node
+       let reply_path = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
+       nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::Node(nodes[3].get_node_pk()), Some(reply_path)).unwrap();
+       pass_along_path(&nodes, None);
+       // Make sure the last node successfully decoded the reply path.
+       nodes[3].logger.assert_log_contains(
+               "lightning::onion_message::messenger".to_string(),
+               format!("Received an onion message with path_id: None and reply_path").to_string(), 1);
+
+       // Destination::BlindedRoute
+       let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap();
+       let reply_path = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
+
+       nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), Some(reply_path)).unwrap();
+       pass_along_path(&nodes, None);
+       nodes[3].logger.assert_log_contains(
+               "lightning::onion_message::messenger".to_string(),
+               format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
+}
index c264cbc387f2609a1b8a56c204b4102cdbf433e1..75eb4619b8f913aa667011d29cb2ff5451c33eef 100644 (file)
@@ -16,13 +16,15 @@ use bitcoin::hashes::sha256::Hash as Sha256;
 use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
 
 use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign};
-use ln::msgs;
+use ln::msgs::{self, OnionMessageHandler};
 use ln::onion_utils;
 use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
 use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
 use super::utils;
+use util::events::OnionMessageProvider;
 use util::logger::Logger;
 
+use core::mem;
 use core::ops::Deref;
 use sync::{Arc, Mutex};
 use prelude::*;
@@ -84,7 +86,7 @@ pub struct OnionMessenger<Signer: Sign, K: Deref, L: Deref>
 {
        keys_manager: K,
        logger: L,
-       pending_messages: Mutex<HashMap<PublicKey, Vec<msgs::OnionMessage>>>,
+       pending_messages: Mutex<HashMap<PublicKey, VecDeque<msgs::OnionMessage>>>,
        secp_ctx: Secp256k1<secp256k1::All>,
        // Coming soon:
        // invoice_handler: InvoiceHandler,
@@ -142,7 +144,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
 
        /// Send an empty onion message to `destination`, routing it through `intermediate_nodes`.
        /// See [`OnionMessenger`] for example usage.
-       pub fn send_onion_message(&self, intermediate_nodes: &[PublicKey], destination: Destination) -> Result<(), SendError> {
+       pub fn send_onion_message(&self, intermediate_nodes: &[PublicKey], destination: Destination, reply_path: Option<BlindedRoute>) -> Result<(), SendError> {
                if let Destination::BlindedRoute(BlindedRoute { ref blinded_hops, .. }) = destination {
                        if blinded_hops.len() < 2 {
                                return Err(SendError::TooFewBlindedHops);
@@ -160,7 +162,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        }
                };
                let (packet_payloads, packet_keys) = packet_payloads_and_keys(
-                       &self.secp_ctx, intermediate_nodes, destination, &blinding_secret)
+                       &self.secp_ctx, intermediate_nodes, destination, reply_path, &blinding_secret)
                        .map_err(|e| SendError::Secp256k1(e))?;
 
                let prng_seed = self.keys_manager.get_secure_random_bytes();
@@ -168,8 +170,8 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
 
                let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
-               let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert(Vec::new());
-               pending_msgs.push(
+               let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert_with(VecDeque::new);
+               pending_msgs.push_back(
                        msgs::OnionMessage {
                                blinding_point,
                                onion_routing_packet: onion_packet,
@@ -178,10 +180,23 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                Ok(())
        }
 
+       #[cfg(test)]
+       pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<msgs::OnionMessage>> {
+               let mut pending_msgs = self.pending_messages.lock().unwrap();
+               let mut msgs = HashMap::new();
+               core::mem::swap(&mut *pending_msgs, &mut msgs);
+               msgs
+       }
+}
+
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
+       where K::Target: KeysInterface<Signer = Signer>,
+             L::Target: Logger,
+{
        /// Handle an incoming onion message. Currently, if a message was destined for us we will log, but
        /// soon we'll delegate the onion message to a handler that can generate invoices or send
        /// payments.
-       pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
+       fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
                let control_tlvs_ss = match self.keys_manager.ecdh(Recipient::Node, &msg.blinding_point, None) {
                        Ok(ss) => ss,
                        Err(e) =>  {
@@ -209,9 +224,11 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        msg.onion_routing_packet.hmac, control_tlvs_ss)
                {
                        Ok((Payload::Receive {
-                               control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id })
+                               control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id }), reply_path,
                        }, None)) => {
-                               log_info!(self.logger, "Received an onion message with path_id: {:02x?}", path_id);
+                               log_info!(self.logger,
+                                       "Received an onion message with path_id: {:02x?} and {}reply_path",
+                                               path_id, if reply_path.is_some() { "" } else { "no " });
                        },
                        Ok((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
                                next_node_id, next_blinding_override
@@ -236,8 +253,8 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                                };
 
                                let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
-                               let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert(Vec::new());
-                               pending_msgs.push(
+                               let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
+                               pending_msgs.push_back(
                                        msgs::OnionMessage {
                                                blinding_point: match next_blinding_override {
                                                        Some(blinding_point) => blinding_point,
@@ -271,13 +288,18 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        },
                };
        }
+}
 
-       #[cfg(test)]
-       pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, Vec<msgs::OnionMessage>> {
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
+       where K::Target: KeysInterface<Signer = Signer>,
+             L::Target: Logger,
+{
+       fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
                let mut pending_msgs = self.pending_messages.lock().unwrap();
-               let mut msgs = HashMap::new();
-               core::mem::swap(&mut *pending_msgs, &mut msgs);
-               msgs
+               if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) {
+                       return msgs.pop_front()
+               }
+               None
        }
 }
 
@@ -299,7 +321,8 @@ pub type SimpleRefOnionMessenger<'a, 'b, L> = OnionMessenger<InMemorySigner, &'a
 /// Construct onion packet payloads and keys for sending an onion message along the given
 /// `unblinded_path` to the given `destination`.
 fn packet_payloads_and_keys<T: secp256k1::Signing + secp256k1::Verification>(
-       secp_ctx: &Secp256k1<T>, unblinded_path: &[PublicKey], destination: Destination, session_priv: &SecretKey
+       secp_ctx: &Secp256k1<T>, unblinded_path: &[PublicKey], destination: Destination, mut reply_path:
+       Option<BlindedRoute>, session_priv: &SecretKey
 ) -> Result<(Vec<(Payload, [u8; 32])>, Vec<onion_utils::OnionKeys>), secp256k1::Error> {
        let num_hops = unblinded_path.len() + destination.num_hops();
        let mut payloads = Vec::with_capacity(num_hops);
@@ -344,6 +367,7 @@ fn packet_payloads_and_keys<T: secp256k1::Signing + secp256k1::Verification>(
                } else if let Some(encrypted_payload) = enc_payload_opt {
                        payloads.push((Payload::Receive {
                                control_tlvs: ReceiveControlTlvs::Blinded(encrypted_payload),
+                               reply_path: reply_path.take(),
                        }, control_tlvs_ss));
                }
 
@@ -361,7 +385,8 @@ fn packet_payloads_and_keys<T: secp256k1::Signing + secp256k1::Verification>(
 
        if let Some(control_tlvs_ss) = prev_control_tlvs_ss {
                payloads.push((Payload::Receive {
-                       control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id: None, })
+                       control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id: None, }),
+                       reply_path: reply_path.take(),
                }, control_tlvs_ss));
        }
 
index d4ba28c843b9dde23be1e41111bda0797e44128a..4ab53735ed6f8c9fc132d13dd80fc61628319f35 100644 (file)
@@ -14,9 +14,9 @@ use bitcoin::secp256k1::ecdh::SharedSecret;
 
 use ln::msgs::DecodeError;
 use ln::onion_utils;
-use super::blinded_route::{ForwardTlvs, ReceiveTlvs};
+use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
 use util::chacha20poly1305rfc::{ChaChaPolyReadAdapter, ChaChaPolyWriteAdapter};
-use util::ser::{FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer};
+use util::ser::{BigSize, FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer};
 
 use core::cmp;
 use io::{self, Read};
@@ -98,8 +98,8 @@ pub(super) enum Payload {
        /// This payload is for the final hop.
        Receive {
                control_tlvs: ReceiveControlTlvs,
+               reply_path: Option<BlindedRoute>,
                // Coming soon:
-               // reply_path: Option<BlindedRoute>,
                // message: Message,
        }
 }
@@ -135,21 +135,31 @@ pub(super) enum ReceiveControlTlvs {
 impl Writeable for (Payload, [u8; 32]) {
        fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
                match &self.0 {
-                       Payload::Forward(ForwardControlTlvs::Blinded(encrypted_bytes)) |
-                       Payload::Receive { control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes)} => {
+                       Payload::Forward(ForwardControlTlvs::Blinded(encrypted_bytes)) => {
                                encode_varint_length_prefixed_tlv!(w, {
                                        (4, encrypted_bytes, vec_type)
                                })
                        },
+                       Payload::Receive {
+                               control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes), reply_path
+                       } => {
+                               encode_varint_length_prefixed_tlv!(w, {
+                                       (2, reply_path, option),
+                                       (4, encrypted_bytes, vec_type)
+                               })
+                       },
                        Payload::Forward(ForwardControlTlvs::Unblinded(control_tlvs)) => {
                                let write_adapter = ChaChaPolyWriteAdapter::new(self.1, &control_tlvs);
                                encode_varint_length_prefixed_tlv!(w, {
                                        (4, write_adapter, required)
                                })
                        },
-                       Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs)} => {
+                       Payload::Receive {
+                               control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs), reply_path,
+                       } => {
                                let write_adapter = ChaChaPolyWriteAdapter::new(self.1, &control_tlvs);
                                encode_varint_length_prefixed_tlv!(w, {
+                                       (2, reply_path, option),
                                        (4, write_adapter, required)
                                })
                        },
@@ -161,20 +171,13 @@ impl Writeable for (Payload, [u8; 32]) {
 // Uses the provided secret to simultaneously decode and decrypt the control TLVs.
 impl ReadableArgs<SharedSecret> for Payload {
        fn read<R: Read>(mut r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
-               use bitcoin::consensus::encode::{Decodable, Error, VarInt};
-               let v: VarInt = Decodable::consensus_decode(&mut r)
-                       .map_err(|e| match e {
-                               Error::Io(ioe) => DecodeError::from(ioe),
-                               _ => DecodeError::InvalidValue
-                       })?;
-
+               let v: BigSize = Readable::read(r)?;
                let mut rd = FixedLengthReader::new(r, v.0);
-               // TODO: support reply paths
-               let mut _reply_path_bytes: Option<Vec<u8>> = Some(Vec::new());
+               let mut reply_path: Option<BlindedRoute> = None;
                let mut read_adapter: Option<ChaChaPolyReadAdapter<ControlTlvs>> = None;
                let rho = onion_utils::gen_rho_from_shared_secret(&encrypted_tlvs_ss.secret_bytes());
                decode_tlv_stream!(&mut rd, {
-                       (2, _reply_path_bytes, vec_type),
+                       (2, reply_path, option),
                        (4, read_adapter, (option: LengthReadableArgs, rho))
                });
                rd.eat_remaining().map_err(|_| DecodeError::ShortRead)?;
@@ -185,7 +188,7 @@ impl ReadableArgs<SharedSecret> for Payload {
                                Ok(Payload::Forward(ForwardControlTlvs::Unblinded(tlvs)))
                        },
                        Some(ChaChaPolyReadAdapter { readable: ControlTlvs::Receive(tlvs)}) => {
-                               Ok(Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(tlvs)})
+                               Ok(Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(tlvs), reply_path })
                        },
                }
        }
index d56747598563c381ce1e554c2cccda96d0f92be0..e86eae3c813a8af7a3684480171df913ea0883bd 100644 (file)
@@ -1195,6 +1195,12 @@ pub trait MessageSendEventsProvider {
        fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
 }
 
+/// A trait indicating an object may generate onion messages to send.
+pub trait OnionMessageProvider {
+       /// Gets the next pending onion message for the peer with the given node id.
+       fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage>;
+}
+
 /// A trait indicating an object may generate events.
 ///
 /// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
index 63496b28362c8c27b716b8547f24b3a6f9d32f97..cd79a3f7bba8b3d500bf7036c1a202e4d6f2cd71 100644 (file)
@@ -160,6 +160,7 @@ macro_rules! log_internal {
 }
 
 /// Logs an entry at the given level.
+#[doc(hidden)]
 #[macro_export]
 macro_rules! log_given_level {
        ($logger: expr, $lvl:expr, $($arg:tt)+) => (
@@ -185,7 +186,7 @@ macro_rules! log_given_level {
        );
 }
 
-/// Log an error.
+/// Log at the `ERROR` level.
 #[macro_export]
 macro_rules! log_error {
        ($logger: expr, $($arg:tt)*) => (
@@ -193,25 +194,31 @@ macro_rules! log_error {
        )
 }
 
+/// Log at the `WARN` level.
+#[macro_export]
 macro_rules! log_warn {
        ($logger: expr, $($arg:tt)*) => (
                log_given_level!($logger, $crate::util::logger::Level::Warn, $($arg)*);
        )
 }
 
+/// Log at the `INFO` level.
+#[macro_export]
 macro_rules! log_info {
        ($logger: expr, $($arg:tt)*) => (
                log_given_level!($logger, $crate::util::logger::Level::Info, $($arg)*);
        )
 }
 
+/// Log at the `DEBUG` level.
+#[macro_export]
 macro_rules! log_debug {
        ($logger: expr, $($arg:tt)*) => (
                log_given_level!($logger, $crate::util::logger::Level::Debug, $($arg)*);
        )
 }
 
-/// Log a trace log.
+/// Log at the `TRACE` level.
 #[macro_export]
 macro_rules! log_trace {
        ($logger: expr, $($arg:tt)*) => (
@@ -219,7 +226,8 @@ macro_rules! log_trace {
        )
 }
 
-/// Log a gossip log.
+/// Log at the `GOSSIP` level.
+#[macro_export]
 macro_rules! log_gossip {
        ($logger: expr, $($arg:tt)*) => (
                log_given_level!($logger, $crate::util::logger::Level::Gossip, $($arg)*);