pub mod chain;
pub mod ln;
pub mod routing;
-#[cfg(fuzzing)]
pub mod onion_message;
-#[cfg(not(fuzzing))]
-#[allow(unused)]
-mod onion_message; // To be exposed after sending/receiving OMs is supported in PeerManager.
#[cfg(feature = "std")]
/// Re-export of either `core2::io` or `std::io`, depending on the `std` feature flag.
define_feature!(27, ShutdownAnySegwit, [InitContext, NodeContext],
"Feature flags for `opt_shutdown_anysegwit`.", set_shutdown_any_segwit_optional,
set_shutdown_any_segwit_required, supports_shutdown_anysegwit, requires_shutdown_anysegwit);
+ // We do not yet advertise the onion messages feature bit, but we need to detect when peers
+ // support it.
+ define_feature!(39, OnionMessages, [InitContext, NodeContext],
+ "Feature flags for `option_onion_messages`.", set_onion_messages_optional,
+ set_onion_messages_required, supports_onion_messages, requires_onion_messages);
define_feature!(45, ChannelType, [InitContext, NodeContext],
"Feature flags for `option_channel_type`.", set_channel_type_optional,
set_channel_type_required, supports_channel_type, requires_channel_type);
impl<T: sealed::InitialRoutingSync> Features<T> {
// We are no longer setting initial_routing_sync now that gossip_queries
- // is enabled. This feature is ignored by a peer when gossip_queries has
+ // is enabled. This feature is ignored by a peer when gossip_queries has
// been negotiated.
#[cfg(test)]
pub(crate) fn clear_initial_routing_sync(&mut self) {
pub trait OnionMessageHandler : OnionMessageProvider {
/// Handle an incoming onion_message message from the given peer.
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);
+ /// Called when a connection is established with a peer. Can be used to track which peers
+ /// advertise onion message support and are online.
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &Init);
+ /// Indicates a connection to the peer failed/an existing connection was lost. Allows handlers to
+ /// drop and refuse to forward onion messages to this peer.
+ fn peer_disconnected(&self, their_node_id: &PublicKey, no_connection_possible: bool);
}
mod fuzzy_internal_msgs {
}
impl OnionMessageHandler for IgnoringMessageHandler {
fn handle_onion_message(&self, _their_node_id: &PublicKey, _msg: &msgs::OnionMessage) {}
+ fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
+ fn peer_disconnected(&self, _their_node_id: &PublicKey, _no_connection_possible: bool) {}
}
impl Deref for IgnoringMessageHandler {
type Target = IgnoringMessageHandler;
}
self.message_handler.route_handler.peer_connected(&their_node_id, &msg);
-
self.message_handler.chan_handler.peer_connected(&their_node_id, &msg);
+ self.message_handler.onion_message_handler.peer_connected(&their_node_id, &msg);
+
peer_lock.their_features = Some(msg.features);
return Ok(None);
} else if peer_lock.their_features.is_none() {
}
descriptor.disconnect_socket();
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
}
}
log_pubkey!(node_id), if no_connection_possible { "no " } else { "" });
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible);
}
}
};
log_trace!(self.logger, "Disconnecting peer with id {} due to client request", node_id);
peers_lock.remove(&descriptor);
self.message_handler.chan_handler.peer_disconnected(&node_id, no_connection_possible);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, no_connection_possible);
descriptor.disconnect_socket();
}
}
if let Some(node_id) = peer.lock().unwrap().their_node_id {
log_trace!(self.logger, "Disconnecting peer with id {} due to client request to disconnect all peers", node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
descriptor.disconnect_socket();
}
log_trace!(self.logger, "Disconnecting peer with id {} due to ping timeout", node_id);
self.node_id_to_descriptor.lock().unwrap().remove(&node_id);
self.message_handler.chan_handler.peer_disconnected(&node_id, false);
+ self.message_handler.onion_message_handler.peer_disconnected(&node_id, false);
}
}
}
//! Onion message testing and test utilities live here.
use chain::keysinterface::{KeysInterface, Recipient};
-use ln::msgs::OnionMessageHandler;
+use ln::features::InitFeatures;
+use ln::msgs::{self, OnionMessageHandler};
use super::{BlindedRoute, Destination, OnionMessenger, SendError};
use util::enforcing_trait_impls::EnforcingSigner;
use util::test_utils;
use bitcoin::network::constants::Network;
-use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
+use bitcoin::secp256k1::{PublicKey, Secp256k1};
use sync::Arc;
}
fn create_nodes(num_messengers: u8) -> Vec<MessengerNode> {
- let mut res = Vec::new();
+ let mut nodes = Vec::new();
for i in 0..num_messengers {
let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
let seed = [i as u8; 32];
let keys_manager = Arc::new(test_utils::TestKeysInterface::new(&seed, Network::Testnet));
- res.push(MessengerNode {
+ nodes.push(MessengerNode {
keys_manager: keys_manager.clone(),
messenger: OnionMessenger::new(keys_manager, logger.clone()),
logger,
});
}
- res
+ for idx in 0..num_messengers - 1 {
+ let i = idx as usize;
+ let mut features = InitFeatures::known();
+ features.set_onion_messages_optional();
+ let init_msg = msgs::Init { features, remote_network_address: None };
+ nodes[i].messenger.peer_connected(&nodes[i + 1].get_node_pk(), &init_msg.clone());
+ nodes[i + 1].messenger.peer_connected(&nodes[i].get_node_pk(), &init_msg.clone());
+ }
+ nodes
}
fn pass_along_path(path: &Vec<MessengerNode>, expected_path_id: Option<[u8; 32]>) {
let num_nodes = path.len();
for (idx, node) in path.into_iter().skip(1).enumerate() {
let events = prev_node.messenger.release_pending_msgs();
- assert_eq!(events.len(), 1);
let onion_msg = {
let msgs = events.get(&node.get_node_pk()).unwrap();
assert_eq!(msgs.len(), 1);
#[test]
fn too_big_packet_error() {
// Make sure we error as expected if a packet is too big to send.
- let nodes = create_nodes(1);
-
- let hop_secret = SecretKey::from_slice(&hex::decode("0101010101010101010101010101010101010101010101010101010101010101").unwrap()[..]).unwrap();
- let secp_ctx = Secp256k1::new();
- let hop_node_id = PublicKey::from_secret_key(&secp_ctx, &hop_secret);
+ let nodes = create_nodes(2);
+ let hop_node_id = nodes[1].get_node_pk();
let hops = [hop_node_id; 400];
let err = nodes[0].messenger.send_onion_message(&hops, Destination::Node(hop_node_id), None).unwrap_err();
assert_eq!(err, SendError::TooBigPacket);
#[test]
fn invalid_blinded_route_error() {
// Make sure we error as expected if a provided blinded route has 0 or 1 hops.
- let mut nodes = create_nodes(3);
+ let nodes = create_nodes(3);
// 0 hops
let secp_ctx = Secp256k1::new();
#[test]
fn reply_path() {
- let mut nodes = create_nodes(4);
+ let nodes = create_nodes(4);
let secp_ctx = Secp256k1::new();
// Destination::Node
"lightning::onion_message::messenger".to_string(),
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
}
+
+#[test]
+fn peer_buffer_full() {
+ let nodes = create_nodes(2);
+ for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
+ nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+ }
+ let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
+ assert_eq!(err, SendError::BufferFull);
+}
use super::utils;
use util::events::OnionMessageProvider;
use util::logger::Logger;
+use util::ser::Writeable;
-use core::mem;
use core::ops::Deref;
use sync::{Arc, Mutex};
use prelude::*;
///
/// # Example
///
-// Needs to be `ignore` until the `onion_message` module is made public, otherwise this is a test
-// failure.
-/// ```ignore
+/// ```
/// # extern crate bitcoin;
/// # use bitcoin::hashes::_export::_core::time::Duration;
/// # use bitcoin::secp256k1::{PublicKey, Secp256k1, SecretKey};
///
/// // Send an empty onion message to a node id.
/// let intermediate_hops = [hop_node_id1, hop_node_id2];
-/// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id));
+/// let reply_path = None;
+/// onion_messenger.send_onion_message(&intermediate_hops, Destination::Node(destination_node_id), reply_path);
///
/// // Create a blinded route to yourself, for someone to send an onion message to.
/// # let your_node_id = hop_node_id1;
///
/// // Send an empty onion message to a blinded route.
/// # let intermediate_hops = [hop_node_id1, hop_node_id2];
-/// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route));
+/// let reply_path = None;
+/// onion_messenger.send_onion_message(&intermediate_hops, Destination::BlindedRoute(blinded_route), reply_path);
/// ```
///
/// [offers]: <https://github.com/lightning/bolts/pull/798>
/// The provided [`Destination`] was an invalid [`BlindedRoute`], due to having fewer than two
/// blinded hops.
TooFewBlindedHops,
+ /// Our next-hop peer was offline or does not support onion message forwarding.
+ InvalidFirstHop,
+ /// Our next-hop peer's buffer was full or our total outbound buffer was full.
+ BufferFull,
}
impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
.map_err(|e| SendError::Secp256k1(e))?;
let prng_seed = self.keys_manager.get_secure_random_bytes();
- let onion_packet = construct_onion_message_packet(
+ let onion_routing_packet = construct_onion_message_packet(
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(introduction_node_id).or_insert_with(VecDeque::new);
- pending_msgs.push_back(
- msgs::OnionMessage {
- blinding_point,
- onion_routing_packet: onion_packet,
+ if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
+ match pending_per_peer_msgs.entry(introduction_node_id) {
+ hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().push_back(msgs::OnionMessage { blinding_point, onion_routing_packet });
+ Ok(())
}
- );
- Ok(())
+ }
}
#[cfg(test)]
pub(super) fn release_pending_msgs(&self) -> HashMap<PublicKey, VecDeque<msgs::OnionMessage>> {
let mut pending_msgs = self.pending_messages.lock().unwrap();
let mut msgs = HashMap::new();
- core::mem::swap(&mut *pending_msgs, &mut msgs);
+ // We don't want to disconnect the peers by removing them entirely from the original map, so we
+ // swap the pending message buffers individually.
+ for (peer_node_id, pending_messages) in &mut *pending_msgs {
+ msgs.insert(*peer_node_id, core::mem::take(pending_messages));
+ }
msgs
}
}
+fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
+ const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
+ const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
+ let mut total_buffered_bytes = 0;
+ let mut peer_buffered_bytes = 0;
+ for (pk, peer_buf) in buffer {
+ for om in peer_buf {
+ let om_len = om.serialized_length();
+ if pk == peer_node_id {
+ peer_buffered_bytes += om_len;
+ }
+ total_buffered_bytes += om_len;
+
+ if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
+ peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
+ {
+ return true
+ }
+ }
+ }
+ false
+}
+
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
where K::Target: KeysInterface<Signer = Signer>,
L::Target: Logger,
hop_data: new_packet_bytes,
hmac: next_hop_hmac,
};
-
- let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
- let pending_msgs = pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
- pending_msgs.push_back(
- msgs::OnionMessage {
- blinding_point: match next_blinding_override {
- Some(blinding_point) => blinding_point,
- None => {
- let blinding_factor = {
- let mut sha = Sha256::engine();
- sha.input(&msg.blinding_point.serialize()[..]);
- sha.input(control_tlvs_ss.as_ref());
- Sha256::from_engine(sha).into_inner()
- };
- let next_blinding_point = msg.blinding_point;
- match next_blinding_point.mul_tweak(&self.secp_ctx, &Scalar::from_be_bytes(blinding_factor).unwrap()) {
- Ok(bp) => bp,
- Err(e) => {
- log_trace!(self.logger, "Failed to compute next blinding point: {}", e);
- return
- }
+ let onion_message = msgs::OnionMessage {
+ blinding_point: match next_blinding_override {
+ Some(blinding_point) => blinding_point,
+ None => {
+ let blinding_factor = {
+ let mut sha = Sha256::engine();
+ sha.input(&msg.blinding_point.serialize()[..]);
+ sha.input(control_tlvs_ss.as_ref());
+ Sha256::from_engine(sha).into_inner()
+ };
+ let next_blinding_point = msg.blinding_point;
+ match next_blinding_point.mul_tweak(&self.secp_ctx, &Scalar::from_be_bytes(blinding_factor).unwrap()) {
+ Ok(bp) => bp,
+ Err(e) => {
+ log_trace!(self.logger, "Failed to compute next blinding point: {}", e);
+ return
}
- },
+ }
},
- onion_routing_packet: outgoing_packet,
},
- );
- log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
+ onion_routing_packet: outgoing_packet,
+ };
+
+ let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+ if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
+ log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
+ return
+ }
+
+ #[cfg(fuzzing)]
+ pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);
+
+ match pending_per_peer_msgs.entry(next_node_id) {
+ hash_map::Entry::Vacant(_) => {
+ log_trace!(self.logger, "Dropping forwarded onion message to disconnected peer {:?}", next_node_id);
+ return
+ },
+ hash_map::Entry::Occupied(mut e) => {
+ e.get_mut().push_back(onion_message);
+ log_trace!(self.logger, "Forwarding an onion message to peer {}", next_node_id);
+ }
+ };
},
Err(e) => {
log_trace!(self.logger, "Errored decoding onion message packet: {:?}", e);
},
};
}
+
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &msgs::Init) {
+ if init.features.supports_onion_messages() {
+ let mut peers = self.pending_messages.lock().unwrap();
+ peers.insert(their_node_id.clone(), VecDeque::new());
+ }
+ }
+
+ fn peer_disconnected(&self, their_node_id: &PublicKey, _no_connection_possible: bool) {
+ let mut pending_msgs = self.pending_messages.lock().unwrap();
+ pending_msgs.remove(their_node_id);
+ }
}
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageProvider for OnionMessenger<Signer, K, L>
while read_idx < hop_data_len {
let mut read_buffer = [0; READ_BUFFER_SIZE];
let read_amt = cmp::min(hop_data_len - read_idx, READ_BUFFER_SIZE);
- r.read_exact(&mut read_buffer[..read_amt]);
+ r.read_exact(&mut read_buffer[..read_amt])?;
hop_data.extend_from_slice(&read_buffer[..read_amt]);
read_idx += read_amt;
}
// Uses the provided secret to simultaneously decode and decrypt the control TLVs.
impl ReadableArgs<SharedSecret> for Payload {
- fn read<R: Read>(mut r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
+ fn read<R: Read>(r: &mut R, encrypted_tlvs_ss: SharedSecret) -> Result<Self, DecodeError> {
let v: BigSize = Readable::read(r)?;
let mut rd = FixedLengthReader::new(r, v.0);
let mut reply_path: Option<BlindedRoute> = None;