From 02b187856bc7f1d7edd46897e2f983a65a97230c Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 05:12:45 +0000 Subject: [PATCH] Allow `RoutingMessageHandler` to signal backpressure Now that we allow `handle_channel_announcement` to (indirectly) spawn async tasks which will complete later, we have to ensure it can apply backpressure all the way up to the TCP socket to ensure we don't end up with too many buffers allocated for UTXO validation. We do this by adding a new method to `RoutingMessageHandler` which allows it to signal if there are "many" checks pending and `channel_announcement` messages should be delayed. The actual `PeerManager` implementation thereof is done in the next commit. --- lightning-net-tokio/src/lib.rs | 1 + lightning/src/ln/msgs.rs | 7 +++++ lightning/src/ln/peer_handler.rs | 1 + lightning/src/routing/gossip.rs | 4 +++ lightning/src/routing/utxo.rs | 46 ++++++++++++++++++++++++++++++++ lightning/src/util/test_utils.rs | 2 ++ 6 files changed, 61 insertions(+) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 9b480fb0a..9616e0db6 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -623,6 +623,7 @@ mod tests { fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() } fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _msg: &OpenChannel) {} diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index c971df906..e8b40c71d 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -1082,6 +1082,13 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider { /// list of `short_channel_id`s. fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>; + // Handler queueing status: + /// Indicates that there are a large number of [`ChannelAnnouncement`] (or other) messages + /// pending some async action. While there is no guarantee of the rate of future messages, the + /// caller should seek to reduce the rate of new gossip messages handled, especially + /// [`ChannelAnnouncement`]s. + fn processing_queue_high(&self) -> bool; + // Handler information: /// Gets the node feature flags which this handler itself supports. All available handlers are /// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`] diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index 4cbe2a1bf..11f120633 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -81,6 +81,7 @@ impl RoutingMessageHandler for IgnoringMessageHandler { fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures { InitFeatures::empty() } + fn processing_queue_high(&self) -> bool { false } } impl OnionMessageProvider for IgnoringMessageHandler { fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option { None } diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 2d3cda3f4..e2c59b4f4 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -657,6 +657,10 @@ where U::Target: UtxoLookup, L::Target: Logger features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { + self.network_graph.pending_checks.too_many_checks_pending() + } } impl>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index eb8e2a7c6..da9c65c3b 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -148,6 +148,13 @@ impl UtxoFuture { /// /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling /// forwarding the validated gossip message onwards to peers. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events pub fn resolve_without_forwarding(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { @@ -158,6 +165,13 @@ impl UtxoFuture { /// /// The given `gossip` is used to broadcast any validated messages onwards to all peers which /// have available buffer space. + /// + /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order + /// to allow us to interact with peers again, you should call [`PeerManager::process_events`] + /// after this. + /// + /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high + /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events pub fn resolve>, U: Deref, GS: Deref>>(&self, graph: &NetworkGraph, gossip: GS, result: Result ) where L::Target: Logger, U::Target: UtxoLookup { @@ -510,4 +524,36 @@ impl PendingChecks { } } } + + /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`] + /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending - + /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s + /// which we'll have to process. With a socket buffer of 4KB and a minimum + /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer + /// count` messages to process beyond this limit. Because we'll probably have a few peers, + /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight + /// checks should be more than enough for decent parallelism. + const MAX_PENDING_LOOKUPS: usize = 32; + + /// Returns true if there are a large number of async checks pending and future + /// `channel_announcement` messages should be delayed. Note that this is only a hint and + /// messages already in-flight may still have to be handled for various reasons. + pub(super) fn too_many_checks_pending(&self) -> bool { + let mut pending_checks = self.internal.lock().unwrap(); + if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS { + // If we have many channel checks pending, ensure we don't have any dangling checks + // (i.e. checks where the user told us they'd call back but drop'd the `AccessFuture` + // instead) before we commit to applying backpressure. + pending_checks.channels.retain(|_, chan| { + Weak::upgrade(&chan).is_some() + }); + pending_checks.nodes.retain(|_, channels| { + channels.retain(|chan| Weak::upgrade(&chan).is_some()); + !channels.is_empty() + }); + pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS + } else { + false + } + } } diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index b62479555..b47aef6f7 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -571,6 +571,8 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { features.set_gossip_queries_optional(); features } + + fn processing_queue_high(&self) -> bool { false } } impl events::MessageSendEventsProvider for TestRoutingMessageHandler { -- 2.39.5