Limit OnionMessenger outbound buffer size
authorValentine Wallace <vwallace@protonmail.com>
Thu, 1 Sep 2022 19:26:17 +0000 (15:26 -0400)
committerValentine Wallace <vwallace@protonmail.com>
Fri, 2 Sep 2022 20:35:35 +0000 (16:35 -0400)
Drop OMs if they push us over the max OnionMessenger outbound buffer size

lightning/src/onion_message/functional_tests.rs
lightning/src/onion_message/messenger.rs

index b18431a66dd7755d6e7c1aabf1b3e98f0318f9a7..5fec5be51c4f331a6c899a7f1fadef55bc9048c4 100644 (file)
@@ -170,3 +170,13 @@ fn reply_path() {
                "lightning::onion_message::messenger".to_string(),
                format!("Received an onion message with path_id: None and reply_path").to_string(), 2);
 }
+
+#[test]
+fn peer_buffer_full() {
+       let nodes = create_nodes(2);
+       for _ in 0..188 { // Based on MAX_PER_PEER_BUFFER_SIZE in OnionMessenger
+               nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap();
+       }
+       let err = nodes[0].messenger.send_onion_message(&[], Destination::Node(nodes[1].get_node_pk()), None).unwrap_err();
+       assert_eq!(err, SendError::BufferFull);
+}
index a3320f10d21091a0faa3131f3125384cc51b871f..e38a0d10f6aef42a840c9781beaa2bc540775133 100644 (file)
@@ -23,6 +23,7 @@ use super::packet::{BIG_PACKET_HOP_DATA_LEN, ForwardControlTlvs, Packet, Payload
 use super::utils;
 use util::events::OnionMessageProvider;
 use util::logger::Logger;
+use util::ser::Writeable;
 
 use core::ops::Deref;
 use sync::{Arc, Mutex};
@@ -124,6 +125,8 @@ pub enum SendError {
        TooFewBlindedHops,
        /// Our next-hop peer was offline or does not support onion message forwarding.
        InvalidFirstHop,
+       /// Our next-hop peer's buffer was full or our total outbound buffer was full.
+       BufferFull,
 }
 
 impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
@@ -171,6 +174,7 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
                        packet_payloads, packet_keys, prng_seed).map_err(|()| SendError::TooBigPacket)?;
 
                let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+               if outbound_buffer_full(&introduction_node_id, &pending_per_peer_msgs) { return Err(SendError::BufferFull) }
                match pending_per_peer_msgs.entry(introduction_node_id) {
                        hash_map::Entry::Vacant(_) => Err(SendError::InvalidFirstHop),
                        hash_map::Entry::Occupied(mut e) => {
@@ -193,6 +197,29 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessenger<Signer, K, L>
        }
 }
 
+fn outbound_buffer_full(peer_node_id: &PublicKey, buffer: &HashMap<PublicKey, VecDeque<msgs::OnionMessage>>) -> bool {
+       const MAX_TOTAL_BUFFER_SIZE: usize = (1 << 20) * 128;
+       const MAX_PER_PEER_BUFFER_SIZE: usize = (1 << 10) * 256;
+       let mut total_buffered_bytes = 0;
+       let mut peer_buffered_bytes = 0;
+       for (pk, peer_buf) in buffer {
+               for om in peer_buf {
+                       let om_len = om.serialized_length();
+                       if pk == peer_node_id {
+                               peer_buffered_bytes += om_len;
+                       }
+                       total_buffered_bytes += om_len;
+
+                       if total_buffered_bytes >= MAX_TOTAL_BUFFER_SIZE ||
+                               peer_buffered_bytes >= MAX_PER_PEER_BUFFER_SIZE
+                       {
+                               return true
+                       }
+               }
+       }
+       false
+}
+
 impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Signer, K, L>
        where K::Target: KeysInterface<Signer = Signer>,
              L::Target: Logger,
@@ -279,6 +306,10 @@ impl<Signer: Sign, K: Deref, L: Deref> OnionMessageHandler for OnionMessenger<Si
                                };
 
                                let mut pending_per_peer_msgs = self.pending_messages.lock().unwrap();
+                               if outbound_buffer_full(&next_node_id, &pending_per_peer_msgs) {
+                                       log_trace!(self.logger, "Dropping forwarded onion message to peer {:?}: outbound buffer full", next_node_id);
+                                       return
+                               }
 
                                #[cfg(fuzzing)]
                                pending_per_peer_msgs.entry(next_node_id).or_insert_with(VecDeque::new);