Allow `RoutingMessageHandler` to signal backpressure
authorMatt Corallo <git@bluematt.me>
Sun, 22 Jan 2023 05:12:45 +0000 (05:12 +0000)
committerMatt Corallo <git@bluematt.me>
Thu, 9 Feb 2023 15:40:43 +0000 (15:40 +0000)
Now that we allow `handle_channel_announcement` to (indirectly)
spawn async tasks which will complete later, we have to ensure it
can apply backpressure all the way up to the TCP socket to ensure
we don't end up with too many buffers allocated for UTXO
validation.

We do this by adding a new method to `RoutingMessageHandler` which
allows it to signal if there are "many" checks pending and
`channel_announcement` messages should be delayed. The actual
`PeerManager` implementation thereof is done in the next commit.

lightning-net-tokio/src/lib.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/routing/gossip.rs
lightning/src/routing/utxo.rs
lightning/src/util/test_utils.rs

index 9b480fb0ae48ebe493c7da20fe1aeed0cb6cf5a0..9616e0db61d437285361ac6c33c36b664a60eed2 100644 (file)
@@ -623,6 +623,7 @@ mod tests {
                fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
                fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
                fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() }
+               fn processing_queue_high(&self) -> bool { false }
        }
        impl ChannelMessageHandler for MsgHandler {
                fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {}
index c971df906c281fc96fd852c483def81367618ee2..e8b40c71d89e7f1825335bbe4a0abb0ed817970d 100644 (file)
@@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
        /// list of `short_channel_id`s.
        fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
 
+       // Handler queueing status:
+       /// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages
+       /// pending some async action. While there is no guarantee of the rate of future messages, the
+       /// caller should seek to reduce the rate of new gossip messages handled, especially
+       /// [`ChannelAnnouncement`]s.
+       fn processing_queue_high(&self) -> bool;
+
        // Handler information:
        /// Gets the node feature flags which this handler itself supports. All available handlers are
        /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]
index 4cbe2a1bf2b0a4a93e2fd8a1d110ef418cf454ef..11f1206339dde3580f4346ccf3b735c2c44e551b 100644 (file)
@@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
        fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
                InitFeatures::empty()
        }
+       fn processing_queue_high(&self) -> bool { false }
 }
 impl OnionMessageProvider for IgnoringMessageHandler {
        fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
index 2d3cda3f47396abf79dc28e3e641fef760121f6a..e2c59b4f4340ceba68b1a62e881af214598074f3 100644 (file)
@@ -657,6 +657,10 @@ where U::Target: UtxoLookup, L::Target: Logger
                features.set_gossip_queries_optional();
                features
        }
+
+       fn processing_queue_high(&self) -> bool {
+               self.network_graph.pending_checks.too_many_checks_pending()
+       }
 }
 
 impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, U, L>
index eb8e2a7c620aa635167562807d205faba1ec7286..da9c65c3b12a605c385fd566fe4cf0a505df4b35 100644 (file)
@@ -148,6 +148,13 @@ impl UtxoFuture {
        ///
        /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
        /// forwarding the validated gossip message onwards to peers.
+       ///
+       /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+       /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+       /// after this.
+       ///
+       /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+       /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        pub fn resolve_without_forwarding<L: Deref>(&self,
                graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
        where L::Target: Logger {
@@ -158,6 +165,13 @@ impl UtxoFuture {
        ///
        /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
        /// have available buffer space.
+       ///
+       /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+       /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+       /// after this.
+       ///
+       /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+       /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
                graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
        ) where L::Target: Logger, U::Target: UtxoLookup {
@@ -510,4 +524,36 @@ impl PendingChecks {
                        }
                }
        }
+
+       /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
+       /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
+       /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
+       /// which we'll have to process. With a socket buffer of 4KB and a minimum
+       /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
+       /// count` messages to process beyond this limit. Because we'll probably have a few peers,
+       /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
+       /// checks should be more than enough for decent parallelism.
+       const MAX_PENDING_LOOKUPS: usize = 32;
+
+       /// Returns true if there are a large number of async checks pending and future
+       /// `channel_announcement` messages should be delayed. Note that this is only a hint and
+       /// messages already in-flight may still have to be handled for various reasons.
+       pub(super) fn too_many_checks_pending(&self) -> bool {
+               let mut pending_checks = self.internal.lock().unwrap();
+               if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
+                       // If we have many channel checks pending, ensure we don't have any dangling checks
+                       // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture`
+                       // instead) before we commit to applying backpressure.
+                       pending_checks.channels.retain(|_, chan| {
+                               Weak::upgrade(&chan).is_some()
+                       });
+                       pending_checks.nodes.retain(|_, channels| {
+                               channels.retain(|chan| Weak::upgrade(&chan).is_some());
+                               !channels.is_empty()
+                       });
+                       pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
+               } else {
+                       false
+               }
+       }
 }
index b62479555ccea4fd4064b175f4e24974dc696372..b47aef6f7e89974eebe79b37ed9531a4905757ff 100644 (file)
@@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
                features.set_gossip_queries_optional();
                features
        }
+
+       fn processing_queue_high(&self) -> bool { false }
 }
 
 impl events::MessageSendEventsProvider for TestRoutingMessageHandler {