From 67c9c7f2ae150a287370d56373f673e116172690 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 7 Feb 2023 20:38:20 +0000 Subject: [PATCH] Process `channel_update`/`node_announcement` async if needed If we have a `channel_announcement` which is waiting on a UTXO lookup before we can process it, and we receive a `channel_update` or `node_announcement` for the same channel or a node which is a part of the channel, we have to wait until the lookup completes until we can decide if we want to accept the new message. Here, we store the new message in the pending lookup state and process it asynchronously like the original `channel_announcement`. --- lightning/src/routing/gossip.rs | 15 ++- lightning/src/routing/utxo.rs | 179 +++++++++++++++++++++++++++++++- 2 files changed, 188 insertions(+), 6 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index dd9cabd25..8a54e63e2 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -1298,8 +1298,13 @@ impl NetworkGraph where L::Target: Logger { } fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> { - match self.nodes.write().unwrap().get_mut(&msg.node_id) { - None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}), + let mut nodes = self.nodes.write().unwrap(); + match nodes.get_mut(&msg.node_id) { + None => { + core::mem::drop(nodes); + self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?; + Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}) + }, Some(node) => { if let Some(node_info) = node.announcement_info.as_ref() { // The timestamp field is somewhat of a misnomer - the BOLTs use it to order @@ -1724,7 +1729,11 @@ impl NetworkGraph where L::Target: Logger { let mut channels = self.channels.write().unwrap(); match channels.get_mut(&msg.short_channel_id) { - None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}), + None => { + core::mem::drop(channels); + self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?; + return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}); + }, Some(channel) => { if msg.htlc_maximum_msat > MAX_VALUE_MSAT { return Err(LightningError{err: diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index c18b4f459..56b671845 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -69,10 +69,48 @@ enum ChannelAnnouncement { Full(msgs::ChannelAnnouncement), Unsigned(msgs::UnsignedChannelAnnouncement), } +impl ChannelAnnouncement { + fn node_id_1(&self) -> &NodeId { + match self { + ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1, + ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1, + } + } +} + +enum NodeAnnouncement { + Full(msgs::NodeAnnouncement), + Unsigned(msgs::UnsignedNodeAnnouncement), +} +impl NodeAnnouncement { + fn timestamp(&self) -> u32 { + match self { + NodeAnnouncement::Full(msg) => msg.contents.timestamp, + NodeAnnouncement::Unsigned(msg) => msg.timestamp, + } + } +} + +enum ChannelUpdate { + Full(msgs::ChannelUpdate), + Unsigned(msgs::UnsignedChannelUpdate), +} +impl ChannelUpdate { + fn timestamp(&self) -> u32 { + match self { + ChannelUpdate::Full(msg) => msg.contents.timestamp, + ChannelUpdate::Unsigned(msg) => msg.timestamp, + } + } +} struct UtxoMessages { complete: Option>, channel_announce: Option, + latest_node_announce_a: Option, + latest_node_announce_b: Option, + latest_channel_update_a: Option, + latest_channel_update_b: Option, } /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async. @@ -98,13 +136,17 @@ impl UtxoFuture { Self { state: Arc::new(Mutex::new(UtxoMessages { complete: None, channel_announce: None, + latest_node_announce_a: None, + latest_node_announce_b: None, + latest_channel_update_a: None, + latest_channel_update_b: None, }))} } /// Resolves this future against the given `graph` and with the given `result`. pub fn resolve(&self, graph: &NetworkGraph, result: Result) where L::Target: Logger { - let announcement = { + let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); @@ -115,6 +157,7 @@ impl UtxoFuture { async_messages.complete = Some(result); return; } + let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents, ChannelAnnouncement::Unsigned(msg) => &msg, @@ -122,7 +165,11 @@ impl UtxoFuture { pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state)); - async_messages.channel_announce.take().unwrap() + (async_messages.channel_announce.take().unwrap(), + async_messages.latest_node_announce_a.take(), + async_messages.latest_node_announce_b.take(), + async_messages.latest_channel_update_a.take(), + async_messages.latest_channel_update_b.take()) }; // Now that we've updated our internal state, pass the pending messages back through the @@ -138,11 +185,36 @@ impl UtxoFuture { let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); }, } + + for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { + match announce { + Some(NodeAnnouncement::Full(signed_msg)) => { + let _ = graph.update_node_from_announcement(&signed_msg); + }, + Some(NodeAnnouncement::Unsigned(msg)) => { + let _ = graph.update_node_from_unsigned_announcement(&msg); + }, + None => {}, + } + } + + for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { + match update { + Some(ChannelUpdate::Full(signed_msg)) => { + let _ = graph.update_channel(&signed_msg); + }, + Some(ChannelUpdate::Unsigned(msg)) => { + let _ = graph.update_channel_unsigned(&msg); + }, + None => {}, + } + } } } struct PendingChecksContext { channels: HashMap>>, + nodes: HashMap>>>, } impl PendingChecksContext { @@ -154,6 +226,15 @@ impl PendingChecksContext { e.remove(); } } + + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } + if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) { + e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state)); + if e.get().is_empty() { e.remove(); } + } } } @@ -165,10 +246,98 @@ pub(super) struct PendingChecks { impl PendingChecks { pub(super) fn new() -> Self { PendingChecks { internal: Mutex::new(PendingChecksContext { - channels: HashMap::new(), + channels: HashMap::new(), nodes: HashMap::new(), }) } } + /// Checks if there is a pending `channel_update` UTXO validation for the given channel, + /// and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_channel_update( + &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) { + let is_from_a = (msg.flags & 1) == 1; + match Weak::upgrade(e.get()) { + Some(msgs_ref) => { + let mut messages = msgs_ref.lock().unwrap(); + let latest_update = if is_from_a { + &mut messages.latest_channel_update_a + } else { + &mut messages.latest_channel_update_b + }; + if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp { + // If the messages we got has a higher timestamp, just blindly assume the + // signatures on the new message are correct and drop the old message. This + // may cause us to end up dropping valid `channel_update`s if a peer is + // malicious, but we should get the correct ones when the node updates them. + *latest_update = Some( + if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) } + else { ChannelUpdate::Unsigned(msg.clone()) }); + } + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept channel_update".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + }, + None => { e.remove(); }, + } + } + Ok(()) + } + + /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the + /// given node and, if so, stores the channel message for handling later and returns an `Err`. + pub(super) fn check_hold_pending_node_announcement( + &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement> + ) -> Result<(), LightningError> { + let mut pending_checks = self.internal.lock().unwrap(); + if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) { + let mut found_at_least_one_chan = false; + e.get_mut().retain(|node_msgs| { + match Weak::upgrade(&node_msgs) { + Some(chan_mtx) => { + let mut chan_msgs = chan_mtx.lock().unwrap(); + if let Some(chan_announce) = &chan_msgs.channel_announce { + let latest_announce = + if *chan_announce.node_id_1() == msg.node_id { + &mut chan_msgs.latest_node_announce_a + } else { + &mut chan_msgs.latest_node_announce_b + }; + if latest_announce.is_none() || + latest_announce.as_ref().unwrap().timestamp() < msg.timestamp + { + // If the messages we got has a higher timestamp, just blindly + // assume the signatures on the new message are correct and drop + // the old message. This may cause us to end up dropping valid + // `node_announcement`s if a peer is malicious, but we should get + // the correct ones when the node updates them. + *latest_announce = Some( + if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) } + else { NodeAnnouncement::Unsigned(msg.clone()) }); + } + found_at_least_one_chan = true; + true + } else { + debug_assert!(false, "channel_announce is set before struct is added to node map"); + false + } + }, + None => false, + } + }); + if e.get().is_empty() { e.remove(); } + if found_at_least_one_chan { + return Err(LightningError { + err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(), + action: ErrorAction::IgnoreAndLog(Level::Gossip), + }); + } + } + Ok(()) + } + fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option>>, pending_channels: &mut HashMap>> @@ -282,6 +451,10 @@ impl PendingChecks { async_messages.channel_announce = Some( if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) } else { ChannelAnnouncement::Unsigned(msg.clone()) }); + pending_checks.nodes.entry(msg.node_id_1) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); + pending_checks.nodes.entry(msg.node_id_2) + .or_insert(Vec::new()).push(Arc::downgrade(&future.state)); Err(LightningError { err: "Channel being checked async".to_owned(), action: ErrorAction::IgnoreAndLog(Level::Gossip), -- 2.39.5