lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" }
bitcoin = { version = "0.29.0", features = ["secp-lowmemory"] }
hex = "0.3"
-honggfuzz = { version = "0.5", optional = true }
+honggfuzz = { version = "0.5", optional = true, default-features = false }
libfuzzer-sys = { version = "0.4", optional = true }
[build-dependencies]
[ "$(git diff)" != "" ] && exit 1
popd
-cargo install --color always --force honggfuzz
+cargo install --color always --force honggfuzz --no-default-features
sed -i 's/lto = true//' Cargo.toml
HFUZZ_BUILD_ARGS="--features honggfuzz_fuzz" cargo --color always hfuzz build
for TARGET in src/bin/*.rs; do
EnforcingSigner,
Arc<chainmonitor::ChainMonitor<EnforcingSigner, Arc<dyn chain::Filter>, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<TestPersister>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
-type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, Arc<dyn Logger>, IgnoringMessageHandler>;
+type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<dyn Logger>>>, Arc<dyn chain::Access>, Arc<dyn Logger>>>, IgnoringMessageHandler, Arc<dyn Logger>, IgnoringMessageHandler>;
struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan>,
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
chan_handler: channelmanager.clone(),
route_handler: gossip_sync.clone(),
+ onion_message_handler: IgnoringMessageHandler {},
}, our_network_key, &[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 15, 0], Arc::clone(&logger), IgnoringMessageHandler{}));
let mut should_forward = false;
use bitcoin::secp256k1::ecdsa::RecoverableSignature;
use lightning::chain::keysinterface::{Recipient, KeyMaterial, KeysInterface};
-use lightning::ln::msgs::{self, DecodeError};
+use lightning::ln::msgs::{self, DecodeError, OnionMessageHandler};
use lightning::ln::script::ShutdownScript;
use lightning::util::enforcing_trait_impls::EnforcingSigner;
use lightning::util::logger::Logger;
super::do_test(&::hex::decode(one_hop_om).unwrap(), &logger);
{
let log_entries = logger.lines.lock().unwrap();
- assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), "Received an onion message with path_id: None".to_string())), Some(&1));
+ assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(), "Received an onion message with path_id: None and no reply_path".to_string())), Some(&1));
}
let two_unblinded_hops_om = "020000000000000000000000000000000000000000000000000000000000000e01055600020000000000000000000000000000000000000000000000000000000000000e0135043304210200000000000000000000000000000000000000000000000000000000000000039500000000000000000000000000000058000000000000000000000000000000000000000000000000000000000000001204105e00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000b300000000000000000000000000000000000000000000000000000000000000";
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
+use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::WriteableScore;
P: 'static + Deref + Send + Sync,
Descriptor: 'static + SocketDescriptor + Send + Sync,
CMH: 'static + Deref + Send + Sync,
+ OMH: 'static + Deref + Send + Sync,
RMH: 'static + Deref + Send + Sync,
EH: 'static + EventHandler + Send,
PS: 'static + Deref + Send,
PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
UMH: 'static + Deref + Send + Sync,
- PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
+ PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
>(
L::Target: 'static + Logger,
P::Target: 'static + Persist<Signer>,
CMH::Target: 'static + ChannelMessageHandler,
+ OMH::Target: 'static + OnionMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
p2p_gossip_sync: PGS,
rapid_gossip_sync: RGS,
- peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
+ peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
chain_monitor: Arc<ChainMonitor>,
persister: Arc<FilesystemPersister>,
tx_broadcaster: Arc<test_utils::TestBroadcaster>,
let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
- let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
+ let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
honggfuzz_fuzz = ["honggfuzz"]
[dependencies]
-honggfuzz = { version = "0.5", optional = true }
+honggfuzz = { version = "0.5", optional = true, default-features = false }
afl = { version = "0.4", optional = true }
lightning-invoice = { path = ".." }
lightning = { path = "../../lightning", features = ["regex"] }
#!/bin/bash
set -e
-cargo install --force honggfuzz
+cargo install --force honggfuzz --no-default-features
for TARGET in fuzz_targets/*; do
FILENAME=$(basename $TARGET)
FILE="${FILENAME%.*}"
use lightning::ln::peer_handler;
use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
use lightning::ln::peer_handler::CustomMessageHandler;
-use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler, NetAddress};
+use lightning::ln::msgs::{ChannelMessageHandler, NetAddress, OnionMessageHandler, RoutingMessageHandler};
use lightning::util::logger::Logger;
use std::ops::Deref;
id: u64,
}
impl Connection {
- async fn poll_event_process<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
+ async fn poll_event_process<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, mut event_receiver: mpsc::Receiver<()>) where
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
+ OMH: Deref + 'static + Send + Sync,
L: Deref + 'static + Send + Sync,
UMH: Deref + 'static + Send + Sync,
CMH::Target: ChannelMessageHandler + Send + Sync,
RMH::Target: RoutingMessageHandler + Send + Sync,
+ OMH::Target: OnionMessageHandler + Send + Sync,
L::Target: Logger + Send + Sync,
UMH::Target: CustomMessageHandler + Send + Sync,
{
}
}
- async fn schedule_read<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+ async fn schedule_read<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
+ OMH: Deref + 'static + Send + Sync,
L: Deref + 'static + Send + Sync,
UMH: Deref + 'static + Send + Sync,
CMH::Target: ChannelMessageHandler + 'static + Send + Sync,
RMH::Target: RoutingMessageHandler + 'static + Send + Sync,
+ OMH::Target: OnionMessageHandler + 'static + Send + Sync,
L::Target: Logger + 'static + Send + Sync,
UMH::Target: CustomMessageHandler + 'static + Send + Sync,
{
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_inbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_inbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
+ OMH: Deref + 'static + Send + Sync,
L: Deref + 'static + Send + Sync,
UMH: Deref + 'static + Send + Sync,
CMH::Target: ChannelMessageHandler + Send + Sync,
RMH::Target: RoutingMessageHandler + Send + Sync,
+ OMH::Target: OnionMessageHandler + Send + Sync,
L::Target: Logger + Send + Sync,
UMH::Target: CustomMessageHandler + Send + Sync,
{
/// The returned future will complete when the peer is disconnected and associated handling
/// futures are freed, though, because all processing futures are spawned with tokio::spawn, you do
/// not need to poll the provided future in order to make progress.
-pub fn setup_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
+pub fn setup_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, stream: StdTcpStream) -> impl std::future::Future<Output=()> where
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
+ OMH: Deref + 'static + Send + Sync,
L: Deref + 'static + Send + Sync,
UMH: Deref + 'static + Send + Sync,
CMH::Target: ChannelMessageHandler + Send + Sync,
RMH::Target: RoutingMessageHandler + Send + Sync,
+ OMH::Target: OnionMessageHandler + Send + Sync,
L::Target: Logger + Send + Sync,
UMH::Target: CustomMessageHandler + Send + Sync,
{
/// disconnected and associated handling futures are freed, though, because all processing in said
/// futures are spawned with tokio::spawn, you do not need to poll the second future in order to
/// make progress.
-pub async fn connect_outbound<CMH, RMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+pub async fn connect_outbound<CMH, RMH, OMH, L, UMH>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, CMH, RMH, OMH, L, UMH>>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
CMH: Deref + 'static + Send + Sync,
RMH: Deref + 'static + Send + Sync,
+ OMH: Deref + 'static + Send + Sync,
L: Deref + 'static + Send + Sync,
UMH: Deref + 'static + Send + Sync,
CMH::Target: ChannelMessageHandler + Send + Sync,
RMH::Target: RoutingMessageHandler + Send + Sync,
+ OMH::Target: OnionMessageHandler + Send + Sync,
L::Target: Logger + Send + Sync,
UMH::Target: CustomMessageHandler + Send + Sync,
{
let a_manager = Arc::new(PeerManager::new(MessageHandler {
chan_handler: Arc::clone(&a_handler),
route_handler: Arc::clone(&a_handler),
+ onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
}, a_key.clone(), &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
let (b_connected_sender, mut b_connected) = mpsc::channel(1);
let b_manager = Arc::new(PeerManager::new(MessageHandler {
chan_handler: Arc::clone(&b_handler),
route_handler: Arc::clone(&b_handler),
+ onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
}, b_key.clone(), &[2; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
// We bind on localhost, hoping the environment is properly configured with a local
let a_manager = Arc::new(PeerManager::new(MessageHandler {
chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
+ onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
}, a_key, &[1; 32], Arc::new(TestLogger()), Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{})));
/// HTLCs which we sent to our counterparty which are claimable after a timeout (less on-chain
/// fees) if the counterparty does not know the preimage for the HTLCs. These are somewhat
/// likely to be claimed by our counterparty before we do.
- MaybeClaimableHTLCAwaitingTimeout {
- /// The amount available to claim, in satoshis, excluding the on-chain fees which will be
- /// required to do so.
+ MaybeTimeoutClaimableHTLC {
+ /// The amount potentially available to claim, in satoshis, excluding the on-chain fees
+ /// which will be required to do so.
claimable_amount_satoshis: u64,
/// The height at which we will be able to claim the balance if our counterparty has not
/// done so.
claimable_height: u32,
},
+ /// HTLCs which we received from our counterparty which are claimable with a preimage which we
+ /// do not currently have. This will only be claimable if we receive the preimage from the node
+ /// to which we forwarded this HTLC before the timeout.
+ MaybePreimageClaimableHTLC {
+ /// The amount potentially available to claim, in satoshis, excluding the on-chain fees
+ /// which will be required to do so.
+ claimable_amount_satoshis: u64,
+ /// The height at which our counterparty will be able to claim the balance if we have not
+ /// yet received the preimage and claimed it ourselves.
+ expiry_height: u32,
+ },
/// The channel has been closed, and our counterparty broadcasted a revoked commitment
/// transaction.
///
confirmation_height: conf_thresh,
});
} else {
- return Some(Balance::MaybeClaimableHTLCAwaitingTimeout {
+ return Some(Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: htlc.amount_msat / 1000,
claimable_height: htlc.cltv_expiry,
});
timeout_height: htlc.cltv_expiry,
});
}
+ } else if htlc_resolved.is_none() {
+ return Some(Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: htlc.amount_msat / 1000,
+ expiry_height: htlc.cltv_expiry,
+ });
}
None
}
for (htlc, _, _) in us.current_holder_commitment_tx.htlc_outputs.iter() {
if htlc.transaction_output_index.is_none() { continue; }
if htlc.offered {
- res.push(Balance::MaybeClaimableHTLCAwaitingTimeout {
+ res.push(Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: htlc.amount_msat / 1000,
claimable_height: htlc.cltv_expiry,
});
} else if us.payment_preimages.get(&htlc.payment_hash).is_some() {
claimable_inbound_htlc_value_sat += htlc.amount_msat / 1000;
+ } else {
+ // As long as the HTLC is still in our latest commitment state, treat
+ // it as potentially claimable, even if it has long-since expired.
+ res.push(Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: htlc.amount_msat / 1000,
+ expiry_height: htlc.cltv_expiry,
+ });
}
}
res.push(Balance::ClaimableOnChannelClose {
assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 1_000_000 - 3_000 - 4_000 - 1_000 - 3 - chan_feerate *
(channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
- assert_eq!(vec![Balance::ClaimableOnChannelClose {
+ assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 1_000,
- }],
- nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 3_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 4_000,
+ expiry_height: htlc_cltv_timeout,
+ }]),
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
nodes[1].node.claim_funds(payment_preimage);
check_added_monitors!(nodes[1], 1);
chan_feerate * (channel::commitment_tx_base_weight(opt_anchors) +
if prev_commitment_tx { 1 } else { 2 } *
channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}];
if !prev_commitment_tx {
- a_expected_balances.push(Balance::MaybeClaimableHTLCAwaitingTimeout {
+ a_expected_balances.push(Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
});
claimable_amount_satoshis: 1_000_000 - 3_000 - 4_000 - 1_000 - 3 - chan_feerate *
(channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
confirmation_height: nodes[0].best_block_info().1 + ANTI_REORG_DELAY - 1,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
// After ANTI_REORG_DELAY, A will consider its balance fully spendable and generate a
// `SpendableOutputs` event. However, B still has to wait for the CSV delay.
- assert_eq!(sorted_vec(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+ assert_eq!(sorted_vec(vec![Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
} else {
expect_payment_sent!(nodes[0], payment_preimage);
}
- assert_eq!(sorted_vec(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+ assert_eq!(sorted_vec(vec![Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}]),
sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
connect_blocks(&nodes[0], ANTI_REORG_DELAY - 1);
- assert_eq!(vec![Balance::MaybeClaimableHTLCAwaitingTimeout {
+ assert_eq!(vec![Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
}],
claimable_amount_satoshis: 1_000_000 - 10_000 - 20_000 - chan_feerate *
(channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
confirmation_height: node_a_commitment_claimable,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 10_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 20_000,
claimable_height: htlc_cltv_timeout,
}]),
claimable_amount_satoshis: 1_000_000 - 10_000 - 20_000 - chan_feerate *
(channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
confirmation_height: node_a_commitment_claimable,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 10_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 20_000,
claimable_height: htlc_cltv_timeout,
}]),
}, Balance::ClaimableAwaitingConfirmations {
claimable_amount_satoshis: 10_000,
confirmation_height: node_a_htlc_claimable,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 20_000,
claimable_height: htlc_cltv_timeout,
}]),
}, Balance::ClaimableAwaitingConfirmations {
claimable_amount_satoshis: 10_000,
confirmation_height: node_a_htlc_claimable,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 20_000,
claimable_height: htlc_cltv_timeout,
}]),
test_spendable_output(&nodes[0], &as_txn[1]);
}
+#[test]
+fn test_no_preimage_inbound_htlc_balances() {
+ // Tests that MaybePreimageClaimableHTLC are generated for inbound HTLCs for which we do not
+ // have a preimage.
+ let chanmon_cfgs = create_chanmon_cfgs(2);
+ let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
+ let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
+ let mut nodes = create_network(2, &node_cfgs, &node_chanmgrs);
+
+ let (_, _, chan_id, funding_tx) = create_announced_chan_between_nodes_with_value(&nodes, 0, 1, 1_000_000, 500_000_000, InitFeatures::known(), InitFeatures::known());
+ let funding_outpoint = OutPoint { txid: funding_tx.txid(), index: 0 };
+
+ // Send two HTLCs, one from A to B, and one from B to A.
+ let to_b_failed_payment_hash = route_payment(&nodes[0], &[&nodes[1]], 10_000_000).1;
+ let to_a_failed_payment_hash = route_payment(&nodes[1], &[&nodes[0]], 20_000_000).1;
+ let htlc_cltv_timeout = nodes[0].best_block_info().1 + TEST_FINAL_CLTV + 1; // Note ChannelManager adds one to CLTV timeouts for safety
+
+ let chan_feerate = get_feerate!(nodes[0], chan_id) as u64;
+ let opt_anchors = get_opt_anchors!(nodes[0], chan_id);
+
+ // Both A and B will have an HTLC that's claimable on timeout and one that's claimable if they
+ // receive the preimage. These will remain the same through the channel closure and until the
+ // HTLC output is spent.
+
+ assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
+ claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+ (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::MaybeTimeoutClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ claimable_height: htlc_cltv_timeout,
+ }]),
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
+ claimable_amount_satoshis: 500_000 - 20_000,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::MaybeTimeoutClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ claimable_height: htlc_cltv_timeout,
+ }]),
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ // Get nodes[0]'s commitment transaction and HTLC-Timeout transaction
+ let as_txn = get_local_commitment_txn!(nodes[0], chan_id);
+ assert_eq!(as_txn.len(), 2);
+ check_spends!(as_txn[1], as_txn[0]);
+ check_spends!(as_txn[0], funding_tx);
+
+ // Now close the channel by confirming A's commitment transaction on both nodes, checking the
+ // claimable balances remain the same except for the non-HTLC balance changing variant.
+ let node_a_commitment_claimable = nodes[0].best_block_info().1 + BREAKDOWN_TIMEOUT as u32;
+ let as_pre_spend_claims = sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+ (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+ confirmation_height: node_a_commitment_claimable,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::MaybeTimeoutClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ claimable_height: htlc_cltv_timeout,
+ }]);
+
+ mine_transaction(&nodes[0], &as_txn[0]);
+ nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear();
+ check_added_monitors!(nodes[0], 1);
+ check_closed_broadcast!(nodes[0], true);
+ check_closed_event!(nodes[0], 1, ClosureReason::CommitmentTxConfirmed);
+
+ assert_eq!(as_pre_spend_claims,
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ mine_transaction(&nodes[1], &as_txn[0]);
+ check_added_monitors!(nodes[1], 1);
+ check_closed_broadcast!(nodes[1], true);
+ check_closed_event!(nodes[1], 1, ClosureReason::CommitmentTxConfirmed);
+
+ let node_b_commitment_claimable = nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1;
+ let mut bs_pre_spend_claims = sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 500_000 - 20_000,
+ confirmation_height: node_b_commitment_claimable,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::MaybeTimeoutClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ claimable_height: htlc_cltv_timeout,
+ }]);
+ assert_eq!(bs_pre_spend_claims,
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ // We'll broadcast the HTLC-Timeout transaction one block prior to the htlc's expiration (as it
+ // is confirmable in the next block), but will still include the same claimable balances as no
+ // HTLC has been spent, even after the HTLC expires. We'll also fail the inbound HTLC, but it
+ // won't do anything as the channel is already closed.
+
+ connect_blocks(&nodes[0], TEST_FINAL_CLTV - 1);
+ let as_htlc_timeout_claim = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+ assert_eq!(as_htlc_timeout_claim.len(), 1);
+ check_spends!(as_htlc_timeout_claim[0], as_txn[0]);
+ expect_pending_htlcs_forwardable_conditions!(nodes[0],
+ [HTLCDestination::FailedPayment { payment_hash: to_a_failed_payment_hash }]);
+
+ assert_eq!(as_pre_spend_claims,
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ connect_blocks(&nodes[0], 1);
+ assert_eq!(as_pre_spend_claims,
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ // For node B, we'll get the non-HTLC funds claimable after ANTI_REORG_DELAY confirmations
+ connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
+ test_spendable_output(&nodes[1], &as_txn[0]);
+ bs_pre_spend_claims.retain(|e| if let Balance::ClaimableAwaitingConfirmations { .. } = e { false } else { true });
+
+ // The next few blocks for B look the same as for A, though for the opposite HTLC
+ nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().clear();
+ connect_blocks(&nodes[1], TEST_FINAL_CLTV - (ANTI_REORG_DELAY - 1) - 1);
+ expect_pending_htlcs_forwardable_conditions!(nodes[1],
+ [HTLCDestination::FailedPayment { payment_hash: to_b_failed_payment_hash }]);
+ let bs_htlc_timeout_claim = nodes[1].tx_broadcaster.txn_broadcasted.lock().unwrap().split_off(0);
+ assert_eq!(bs_htlc_timeout_claim.len(), 1);
+ check_spends!(bs_htlc_timeout_claim[0], as_txn[0]);
+
+ assert_eq!(bs_pre_spend_claims,
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ connect_blocks(&nodes[1], 1);
+ assert_eq!(bs_pre_spend_claims,
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ // Now confirm the two HTLC timeout transactions for A, checking that the inbound HTLC resolves
+ // after ANTI_REORG_DELAY confirmations and the other takes BREAKDOWN_TIMEOUT confirmations.
+ mine_transaction(&nodes[0], &as_htlc_timeout_claim[0]);
+ let as_timeout_claimable_height = nodes[0].best_block_info().1 + (BREAKDOWN_TIMEOUT as u32) - 1;
+ assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+ (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+ confirmation_height: node_a_commitment_claimable,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 10_000,
+ confirmation_height: as_timeout_claimable_height,
+ }]),
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ mine_transaction(&nodes[0], &bs_htlc_timeout_claim[0]);
+ assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+ (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+ confirmation_height: node_a_commitment_claimable,
+ }, Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 20_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 10_000,
+ confirmation_height: as_timeout_claimable_height,
+ }]),
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ // Once as_htlc_timeout_claim[0] reaches ANTI_REORG_DELAY confirmations, we should get a
+ // payment failure event.
+ connect_blocks(&nodes[0], ANTI_REORG_DELAY - 2);
+ expect_payment_failed!(nodes[0], to_b_failed_payment_hash, true);
+
+ connect_blocks(&nodes[0], 1);
+ assert_eq!(sorted_vec(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 1_000_000 - 500_000 - 10_000 - chan_feerate *
+ (channel::commitment_tx_base_weight(opt_anchors) + 2 * channel::COMMITMENT_TX_WEIGHT_PER_HTLC) / 1000,
+ confirmation_height: node_a_commitment_claimable,
+ }, Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 10_000,
+ confirmation_height: core::cmp::max(as_timeout_claimable_height, htlc_cltv_timeout),
+ }]),
+ sorted_vec(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ connect_blocks(&nodes[0], node_a_commitment_claimable - nodes[0].best_block_info().1);
+ assert_eq!(vec![Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 10_000,
+ confirmation_height: core::cmp::max(as_timeout_claimable_height, htlc_cltv_timeout),
+ }],
+ nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+ test_spendable_output(&nodes[0], &as_txn[0]);
+
+ connect_blocks(&nodes[0], as_timeout_claimable_height - nodes[0].best_block_info().1);
+ assert!(nodes[0].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
+ test_spendable_output(&nodes[0], &as_htlc_timeout_claim[0]);
+
+ // The process for B should be completely identical as well, noting that the non-HTLC-balance
+ // was already claimed.
+ mine_transaction(&nodes[1], &bs_htlc_timeout_claim[0]);
+ let bs_timeout_claimable_height = nodes[1].best_block_info().1 + ANTI_REORG_DELAY - 1;
+ assert_eq!(sorted_vec(vec![Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 20_000,
+ confirmation_height: bs_timeout_claimable_height,
+ }]),
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ mine_transaction(&nodes[1], &as_htlc_timeout_claim[0]);
+ assert_eq!(sorted_vec(vec![Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ expiry_height: htlc_cltv_timeout,
+ }, Balance::ClaimableAwaitingConfirmations {
+ claimable_amount_satoshis: 20_000,
+ confirmation_height: bs_timeout_claimable_height,
+ }]),
+ sorted_vec(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances()));
+
+ connect_blocks(&nodes[1], ANTI_REORG_DELAY - 2);
+ expect_payment_failed!(nodes[1], to_a_failed_payment_hash, true);
+
+ assert_eq!(vec![Balance::MaybePreimageClaimableHTLC {
+ claimable_amount_satoshis: 10_000,
+ expiry_height: htlc_cltv_timeout,
+ }],
+ nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances());
+ test_spendable_output(&nodes[1], &bs_htlc_timeout_claim[0]);
+
+ connect_blocks(&nodes[1], 1);
+ assert!(nodes[1].chain_monitor.chain_monitor.get_monitor(funding_outpoint).unwrap().get_claimable_balances().is_empty());
+}
+
fn sorted_vec_with_additions<T: Ord + Clone>(v_orig: &Vec<T>, extra_ts: &[&T]) -> Vec<T> {
let mut v = v_orig.clone();
for t in extra_ts {
// lists the two on-chain timeout-able HTLCs as claimable balances.
assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 100_000 - 5_000 - 4_000 - 3 - 2_000 + 3_000,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 2_000,
claimable_height: missing_htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 5_000,
claimable_height: live_htlc_cltv_timeout,
}]),
assert_eq!(sorted_vec(vec![Balance::ClaimableOnChannelClose {
claimable_amount_satoshis: 100_000 - 4_000 - 3_000,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 4_000,
claimable_height: htlc_cltv_timeout,
- }, Balance::MaybeClaimableHTLCAwaitingTimeout {
+ }, Balance::MaybeTimeoutClaimableHTLC {
claimable_amount_satoshis: 3_000,
claimable_height: htlc_cltv_timeout,
}]),
use io::{self, Read};
use io_extras::read_to_end;
-use util::events::MessageSendEventsProvider;
+use util::events::{MessageSendEventsProvider, OnionMessageProvider};
use util::logger;
use util::ser::{BigSize, LengthReadable, Readable, ReadableArgs, Writeable, Writer, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname};
fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
}
+/// A trait to describe an object that can receive onion messages.
+pub trait OnionMessageHandler : OnionMessageProvider {
+ /// Handle an incoming onion_message message from the given peer.
+ fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
+}
+
mod fuzzy_internal_msgs {
use prelude::*;
use ln::{PaymentPreimage, PaymentSecret};
use ln::features::InitFeatures;
use ln::msgs;
-use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, RoutingMessageHandler};
+use ln::msgs::{ChannelMessageHandler, LightningError, NetAddress, OnionMessageHandler, RoutingMessageHandler};
use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
use util::ser::{VecWriter, Writeable, Writer};
use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
use ln::wire::Encode;
use routing::gossip::{NetworkGraph, P2PGossipSync};
use util::atomic_counter::AtomicCounter;
-use util::events::{MessageSendEvent, MessageSendEventsProvider};
+use util::events::{MessageSendEvent, MessageSendEventsProvider, OnionMessageProvider};
use util::logger::Logger;
use prelude::*;
fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
}
+impl OnionMessageProvider for IgnoringMessageHandler {
+ fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
+}
+impl OnionMessageHandler for IgnoringMessageHandler {
+ fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+}
impl Deref for IgnoringMessageHandler {
type Target = IgnoringMessageHandler;
fn deref(&self) -> &Self { self }
}
/// Provides references to trait impls which handle different types of messages.
-pub struct MessageHandler<CM: Deref, RM: Deref> where
+pub struct MessageHandler<CM: Deref, RM: Deref, OM: Deref> where
CM::Target: ChannelMessageHandler,
- RM::Target: RoutingMessageHandler {
+ RM::Target: RoutingMessageHandler,
+ OM::Target: OnionMessageHandler,
+{
/// A message handler which handles messages specific to channels. Usually this is just a
/// [`ChannelManager`] object or an [`ErroringMessageHandler`].
///
///
/// [`P2PGossipSync`]: crate::routing::gossip::P2PGossipSync
pub route_handler: RM,
+
+ /// A message handler which handles onion messages. For now, this can only be an
+ /// [`IgnoringMessageHandler`].
+ pub onion_message_handler: OM,
}
/// Provides an object which can be used to send data to and which uniquely identifies a connection
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
/// refilled as we send bytes.
-const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
+const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
pending_outbound_buffer: LinkedList<Vec<u8>>,
pending_outbound_buffer_first_msg_offset: usize,
+ // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
+ // channel messages over them.
+ gossip_broadcast_buffer: LinkedList<Vec<u8>>,
awaiting_write_event: bool,
pending_read_buffer: Vec<u8>,
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
}
- /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
- /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
- /// been drained.
+ /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
+ /// outbound buffer. This is checked every time the peer's buffer may have been drained.
fn should_buffer_gossip_backfill(&self) -> bool {
- self.pending_outbound_buffer.is_empty() &&
- self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
+ && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
}
- /// Returns whether this peer's buffer is full and we should drop gossip messages.
- fn buffer_full_drop_gossip(&self) -> bool {
- if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
- || self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
- return false
- }
- true
+ /// Determines if we should push an onion message onto a peer's outbound buffer. This is checked
+ /// every time the peer's buffer may have been drained.
+ fn should_buffer_onion_message(&self) -> bool {
+ self.pending_outbound_buffer.is_empty()
+ && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ }
+
+ /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
+ /// buffer. This is checked every time the peer's buffer may have been drained.
+ fn should_buffer_gossip_broadcast(&self) -> bool {
+ self.pending_outbound_buffer.is_empty()
+ && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+ }
+
+ /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
+ fn buffer_full_drop_gossip_broadcast(&self) -> bool {
+ let total_outbound_buffered =
+ self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
+
+ total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
+ self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
}
}
/// issues such as overly long function definitions.
///
/// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, Arc<L>, Arc<IgnoringMessageHandler>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = PeerManager<SD, Arc<SimpleArcChannelManager<M, T, F, L>>, Arc<P2PGossipSync<Arc<NetworkGraph<Arc<L>>>, Arc<C>, Arc<L>>>, IgnoringMessageHandler, Arc<L>, Arc<IgnoringMessageHandler>>;
/// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
/// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
/// helps with issues such as long function definitions.
///
/// (C-not exported) as Arcs don't make sense in bindings
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, &'f L, IgnoringMessageHandler>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e P2PGossipSync<&'g NetworkGraph<&'f L>, &'h C, &'f L>, IgnoringMessageHandler, &'f L, IgnoringMessageHandler>;
/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
/// socket events into messages which it passes on to its [`MessageHandler`].
/// you're using lightning-net-tokio.
///
/// [`read_event`]: PeerManager::read_event
-pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> where
+pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> where
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler,
+ OM::Target: OnionMessageHandler,
L::Target: Logger,
CMH::Target: CustomMessageHandler {
- message_handler: MessageHandler<CM, RM>,
+ message_handler: MessageHandler<CM, RM, OM>,
/// Connection state for each connected peer - we have an outer read-write lock which is taken
/// as read while we're doing processing for a peer and taken write when a peer is being added
/// or removed.
}}
}
-impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, CM: Deref, OM: Deref, L: Deref> PeerManager<Descriptor, CM, IgnoringMessageHandler, OM, L, IgnoringMessageHandler> where
CM::Target: ChannelMessageHandler,
+ OM::Target: OnionMessageHandler,
L::Target: Logger {
- /// Constructs a new PeerManager with the given ChannelMessageHandler. No routing message
- /// handler is used and network graph messages are ignored.
+ /// Constructs a new `PeerManager` with the given `ChannelMessageHandler` and
+ /// `OnionMessageHandler`. No routing message handler is used and network graph messages are
+ /// ignored.
///
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
///
/// (C-not exported) as we can't export a PeerManager with a dummy route handler
- pub fn new_channel_only(channel_message_handler: CM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
+ pub fn new_channel_only(channel_message_handler: CM, onion_message_handler: OM, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
Self::new(MessageHandler {
chan_handler: channel_message_handler,
route_handler: IgnoringMessageHandler{},
+ onion_message_handler,
}, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
}
}
-impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, L, IgnoringMessageHandler> where
+impl<Descriptor: SocketDescriptor, RM: Deref, L: Deref> PeerManager<Descriptor, ErroringMessageHandler, RM, IgnoringMessageHandler, L, IgnoringMessageHandler> where
RM::Target: RoutingMessageHandler,
L::Target: Logger {
- /// Constructs a new PeerManager with the given RoutingMessageHandler. No channel message
- /// handler is used and messages related to channels will be ignored (or generate error
- /// messages). Note that some other lightning implementations time-out connections after some
- /// time if no channel is built with the peer.
+ /// Constructs a new `PeerManager` with the given `RoutingMessageHandler`. No channel message
+ /// handler or onion message handler is used and onion and channel messages will be ignored (or
+ /// generate error messages). Note that some other lightning implementations time-out connections
+ /// after some time if no channel is built with the peer.
///
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
Self::new(MessageHandler {
chan_handler: ErroringMessageHandler::new(),
route_handler: routing_message_handler,
+ onion_message_handler: IgnoringMessageHandler{},
}, our_node_secret, ephemeral_random_data, logger, IgnoringMessageHandler{})
}
}
}
}
-impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, L, CMH> where
+impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref> PeerManager<Descriptor, CM, RM, OM, L, CMH> where
CM::Target: ChannelMessageHandler,
RM::Target: RoutingMessageHandler,
+ OM::Target: OnionMessageHandler,
L::Target: Logger,
CMH::Target: CustomMessageHandler {
/// Constructs a new PeerManager with the given message handlers and node_id secret key
/// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
/// cryptographically secure random bytes.
- pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
+ pub fn new(message_handler: MessageHandler<CM, RM, OM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L, custom_message_handler: CMH) -> Self {
let mut ephemeral_key_midstate = Sha256::engine();
ephemeral_key_midstate.input(ephemeral_random_data);
pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
+ gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,
pending_read_buffer,
pending_outbound_buffer: LinkedList::new(),
pending_outbound_buffer_first_msg_offset: 0,
+ gossip_broadcast_buffer: LinkedList::new(),
awaiting_write_event: false,
pending_read_buffer,
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
while !peer.awaiting_write_event {
+ if peer.should_buffer_onion_message() {
+ if let Some(peer_node_id) = peer.their_node_id {
+ if let Some(next_onion_message) =
+ self.message_handler.onion_message_handler.next_onion_message_for_peer(peer_node_id) {
+ self.enqueue_message(peer, &next_onion_message);
+ }
+ }
+ }
+ if peer.should_buffer_gossip_broadcast() {
+ if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
+ peer.pending_outbound_buffer.push_back(msg);
+ }
+ }
if peer.should_buffer_gossip_backfill() {
match peer.sync_status {
InitSyncTracker::NoSyncRequested => {},
}
}
- /// Append a message to a peer's pending outbound/write buffer
- fn enqueue_encoded_message(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
- peer.msgs_sent_since_pong += 1;
- peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
- }
-
/// Append a message to a peer's pending outbound/write buffer
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
let mut buffer = VecWriter(Vec::with_capacity(2048));
} else {
log_trace!(self.logger, "Enqueueing message {:?} to {}", message, log_pubkey!(peer.their_node_id.unwrap()))
}
- self.enqueue_encoded_message(peer, &buffer.0);
+ peer.msgs_sent_since_pong += 1;
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&buffer.0[..]));
+ }
+
+ /// Append a message to a peer's pending outbound/write gossip broadcast buffer
+ fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
+ peer.msgs_sent_since_pong += 1;
+ peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
}
fn do_read_event(&self, peer_descriptor: &mut Descriptor, data: &[u8]) -> Result<bool, PeerHandleError> {
self.message_handler.route_handler.handle_reply_channel_range(&their_node_id, msg)?;
},
+ // Onion message:
+ wire::Message::OnionMessage(msg) => {
+ self.message_handler.onion_message_handler.handle_onion_message(&their_node_id, &msg);
+ },
+
// Unknown messages:
wire::Message::Unknown(type_id) if message.is_even() => {
log_debug!(self.logger, "Received unknown even message of type {}, disconnecting peer!", type_id);
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.buffer_full_drop_gossip() {
+ if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::NodeAnnouncement(ref msg) => {
!peer.should_forward_node_announcement(msg.contents.node_id) {
continue
}
- if peer.buffer_full_drop_gossip() {
+ if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
wire::Message::ChannelUpdate(ref msg) => {
!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
continue
}
- if peer.buffer_full_drop_gossip() {
+ if peer.buffer_full_drop_gossip_broadcast() {
log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
continue;
}
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
continue;
}
- self.enqueue_encoded_message(&mut *peer, &encoded_msg);
+ self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
}
},
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),
cfgs
}
- fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
+ fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>> {
let mut peers = Vec::new();
for i in 0..peer_count {
let node_secret = SecretKey::from_slice(&[42 + i as u8; 32]).unwrap();
let ephemeral_bytes = [i as u8; 32];
- let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
+ let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler, onion_message_handler: IgnoringMessageHandler {} };
let peer = PeerManager::new(msg_handler, node_secret, &ephemeral_bytes, &cfgs[i].logger, IgnoringMessageHandler {});
peers.push(peer);
}
peers
}
- fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
+ fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, IgnoringMessageHandler, &'a test_utils::TestLogger, IgnoringMessageHandler>) -> (FileDescriptor, FileDescriptor) {
let secp_ctx = Secp256k1::new();
let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
//! Wire encoding/decoding for Lightning messages according to [BOLT #1], and for
//! custom message through the [`CustomMessageReader`] trait.
-//!
+//!
//! [BOLT #1]: https://github.com/lightning/bolts/blob/master/01-messaging.md
use io;
ChannelReady(msgs::ChannelReady),
Shutdown(msgs::Shutdown),
ClosingSigned(msgs::ClosingSigned),
+ OnionMessage(msgs::OnionMessage),
UpdateAddHTLC(msgs::UpdateAddHTLC),
UpdateFulfillHTLC(msgs::UpdateFulfillHTLC),
UpdateFailHTLC(msgs::UpdateFailHTLC),
&Message::ChannelReady(ref msg) => msg.type_id(),
&Message::Shutdown(ref msg) => msg.type_id(),
&Message::ClosingSigned(ref msg) => msg.type_id(),
+ &Message::OnionMessage(ref msg) => msg.type_id(),
&Message::UpdateAddHTLC(ref msg) => msg.type_id(),
&Message::UpdateFulfillHTLC(ref msg) => msg.type_id(),
&Message::UpdateFailHTLC(ref msg) => msg.type_id(),
msgs::ClosingSigned::TYPE => {
Ok(Message::ClosingSigned(Readable::read(buffer)?))
},
+ msgs::OnionMessage::TYPE => {
+ Ok(Message::OnionMessage(Readable::read(buffer)?))
+ },
msgs::UpdateAddHTLC::TYPE => {
Ok(Message::UpdateAddHTLC(Readable::read(buffer)?))
},
const TYPE: u16 = 39;
}
+impl Encode for msgs::OnionMessage {
+ const TYPE: u16 = 513;
+}
+
impl Encode for msgs::UpdateAddHTLC {
const TYPE: u16 = 128;
}
use chain::keysinterface::{KeysInterface, Sign};
use super::utils;
+use ln::msgs::DecodeError;
use util::chacha20poly1305rfc::ChaChaPolyWriteAdapter;
-use util::ser::{VecWriter, Writeable, Writer};
+use util::ser::{Readable, VecWriter, Writeable, Writer};
-use core::iter::FromIterator;
use io;
use prelude::*;
writer.0
}
+impl Writeable for BlindedRoute {
+ fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
+ self.introduction_node_id.write(w)?;
+ self.blinding_point.write(w)?;
+ (self.blinded_hops.len() as u8).write(w)?;
+ for hop in &self.blinded_hops {
+ hop.write(w)?;
+ }
+ Ok(())
+ }
+}
+
+impl Readable for BlindedRoute {
+ fn read<R: io::Read>(r: &mut R) -> Result<Self, DecodeError> {
+ let introduction_node_id = Readable::read(r)?;
+ let blinding_point = Readable::read(r)?;
+ let num_hops: u8 = Readable::read(r)?;
+ if num_hops == 0 { return Err(DecodeError::InvalidValue) }
+ let mut blinded_hops: Vec<BlindedHop> = Vec::with_capacity(num_hops.into());
+ for _ in 0..num_hops {
+ blinded_hops.push(Readable::read(r)?);
+ }
+ Ok(BlindedRoute {
+ introduction_node_id,
+ blinding_point,
+ blinded_hops,
+ })
+ }
+}
+
+impl_writeable!(BlindedHop, {
+ blinded_node_id,
+ encrypted_payload
+});
+
/// TLVs to encode in an intermediate onion message packet's hop data. When provided in a blinded
/// route, they are encoded into [`BlindedHop::encrypted_payload`].
pub(crate) struct ForwardTlvs {
//! Onion message testing and test utilities live here.
use chain::keysinterface::{KeysInterface, Recipient};
+use ln::msgs::OnionMessageHandler;
use super::{BlindedRoute, Destination, OnionMessenger, SendError};
use util::enforcing_trait_impls::EnforcingSigner;
use util::test_utils;
res
}
-fn pass_along_path(mut path: Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
- let mut prev_node = path.remove(0);
+fn pass_along_path(path: &Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
+ let mut prev_node = &path[0];
let num_nodes = path.len();
- for (idx, node) in path.into_iter().enumerate() {
+ for (idx, node) in path.into_iter().skip(1).enumerate() {
let events = prev_node.messenger.release_pending_msgs();
assert_eq!(events.len(), 1);
let onion_msg = {
fn one_hop() {
let nodes = create_nodes(2);
- nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk())).unwrap();
- pass_along_path(nodes, None);
+ nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+ pass_along_path(&nodes, None);
}
#[test]
fn two_unblinded_hops() {
let nodes = create_nodes(3);
- nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk()], Destination::Node(nodes[2].get_node_pk())).unwrap();
- pass_along_path(nodes, None);
+ nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk()], Destination::Node(nodes[2].get_node_pk()), None).unwrap();
+ pass_along_path(&nodes, None);
}
#[test]
let secp_ctx = Secp256k1::new();
let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[3].get_node_pk(), nodes[4].get_node_pk()], &*nodes[4].keys_manager, &secp_ctx).unwrap();
- nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::BlindedRoute(blinded_route)).unwrap();
- pass_along_path(nodes, None);
+ nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::BlindedRoute(blinded_route), None).unwrap();
+ pass_along_path(&nodes, None);
}
#[test]
let secp_ctx = Secp256k1::new();
let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap();
- nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap();
- pass_along_path(nodes, None);
+ nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap();
+ pass_along_path(&nodes, None);
}
#[test]
let hop_node_id = PublicKey::from_secret_key(&secp_ctx, &hop_secret);
let hops = [hop_node_id; 400];
- let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id)).unwrap_err();
+ let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id), None).unwrap_err();
assert_eq!(err, SendError::TooBigPacket);
}
fn invalid_blinded_route_error() {
// Make sure we error as expected if a provided blinded route has 0 or 1 hops.
let mut nodes = create_nodes(3);
- let (node1, node2, node3) = (nodes.remove(0), nodes.remove(0), nodes.remove(0));
// 0 hops
let secp_ctx = Secp256k1::new();
- let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[node2.get_node_pk(), node3.get_node_pk()], &*node3.keys_manager, &secp_ctx).unwrap();
+ let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap();
blinded_route.blinded_hops.clear();
- let err = node1.messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap_err();
+ let err = nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap_err();
assert_eq!(err, SendError::TooFewBlindedHops);
// 1 hop
- let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[node2.get_node_pk(), node3.get_node_pk()], &*node3.keys_manager, &secp_ctx).unwrap();
+ let mut blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], &*nodes[2].keys_manager, &secp_ctx).unwrap();
blinded_route.blinded_hops.remove(0);
assert_eq!(blinded_route.blinded_hops.len(), 1);
- let err = node1.messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route)).unwrap_err();
+ let err = nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), None).unwrap_err();
assert_eq!(err, SendError::TooFewBlindedHops);
}
+
+#[test]
+fn reply_path() {
+ let mut nodes = create_nodes(4);
+ let secp_ctx = Secp256k1::new();
+
+ // Destination::Node
+ let reply_path = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
+ nodes[0].messenger.send_onion_message(&[nodes[1].get_node_pk(), nodes[2].get_node_pk()], Destination::Node(nodes[3].get_node_pk()), Some(reply_path)).unwrap();
+ pass_along_path(&nodes, None);
+ // Make sure the last node successfully decoded the reply path.
+ nodes[3].logger.assert_log_contains(
+ "lightning::onion_message::messenger".to_string(),
+ format!("Received an onion message with path_id: None and reply_path").to_string(), 1);
+
+ // Destination::BlindedRoute
+ let blinded_route = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[1].get_node_pk(), nodes[2].get_node_pk(), nodes[3].get_node_pk()], &*nodes[3].keys_manager, &secp_ctx).unwrap();
+ let reply_path = BlindedRoute::new::<EnforcingSigner, _, _>(&[nodes[2].get_node_pk(), nodes[1].get_node_pk(), nodes[0].get_node_pk()], &*nodes[0].keys_manager, &secp_ctx).unwrap();
+
+ nodes[0].messenger.send_onion_message(&[], Destination::BlindedRoute(blinded_route), Some(reply_path)).unwrap();
+ pass_along_path(&nodes, None);
+ nodes[3].logger.assert_log_contains(
+ "lightning::onion_message::messenger".to_string(),
+ format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
+}
use bitcoin::secp256k1::{self, PublicKey, Scalar, Secp256k1, SecretKey};
use chain::keysinterface::{InMemorySigner, KeysInterface, KeysManager, Recipient, Sign};
-use ln::msgs;
+use ln::msgs::{self, OnionMessageHandler};
use ln::onion_utils;
use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload, ReceiveControlTlvs, SMALL_PACKET_HOP_DATA_LEN};
use super::utils;
+use util::events::OnionMessageProvider;
use util::logger::Logger;
+use core::mem;
use core::ops::Deref;
use sync::{Arc, Mutex};
use prelude::*;
{
keys_manager: K,
logger: L,
- pending_messages: Mutex<HashMap<PublicKey, Vec<msgs::OnionMessage>>>,
+ pending_messages: Mutex<HashMap<PublicKey, VecDeque<msgs::OnionMessage>>>,
secp_ctx: Secp256k1<secp256k1::All>,
// Coming soon:
// invoice_handler: InvoiceHandler,
/// Send an empty onion message to `destination`, routing it through `intermediate_nodes`.
/// See [`OnionMessenger`] for example usage.
- pub fn send_onion_message(&self, intermediate_nodes: &[PublicKey], destination: Destination) -> Result<(), SendError> {
+ pub fn send_onion_message(&self, intermediate_nodes: &[PublicKey], destination: Destination, reply_path: Option<BlindedRoute>) -> Result<(), SendError> {
if let Destination::BlindedRoute(BlindedRoute { ref blinded_hops, .. }) = destination {
if blinded_hops.len() < 2 {
return Err(SendError::TooFewBlindedHops);
}
};
let (packet_payloads, packet_keys) = packet_payloads_and_keys(
- &self.secp_ctx, intermediate_nodes, destination, &blinding_secret)
+ &self.secp_ctx, intermediate_nodes, destination, reply_path, &blinding_secret)
.map_err(|e| SendError::Secp256k1(e))?;
let prng_seed = self.keys_manager.get_secure_random_bytes();
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert(Vec::new());
- pending_msgs.push(
+ let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert_with(VecDeque::new);
+ pending_msgs.push_back(
msgs::OnionMessage {
blinding_point,
onion_routing_packet: onion_packet,
Ok(())
}
+ #[cfg(test)]
+ pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<msgs::OnionMessage>> {
+ let mut pending_msgs = self.pending_messages.lock().unwrap();
+ let mut msgs = HashMap::new();
+ core::mem::swap(&mut *pending_msgs, &mut msgs);
+ msgs
+ }
+}
+
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
+ where K::Target: KeysInterface<Signer = Signer>,
+ L::Target: Logger,
+{
/// Handle an incoming onion message. Currently, if a message was destined for us we will log, but
/// soon we'll delegate the onion message to a handler that can generate invoices or send
/// payments.
- pub fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
+ fn handle_onion_message(&self, _peer_node_id: &PublicKey, msg: &msgs::OnionMessage) {
let control_tlvs_ss = match self.keys_manager.ecdh(Recipient::Node, &msg.blinding_point, None) {
Ok(ss) => ss,
Err(e) => {
msg.onion_routing_packet.hmac, control_tlvs_ss)
{
Ok((Payload::Receive {
- control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id })
+ control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id }), reply_path,
}, None)) => {
- log_info!(self.logger, "Received an onion message with path_id: {:02x?}", path_id);
+ log_info!(self.logger,
+ "Received an onion message with path_id: {:02x?} and {}reply_path",
+ path_id, if reply_path.is_some() { "" } else { "no " });
},
Ok((Payload::Forward(ForwardControlTlvs::Unblinded(ForwardTlvs {
next_node_id, next_blinding_override
};
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert(Vec::new());
- pending_msgs.push(
+ let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
+ pending_msgs.push_back(
msgs::OnionMessage {
blinding_point: match next_blinding_override {
Some(blinding_point) => blinding_point,
},
};
}
+}
- #[cfg(test)]
- pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, Vec<msgs::OnionMessage>> {
+impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
+ where K::Target: KeysInterface<Signer = Signer>,
+ L::Target: Logger,
+{
+ fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage> {
let mut pending_msgs = self.pending_messages.lock().unwrap();
- let mut msgs = HashMap::new();
- core::mem::swap(&mut *pending_msgs, &mut msgs);
- msgs
+ if let Some(msgs) = pending_msgs.get_mut(&peer_node_id) {
+ return msgs.pop_front()
+ }
+ None
}
}
/// Construct onion packet payloads and keys for sending an onion message along the given
/// `unblinded_path` to the given `destination`.
fn packet_payloads_and_keys<T: secp256k1::Signing + secp256k1::Verification>(
- secp_ctx: &Secp256k1<T>, unblinded_path: &[PublicKey], destination: Destination, session_priv: &SecretKey
+ secp_ctx: &Secp256k1<T>, unblinded_path: &[PublicKey], destination: Destination, mut reply_path:
+ Option<BlindedRoute>, session_priv: &SecretKey
) -> Result<(Vec<(Payload, [u8; 32])>, Vec<onion_utils::OnionKeys>), secp256k1::Error> {
let num_hops = unblinded_path.len() + destination.num_hops();
let mut payloads = Vec::with_capacity(num_hops);
} else if let Some(encrypted_payload) = enc_payload_opt {
payloads.push((Payload::Receive {
control_tlvs: ReceiveControlTlvs::Blinded(encrypted_payload),
+ reply_path: reply_path.take(),
}, control_tlvs_ss));
}
if let Some(control_tlvs_ss) = prev_control_tlvs_ss {
payloads.push((Payload::Receive {
- control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id: None, })
+ control_tlvs: ReceiveControlTlvs::Unblinded(ReceiveTlvs { path_id: None, }),
+ reply_path: reply_path.take(),
}, control_tlvs_ss));
}
use ln::msgs::DecodeError;
use ln::onion_utils;
-use super::blinded_route::{ForwardTlvs, ReceiveTlvs};
+use super::blinded_route::{BlindedRoute, ForwardTlvs, ReceiveTlvs};
use util::chacha20poly1305rfc::{ChaChaPolyReadAdapter, ChaChaPolyWriteAdapter};
-use util::ser::{FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer};
+use util::ser::{BigSize, FixedLengthReader, LengthRead, LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer};
use core::cmp;
use io::{self, Read};
/// This payload is for the final hop.
Receive {
control_tlvs: ReceiveControlTlvs,
+ reply_path: Option<BlindedRoute>,
// Coming soon:
- // reply_path: Option<BlindedRoute>,
// message: Message,
}
}
impl Writeable for (Payload, [u8; 32]) {
fn write<W: Writer>(&self, w: &mut W) -> Result<(), io::Error> {
match &self.0 {
- Payload::Forward(ForwardControlTlvs::Blinded(encrypted_bytes)) |
- Payload::Receive { control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes)} => {
+ Payload::Forward(ForwardControlTlvs::Blinded(encrypted_bytes)) => {
encode_varint_length_prefixed_tlv!(w, {
(4, encrypted_bytes, vec_type)
})
},
+ Payload::Receive {
+ control_tlvs: ReceiveControlTlvs::Blinded(encrypted_bytes), reply_path
+ } => {
+ encode_varint_length_prefixed_tlv!(w, {
+ (2, reply_path, option),
+ (4, encrypted_bytes, vec_type)
+ })
+ },
Payload::Forward(ForwardControlTlvs::Unblinded(control_tlvs)) => {
let write_adapter = ChaChaPolyWriteAdapter::new(self.1, &control_tlvs);
encode_varint_length_prefixed_tlv!(w, {
(4, write_adapter, required)
})
},
- Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs)} => {
+ Payload::Receive {
+ control_tlvs: ReceiveControlTlvs::Unblinded(control_tlvs), reply_path,
+ } => {
let write_adapter = ChaChaPolyWriteAdapter::new(self.1, &control_tlvs);
encode_varint_length_prefixed_tlv!(w, {
+ (2, reply_path, option),
(4, write_adapter, required)
})
},
// Uses the provided secret to simultaneously decode and decrypt the control TLVs.
impl ReadableArgs<SharedSecret> for Payload {
fn read<R: Read>(mut r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
- use bitcoin::consensus::encode::{Decodable, Error, VarInt};
- let v: VarInt = Decodable::consensus_decode(&mut r)
- .map_err(|e| match e {
- Error::Io(ioe) => DecodeError::from(ioe),
- _ => DecodeError::InvalidValue
- })?;
-
+ let v: BigSize = Readable::read(r)?;
let mut rd = FixedLengthReader::new(r, v.0);
- // TODO: support reply paths
- let mut _reply_path_bytes: Option<Vec<u8>> = Some(Vec::new());
+ let mut reply_path: Option<BlindedRoute> = None;
let mut read_adapter: Option<ChaChaPolyReadAdapter<ControlTlvs>> = None;
let rho = onion_utils::gen_rho_from_shared_secret(&encrypted_tlvs_ss.secret_bytes());
decode_tlv_stream!(&mut rd, {
- (2, _reply_path_bytes, vec_type),
+ (2, reply_path, option),
(4, read_adapter, (option: LengthReadableArgs, rho))
});
rd.eat_remaining().map_err(|_| DecodeError::ShortRead)?;
Ok(Payload::Forward(ForwardControlTlvs::Unblinded(tlvs)))
},
Some(ChaChaPolyReadAdapter { readable: ControlTlvs::Receive(tlvs)}) => {
- Ok(Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(tlvs)})
+ Ok(Payload::Receive { control_tlvs: ReceiveControlTlvs::Unblinded(tlvs), reply_path })
},
}
}
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent>;
}
+/// A trait indicating an object may generate onion messages to send
+pub trait OnionMessageProvider {
+ /// Gets the next pending onion message for the peer with the given node id.
+ fn next_onion_message_for_peer(&self, peer_node_id: PublicKey) -> Option<msgs::OnionMessage>;
+}
+
/// A trait indicating an object may generate events.
///
/// Events are processed by passing an [`EventHandler`] to [`process_pending_events`].
}
/// Logs an entry at the given level.
+#[doc(hidden)]
#[macro_export]
macro_rules! log_given_level {
($logger: expr, $lvl:expr, $($arg:tt)+) => (
);
}
-/// Log an error.
+/// Log at the `ERROR` level.
#[macro_export]
macro_rules! log_error {
($logger: expr, $($arg:tt)*) => (
)
}
+/// Log at the `WARN` level.
+#[macro_export]
macro_rules! log_warn {
($logger: expr, $($arg:tt)*) => (
log_given_level!($logger, $crate::util::logger::Level::Warn, $($arg)*);
)
}
+/// Log at the `INFO` level.
+#[macro_export]
macro_rules! log_info {
($logger: expr, $($arg:tt)*) => (
log_given_level!($logger, $crate::util::logger::Level::Info, $($arg)*);
)
}
+/// Log at the `DEBUG` level.
+#[macro_export]
macro_rules! log_debug {
($logger: expr, $($arg:tt)*) => (
log_given_level!($logger, $crate::util::logger::Level::Debug, $($arg)*);
)
}
-/// Log a trace log.
+/// Log at the `TRACE` level.
#[macro_export]
macro_rules! log_trace {
($logger: expr, $($arg:tt)*) => (
)
}
-/// Log a gossip log.
+/// Log at the `GOSSIP` level.
+#[macro_export]
macro_rules! log_gossip {
($logger: expr, $($arg:tt)*) => (
log_given_level!($logger, $crate::util::logger::Level::Gossip, $($arg)*);