Track in-flight `channel_announcement` lookups and avoid duplicates
authorMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 22:11:56 +0000 (22:11 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 23:54:30 +0000 (23:54 +0000)
If we receive two `channel_announcement`s for the same channel at
the same time, we shouldn't spawn a second UTXO lookup for an
identical message. This likely isn't too rare - if we start syncing
the graph from two peers at the same time, it isn't unlikely that
we'll end up with the same messages around the same time.

In order to avoid this we keep a hash map of all the pending
`channel_announcement` messages by SCID and simply ignore duplicate
message lookups.

lightning/src/routing/utxo.rs

index 18c7ff97d94508093507c8684bf537d5b5b71bf0..c18b4f459fadb6d1f97b997795d57efa3d33b4ec 100644 (file)
@@ -105,6 +105,7 @@ impl UtxoFuture {
        pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
        where L::Target: Logger {
                let announcement = {
+                       let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
                        let mut async_messages = self.state.lock().unwrap();
 
                        if async_messages.channel_announce.is_none() {
@@ -114,6 +115,12 @@ impl UtxoFuture {
                                async_messages.complete = Some(result);
                                return;
                        }
+                       let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
+                               ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
+                               ChannelAnnouncement::Unsigned(msg) => &msg,
+                       };
+
+                       pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
 
                        async_messages.channel_announce.take().unwrap()
                };
@@ -134,13 +141,87 @@ impl UtxoFuture {
        }
 }
 
+struct PendingChecksContext {
+       channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
+}
+
+impl PendingChecksContext {
+       fn lookup_completed(&mut self,
+               msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak<Mutex<UtxoMessages>>
+       ) {
+               if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) {
+                       if Weak::ptr_eq(e.get(), &completed_state) {
+                               e.remove();
+                       }
+               }
+       }
+}
+
 /// A set of messages which are pending UTXO lookups for processing.
 pub(super) struct PendingChecks {
+       internal: Mutex<PendingChecksContext>,
 }
 
 impl PendingChecks {
        pub(super) fn new() -> Self {
-               PendingChecks {}
+               PendingChecks { internal: Mutex::new(PendingChecksContext {
+                       channels: HashMap::new(),
+               }) }
+       }
+
+       fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
+               full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
+               pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
+       ) -> Result<(), msgs::LightningError> {
+               match pending_channels.entry(msg.short_channel_id) {
+                       hash_map::Entry::Occupied(mut e) => {
+                               // There's already a pending lookup for the given SCID. Check if the messages
+                               // are the same and, if so, return immediately (don't bother spawning another
+                               // lookup if we haven't gotten that far yet).
+                               match Weak::upgrade(&e.get()) {
+                                       Some(pending_msgs) => {
+                                               let pending_matches = match &pending_msgs.lock().unwrap().channel_announce {
+                                                       Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg,
+                                                       Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg,
+                                                       None => {
+                                                               // This shouldn't actually be reachable. We set the
+                                                               // `channel_announce` field under the same lock as setting the
+                                                               // channel map entry. Still, we can just treat it as
+                                                               // non-matching and let the new request fly.
+                                                               debug_assert!(false);
+                                                               false
+                                                       },
+                                               };
+                                               if pending_matches {
+                                                       return Err(LightningError {
+                                                               err: "Channel announcement is already being checked".to_owned(),
+                                                               action: ErrorAction::IgnoreDuplicateGossip,
+                                                       });
+                                               } else {
+                                                       // The earlier lookup is a different message. If we have another
+                                                       // request in-flight now replace the original.
+                                                       // Note that in the replace case whether to replace is somewhat
+                                                       // arbitrary - both results will be handled, we're just updating the
+                                                       // value that will be compared to future lookups with the same SCID.
+                                                       if let Some(item) = replacement {
+                                                               *e.get_mut() = item;
+                                                       }
+                                               }
+                                       },
+                                       None => {
+                                               // The earlier lookup already resolved. We can't be sure its the same
+                                               // so just remove/replace it and move on.
+                                               if let Some(item) = replacement {
+                                                       *e.get_mut() = item;
+                                               } else { e.remove(); }
+                                       },
+                               }
+                       },
+                       hash_map::Entry::Vacant(v) => {
+                               if let Some(item) = replacement { v.insert(item); }
+                       },
+               }
+               Ok(())
        }
 
        pub(super) fn check_channel_announcement<U: Deref>(&self,
@@ -177,6 +258,9 @@ impl PendingChecks {
                        }
                };
 
+               Self::check_replace_previous_entry(msg, full_msg, None,
+                       &mut self.internal.lock().unwrap().channels)?;
+
                match utxo_lookup {
                        &None => {
                                // Tentatively accept, potentially exposing us to DoS attacks
@@ -186,12 +270,15 @@ impl PendingChecks {
                                match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) {
                                        UtxoResult::Sync(res) => handle_result(res),
                                        UtxoResult::Async(future) => {
+                                               let mut pending_checks = self.internal.lock().unwrap();
                                                let mut async_messages = future.state.lock().unwrap();
                                                if let Some(res) = async_messages.complete.take() {
                                                        // In the unlikely event the future resolved before we managed to get it,
                                                        // handle the result in-line.
                                                        handle_result(res)
                                                } else {
+                                                       Self::check_replace_previous_entry(msg, full_msg,
+                                                               Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?;
                                                        async_messages.channel_announce = Some(
                                                                if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
                                                                else { ChannelAnnouncement::Unsigned(msg.clone()) });