Merge pull request #1980 from TheBlueMatt/2023-01-async-utxo-lookups
[rust-lightning] / lightning / src / routing / utxo.rs
index eb8e2a7c620aa635167562807d205faba1ec7286..020993f23fb809c250084afe78ec9fad19fbbaad 100644 (file)
@@ -42,6 +42,7 @@ pub enum UtxoLookupError {
 /// The result of a [`UtxoLookup::get_utxo`] call. A call may resolve either synchronously,
 /// returning the `Sync` variant, or asynchronously, returning an [`UtxoFuture`] in the `Async`
 /// variant.
+#[derive(Clone)]
 pub enum UtxoResult {
        /// A result which was resolved synchronously. It either includes a [`TxOut`] for the output
        /// requested or a [`UtxoLookupError`].
@@ -148,6 +149,13 @@ impl UtxoFuture {
        ///
        /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling
        /// forwarding the validated gossip message onwards to peers.
+       ///
+       /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+       /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+       /// after this.
+       ///
+       /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+       /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        pub fn resolve_without_forwarding<L: Deref>(&self,
                graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
        where L::Target: Logger {
@@ -158,6 +166,13 @@ impl UtxoFuture {
        ///
        /// The given `gossip` is used to broadcast any validated messages onwards to all peers which
        /// have available buffer space.
+       ///
+       /// Because this may cause the [`NetworkGraph`]'s [`processing_queue_high`] to flip, in order
+       /// to allow us to interact with peers again, you should call [`PeerManager::process_events`]
+       /// after this.
+       ///
+       /// [`processing_queue_high`]: crate::ln::msgs::RoutingMessageHandler::processing_queue_high
+       /// [`PeerManager::process_events`]: crate::ln::peer_handler::PeerManager::process_events
        pub fn resolve<L: Deref, G: Deref<Target=NetworkGraph<L>>, U: Deref, GS: Deref<Target = P2PGossipSync<G, U, L>>>(&self,
                graph: &NetworkGraph<L>, gossip: GS, result: Result<TxOut, UtxoLookupError>
        ) where L::Target: Logger, U::Target: UtxoLookup {
@@ -353,11 +368,6 @@ impl PendingChecks {
                                                        if latest_announce.is_none() ||
                                                                latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
                                                        {
-                                                               // If the messages we got has a higher timestamp, just blindly
-                                                               // assume the signatures on the new message are correct and drop
-                                                               // the old message. This may cause us to end up dropping valid
-                                                               // `node_announcement`s if a peer is malicious, but we should get
-                                                               // the correct ones when the node updates them.
                                                                *latest_announce = Some(
                                                                        if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
                                                                        else { NodeAnnouncement::Unsigned(msg.clone()) });
@@ -510,4 +520,342 @@ impl PendingChecks {
                        }
                }
        }
+
+       /// The maximum number of pending gossip checks before [`Self::too_many_checks_pending`]
+       /// returns `true`. Note that this isn't a strict upper-bound on the number of checks pending -
+       /// each peer may, at a minimum, read one more socket buffer worth of `channel_announcement`s
+       /// which we'll have to process. With a socket buffer of 4KB and a minimum
+       /// `channel_announcement` size of, roughly, 429 bytes, this may leave us with `10*our peer
+       /// count` messages to process beyond this limit. Because we'll probably have a few peers,
+       /// there's no reason for this constant to be materially less than 30 or so, and 32 in-flight
+       /// checks should be more than enough for decent parallelism.
+       const MAX_PENDING_LOOKUPS: usize = 32;
+
+       /// Returns true if there are a large number of async checks pending and future
+       /// `channel_announcement` messages should be delayed. Note that this is only a hint and
+       /// messages already in-flight may still have to be handled for various reasons.
+       pub(super) fn too_many_checks_pending(&self) -> bool {
+               let mut pending_checks = self.internal.lock().unwrap();
+               if pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS {
+                       // If we have many channel checks pending, ensure we don't have any dangling checks
+                       // (i.e. checks where the user told us they'd call back but drop'd the `UtxoFuture`
+                       // instead) before we commit to applying backpressure.
+                       pending_checks.channels.retain(|_, chan| {
+                               Weak::upgrade(&chan).is_some()
+                       });
+                       pending_checks.nodes.retain(|_, channels| {
+                               channels.retain(|chan| Weak::upgrade(&chan).is_some());
+                               !channels.is_empty()
+                       });
+                       pending_checks.channels.len() > Self::MAX_PENDING_LOOKUPS
+               } else {
+                       false
+               }
+       }
+}
+
+#[cfg(test)]
+mod tests {
+       use super::*;
+       use crate::routing::gossip::tests::*;
+       use crate::util::test_utils::{TestChainSource, TestLogger};
+       use crate::ln::msgs;
+
+       use bitcoin::blockdata::constants::genesis_block;
+       use bitcoin::secp256k1::{Secp256k1, SecretKey};
+
+       use core::sync::atomic::Ordering;
+
+       fn get_network() -> (TestChainSource, NetworkGraph<Box<TestLogger>>) {
+               let logger = Box::new(TestLogger::new());
+               let genesis_hash = genesis_block(bitcoin::Network::Testnet).header.block_hash();
+               let chain_source = TestChainSource::new(bitcoin::Network::Testnet);
+               let network_graph = NetworkGraph::new(genesis_hash, logger);
+
+               (chain_source, network_graph)
+       }
+
+       fn get_test_objects() -> (msgs::ChannelAnnouncement, TestChainSource,
+               NetworkGraph<Box<TestLogger>>, bitcoin::Script, msgs::NodeAnnouncement,
+               msgs::NodeAnnouncement, msgs::ChannelUpdate, msgs::ChannelUpdate, msgs::ChannelUpdate)
+       {
+               let secp_ctx = Secp256k1::new();
+
+               let (chain_source, network_graph) = get_network();
+
+               let good_script = get_channel_script(&secp_ctx);
+               let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+               let valid_announcement = get_signed_channel_announcement(|_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+
+               let node_a_announce = get_signed_node_announcement(|_| {}, node_1_privkey, &secp_ctx);
+               let node_b_announce = get_signed_node_announcement(|_| {}, node_2_privkey, &secp_ctx);
+
+               // Note that we have to set the "direction" flag correctly on both messages
+               let chan_update_a = get_signed_channel_update(|msg| msg.flags = 0, node_1_privkey, &secp_ctx);
+               let chan_update_b = get_signed_channel_update(|msg| msg.flags = 1, node_2_privkey, &secp_ctx);
+               let chan_update_c = get_signed_channel_update(|msg| {
+                       msg.flags = 1; msg.timestamp += 1; }, node_2_privkey, &secp_ctx);
+
+               (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+                       node_b_announce, chan_update_a, chan_update_b, chan_update_c)
+       }
+
+       #[test]
+       fn test_fast_async_lookup() {
+               // Check that async lookups which resolve quicker than the future is returned to the
+               // `get_utxo` call can read it still resolve properly.
+               let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap();
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_some());
+       }
+
+       #[test]
+       fn test_async_lookup() {
+               // Test a simple async lookup
+               let (valid_announcement, chain_source, network_graph, good_script,
+                       node_a_announce, node_b_announce, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 0, script_pubkey: good_script }));
+               network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+               network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).unwrap();
+
+               assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+                       .unwrap().announcement_info.is_none());
+
+               network_graph.update_node_from_announcement(&node_a_announce).unwrap();
+               network_graph.update_node_from_announcement(&node_b_announce).unwrap();
+
+               assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+                       .unwrap().announcement_info.is_some());
+       }
+
+       #[test]
+       fn test_invalid_async_lookup() {
+               // Test an async lookup which returns an incorrect script
+               let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 1_000_000, script_pubkey: bitcoin::Script::new() }));
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+       }
+
+       #[test]
+       fn test_failing_async_lookup() {
+               // Test an async lookup which returns an error
+               let (valid_announcement, chain_source, network_graph, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+               future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+       }
+
+       #[test]
+       fn test_updates_async_lookup() {
+               // Test async lookups will process pending channel_update/node_announcements once they
+               // complete.
+               let (valid_announcement, chain_source, network_graph, good_script, node_a_announce,
+                       node_b_announce, chan_update_a, chan_update_b, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+               assert_eq!(
+                       network_graph.update_node_from_announcement(&node_a_announce).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept node_announcement");
+               assert_eq!(
+                       network_graph.update_node_from_announcement(&node_b_announce).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept node_announcement");
+
+               assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept channel_update");
+               assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept channel_update");
+
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+               assert!(network_graph.read_only().channels()
+                       .get(&valid_announcement.contents.short_channel_id).unwrap().one_to_two.is_some());
+               assert!(network_graph.read_only().channels()
+                       .get(&valid_announcement.contents.short_channel_id).unwrap().two_to_one.is_some());
+
+               assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_1)
+                       .unwrap().announcement_info.is_some());
+               assert!(network_graph.read_only().nodes().get(&valid_announcement.contents.node_id_2)
+                       .unwrap().announcement_info.is_some());
+       }
+
+       #[test]
+       fn test_latest_update_async_lookup() {
+               // Test async lookups will process the latest channel_update if two are received while
+               // awaiting an async UTXO lookup.
+               let (valid_announcement, chain_source, network_graph, good_script, _,
+                       _, chan_update_a, chan_update_b, chan_update_c, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert!(network_graph.read_only().channels().get(&valid_announcement.contents.short_channel_id).is_none());
+
+               assert_eq!(network_graph.update_channel(&chan_update_a).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept channel_update");
+               assert_eq!(network_graph.update_channel(&chan_update_b).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept channel_update");
+               assert_eq!(network_graph.update_channel(&chan_update_c).unwrap_err().err,
+                       "Awaiting channel_announcement validation to accept channel_update");
+
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+
+               assert_eq!(chan_update_a.contents.timestamp, chan_update_b.contents.timestamp);
+               assert!(network_graph.read_only().channels()
+                               .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+                               .one_to_two.as_ref().unwrap().last_update !=
+                       network_graph.read_only().channels()
+                               .get(&valid_announcement.contents.short_channel_id).as_ref().unwrap()
+                               .two_to_one.as_ref().unwrap().last_update);
+       }
+
+       #[test]
+       fn test_no_double_lookups() {
+               // Test that a pending async lookup will prevent a second async lookup from flying, but
+               // only if the channel_announcement message is identical.
+               let (valid_announcement, chain_source, network_graph, good_script, ..) = get_test_objects();
+
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+               // If we make a second request with the same message, the call count doesn't increase...
+               let future_b = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future_b.clone());
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel announcement is already being checked");
+               assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 1);
+
+               // But if we make a third request with a tweaked message, we should get a second call
+               // against our new future...
+               let secp_ctx = Secp256k1::new();
+               let replacement_pk_1 = &SecretKey::from_slice(&[99; 32]).unwrap();
+               let replacement_pk_2 = &SecretKey::from_slice(&[98; 32]).unwrap();
+               let invalid_announcement = get_signed_channel_announcement(|_| {}, replacement_pk_1, replacement_pk_2, &secp_ctx);
+               assert_eq!(
+                       network_graph.update_channel_from_announcement(&invalid_announcement, &Some(&chain_source)).unwrap_err().err,
+                       "Channel being checked async");
+               assert_eq!(chain_source.get_utxo_call_count.load(Ordering::Relaxed), 2);
+
+               // Still, if we resolve the original future, the original channel will be accepted.
+               future.resolve_without_forwarding(&network_graph,
+                       Ok(TxOut { value: 1_000_000, script_pubkey: good_script }));
+               assert!(!network_graph.read_only().channels()
+                       .get(&valid_announcement.contents.short_channel_id).unwrap()
+                       .announcement_message.as_ref().unwrap()
+                       .contents.features.supports_unknown_test_feature());
+       }
+
+       #[test]
+       fn test_checks_backpressure() {
+               // Test that too_many_checks_pending returns true when there are many checks pending, and
+               // returns false once they complete.
+               let secp_ctx = Secp256k1::new();
+               let (chain_source, network_graph) = get_network();
+
+               // We cheat and use a single future for all the lookups to complete them all at once.
+               let future = UtxoFuture::new();
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(future.clone());
+
+               let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+               for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+                       let valid_announcement = get_signed_channel_announcement(
+                               |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+                       assert!(!network_graph.pending_checks.too_many_checks_pending());
+               }
+
+               let valid_announcement = get_signed_channel_announcement(
+                       |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+               network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+               assert!(network_graph.pending_checks.too_many_checks_pending());
+
+               // Once the future completes the "too many checks" flag should reset.
+               future.resolve_without_forwarding(&network_graph, Err(UtxoLookupError::UnknownTx));
+               assert!(!network_graph.pending_checks.too_many_checks_pending());
+       }
+
+       #[test]
+       fn test_checks_backpressure_drop() {
+               // Test that too_many_checks_pending returns true when there are many checks pending, and
+               // returns false if we drop some of the futures without completion.
+               let secp_ctx = Secp256k1::new();
+               let (chain_source, network_graph) = get_network();
+
+               // We cheat and use a single future for all the lookups to complete them all at once.
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Async(UtxoFuture::new());
+
+               let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
+               let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+
+               for i in 0..PendingChecks::MAX_PENDING_LOOKUPS {
+                       let valid_announcement = get_signed_channel_announcement(
+                               |msg| msg.short_channel_id += 1 + i as u64, node_1_privkey, node_2_privkey, &secp_ctx);
+                       network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+                       assert!(!network_graph.pending_checks.too_many_checks_pending());
+               }
+
+               let valid_announcement = get_signed_channel_announcement(
+                       |_| {}, node_1_privkey, node_2_privkey, &secp_ctx);
+               network_graph.update_channel_from_announcement(&valid_announcement, &Some(&chain_source)).unwrap_err();
+               assert!(network_graph.pending_checks.too_many_checks_pending());
+
+               // Once the future is drop'd (by resetting the `utxo_ret` value) the "too many checks" flag
+               // should reset to false.
+               *chain_source.utxo_ret.lock().unwrap() = UtxoResult::Sync(Err(UtxoLookupError::UnknownTx));
+               assert!(!network_graph.pending_checks.too_many_checks_pending());
+       }
 }