"lightning::onion_message::messenger".to_string(),
format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
}
+
+#[test]
+fn peer_buffer_full() {
+ let nodes = create_nodes(2);
+ for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
+ nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+ }
+ let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
+ assert_eq!(err, SendError::BufferFull);
+}
use super::utils;
use util::events::OnionMessageProvider;
use util::logger::Logger;
+use util::ser::Writeable;
use core::ops::Deref;
use sync::{Arc, Mutex};
TooFewBlindedHops,
/// Our next-hop peer was offline or does not support onion message forwarding.
InvalidFirstHop,
+ /// Our next-hop peer's buffer was full or our total outbound buffer was full.
+ BufferFull,
}
impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+ if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
match pending_per_peer_msgs.entry(introduction_node_id) {
hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
hash_map::Entry::Occupied(mut e) => {
}
}
+fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
+ const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
+ const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
+ let mut total_buffered_bytes = 0;
+ let mut peer_buffered_bytes = 0;
+ for (pk, peer_buf) in buffer {
+ for om in peer_buf {
+ let om_len = om.serialized_length();
+ if pk == peer_node_id {
+ peer_buffered_bytes += om_len;
+ }
+ total_buffered_bytes += om_len;
+
+ if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
+ peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
+ {
+ return true
+ }
+ }
+ }
+ false
+}
+
impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
where K::Target: KeysInterface<Signer = Signer>,
L::Target: Logger,
};
let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+ if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
+ log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
+ return
+ }
#[cfg(fuzzing)]
pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);