From 7388b6c1d795a5cbbf2c6c109bacd26e15c81500 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 22:11:56 +0000 Subject: [PATCH] Track in-flight `channel_announcement` lookups and avoid duplicates If we receive two `channel_announcement`s for the same channel at the same time, we shouldn't spawn a second UTXO lookup for an identical message. This likely isn't too rare - if we start syncing the graph from two peers at the same time, it isn't unlikely that we'll end up with the same messages around the same time. In order to avoid this we keep a hash map of all the pending `channel_announcement` messages by SCID and simply ignore duplicate message lookups. --- lightning/src/routing/utxo.rs | 89 ++++++++++++++++++++++++++++++++++- 1 file changed, 88 insertions(+), 1 deletion(-) diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 18c7ff97d..c18b4f459 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -105,6 +105,7 @@ impl UtxoFuture { pub fn resolve(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { let announcement = { + let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); if async_messages.channel_announce.is_none() { @@ -114,6 +115,12 @@ impl UtxoFuture { async_messages.complete = Some(result); return; } + let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { + ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, + ChannelAnnouncement::Unsigned(msg) => &msg, + }; + + pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); async_messages.channel_announce.take().unwrap() }; @@ -134,13 +141,87 @@ impl UtxoFuture { } } +struct PendingChecksContext { + channels: HashMap>>, +} + +impl PendingChecksContext { + fn lookup_completed(&mut self, + msg: &msgs::UnsignedChannelAnnouncement, completed_state: &Weak> + ) { + if let hash_map::Entry::Occupied(e) = self.channels.entry(msg.short_channel_id) { + if Weak::ptr_eq(e.get(), &completed_state) { + e.remove(); + } + } + } +} + /// A set of messages which are pending UTXO lookups for processing. pub(super) struct PendingChecks { + internal: Mutex, } impl PendingChecks { pub(super) fn new() -> Self { - PendingChecks {} + PendingChecks { internal: Mutex::new(PendingChecksContext { + channels: HashMap::new(), + }) } + } + + fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, + full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, + pending_channels: &mut HashMap>> + ) -> Result<(), msgs::LightningError> { + match pending_channels.entry(msg.short_channel_id) { + hash_map::Entry::Occupied(mut e) => { + // There's already a pending lookup for the given SCID. Check if the messages + // are the same and, if so, return immediately (don't bother spawning another + // lookup if we haven't gotten that far yet). + match Weak::upgrade(&e.get()) { + Some(pending_msgs) => { + let pending_matches = match &pending_msgs.lock().unwrap().channel_announce { + Some(ChannelAnnouncement::Full(pending_msg)) => Some(pending_msg) == full_msg, + Some(ChannelAnnouncement::Unsigned(pending_msg)) => pending_msg == msg, + None => { + // This shouldn't actually be reachable. We set the + // `channel_announce` field under the same lock as setting the + // channel map entry. Still, we can just treat it as + // non-matching and let the new request fly. + debug_assert!(false); + false + }, + }; + if pending_matches { + return Err(LightningError { + err: "Channel announcement is already being checked".to_owned(), + action: ErrorAction::IgnoreDuplicateGossip, + }); + } else { + // The earlier lookup is a different message. If we have another + // request in-flight now replace the original. + // Note that in the replace case whether to replace is somewhat + // arbitrary - both results will be handled, we're just updating the + // value that will be compared to future lookups with the same SCID. + if let Some(item) = replacement { + *e.get_mut() = item; + } + } + }, + None => { + // The earlier lookup already resolved. We can't be sure its the same + // so just remove/replace it and move on. + if let Some(item) = replacement { + *e.get_mut() = item; + } else { e.remove(); } + }, + } + }, + hash_map::Entry::Vacant(v) => { + if let Some(item) = replacement { v.insert(item); } + }, + } + Ok(()) } pub(super) fn check_channel_announcement(&self, @@ -177,6 +258,9 @@ impl PendingChecks { } }; + Self::check_replace_previous_entry(msg, full_msg, None, + &mut self.internal.lock().unwrap().channels)?; + match utxo_lookup { &None => { // Tentatively accept, potentially exposing us to DoS attacks @@ -186,12 +270,15 @@ impl PendingChecks { match utxo_lookup.get_utxo(&msg.chain_hash, msg.short_channel_id) { UtxoResult::Sync(res) => handle_result(res), UtxoResult::Async(future) => { + let mut pending_checks = self.internal.lock().unwrap(); let mut async_messages = future.state.lock().unwrap(); if let Some(res) = async_messages.complete.take() { // In the unlikely event the future resolved before we managed to get it, // handle the result in-line. handle_result(res) } else { + Self::check_replace_previous_entry(msg, full_msg, + Some(Arc::downgrade(&future.state)), &mut pending_checks.channels)?; async_messages.channel_announce = Some( if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } else { ChannelAnnouncement::Unsigned(msg.clone()) }); -- 2.39.5