Process `channel_update`/`node_announcement` async if needed
authorMatt Corallo <git@bluematt.me>
Tue, 7 Feb 2023 20:38:20 +0000 (20:38 +0000)
committerMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 23:54:30 +0000 (23:54 +0000)
If we have a `channel_announcement` which is waiting on a UTXO
lookup before we can process it, and we receive a `channel_update`
or `node_announcement` for the same channel or a node which is a
part of the channel, we have to wait until the lookup completes
until we can decide if we want to accept the new message.

Here, we store the new message in the pending lookup state and
process it asynchronously like the original `channel_announcement`.

lightning/src/routing/gossip.rs
lightning/src/routing/utxo.rs

index dd9cabd25eba50d8aba05e267c6900cb073fb28d..8a54e63e2b5b2a5965d2c0cbbc8939f028d9bc14 100644 (file)
@@ -1298,8 +1298,13 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
        }
 
        fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
-               match self.nodes.write().unwrap().get_mut(&msg.node_id) {
-                       None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
+               let mut nodes = self.nodes.write().unwrap();
+               match nodes.get_mut(&msg.node_id) {
+                       None => {
+                               core::mem::drop(nodes);
+                               self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
+                               Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError})
+                       },
                        Some(node) => {
                                if let Some(node_info) = node.announcement_info.as_ref() {
                                        // The timestamp field is somewhat of a misnomer - the BOLTs use it to order
@@ -1724,7 +1729,11 @@ impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
 
                let mut channels = self.channels.write().unwrap();
                match channels.get_mut(&msg.short_channel_id) {
-                       None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),
+                       None => {
+                               core::mem::drop(channels);
+                               self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
+                               return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
+                       },
                        Some(channel) => {
                                if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
                                        return Err(LightningError{err:
index c18b4f459fadb6d1f97b997795d57efa3d33b4ec..56b671845894eaefe4f0bd6f7fe3e869a3ec1cb3 100644 (file)
@@ -69,10 +69,48 @@ enum ChannelAnnouncement {
        Full(msgs::ChannelAnnouncement),
        Unsigned(msgs::UnsignedChannelAnnouncement),
 }
+impl ChannelAnnouncement {
+       fn node_id_1(&self) -> &NodeId {
+               match self {
+                       ChannelAnnouncement::Full(msg) => &msg.contents.node_id_1,
+                       ChannelAnnouncement::Unsigned(msg) => &msg.node_id_1,
+               }
+       }
+}
+
+enum NodeAnnouncement {
+       Full(msgs::NodeAnnouncement),
+       Unsigned(msgs::UnsignedNodeAnnouncement),
+}
+impl NodeAnnouncement {
+       fn timestamp(&self) -> u32 {
+               match self {
+                       NodeAnnouncement::Full(msg) => msg.contents.timestamp,
+                       NodeAnnouncement::Unsigned(msg) => msg.timestamp,
+               }
+       }
+}
+
+enum ChannelUpdate {
+       Full(msgs::ChannelUpdate),
+       Unsigned(msgs::UnsignedChannelUpdate),
+}
+impl ChannelUpdate {
+       fn timestamp(&self) -> u32 {
+               match self {
+                       ChannelUpdate::Full(msg) => msg.contents.timestamp,
+                       ChannelUpdate::Unsigned(msg) => msg.timestamp,
+               }
+       }
+}
 
 struct UtxoMessages {
        complete: Option<Result<TxOut, UtxoLookupError>>,
        channel_announce: Option<ChannelAnnouncement>,
+       latest_node_announce_a: Option<NodeAnnouncement>,
+       latest_node_announce_b: Option<NodeAnnouncement>,
+       latest_channel_update_a: Option<ChannelUpdate>,
+       latest_channel_update_b: Option<ChannelUpdate>,
 }
 
 /// Represents a future resolution of a [`UtxoLookup::get_utxo`] query resolving async.
@@ -98,13 +136,17 @@ impl UtxoFuture {
                Self { state: Arc::new(Mutex::new(UtxoMessages {
                        complete: None,
                        channel_announce: None,
+                       latest_node_announce_a: None,
+                       latest_node_announce_b: None,
+                       latest_channel_update_a: None,
+                       latest_channel_update_b: None,
                }))}
        }
 
        /// Resolves this future against the given `graph` and with the given `result`.
        pub fn resolve<L: Deref>(&self, graph: &NetworkGraph<L>, result: Result<TxOut, UtxoLookupError>)
        where L::Target: Logger {
-               let announcement = {
+               let (announcement, node_a, node_b, update_a, update_b) = {
                        let mut pending_checks = graph.pending_checks.internal.lock().unwrap();
                        let mut async_messages = self.state.lock().unwrap();
 
@@ -115,6 +157,7 @@ impl UtxoFuture {
                                async_messages.complete = Some(result);
                                return;
                        }
+
                        let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() {
                                ChannelAnnouncement::Full(signed_msg) => &signed_msg.contents,
                                ChannelAnnouncement::Unsigned(msg) => &msg,
@@ -122,7 +165,11 @@ impl UtxoFuture {
 
                        pending_checks.lookup_completed(announcement_msg, &Arc::downgrade(&self.state));
 
-                       async_messages.channel_announce.take().unwrap()
+                       (async_messages.channel_announce.take().unwrap(),
+                               async_messages.latest_node_announce_a.take(),
+                               async_messages.latest_node_announce_b.take(),
+                               async_messages.latest_channel_update_a.take(),
+                               async_messages.latest_channel_update_b.take())
                };
 
                // Now that we've updated our internal state, pass the pending messages back through the
@@ -138,11 +185,36 @@ impl UtxoFuture {
                                let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver));
                        },
                }
+
+               for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) {
+                       match announce {
+                               Some(NodeAnnouncement::Full(signed_msg)) => {
+                                       let _ = graph.update_node_from_announcement(&signed_msg);
+                               },
+                               Some(NodeAnnouncement::Unsigned(msg)) => {
+                                       let _ = graph.update_node_from_unsigned_announcement(&msg);
+                               },
+                               None => {},
+                       }
+               }
+
+               for update in core::iter::once(update_a).chain(core::iter::once(update_b)) {
+                       match update {
+                               Some(ChannelUpdate::Full(signed_msg)) => {
+                                       let _ = graph.update_channel(&signed_msg);
+                               },
+                               Some(ChannelUpdate::Unsigned(msg)) => {
+                                       let _ = graph.update_channel_unsigned(&msg);
+                               },
+                               None => {},
+                       }
+               }
        }
 }
 
 struct PendingChecksContext {
        channels: HashMap<u64, Weak<Mutex<UtxoMessages>>>,
+       nodes: HashMap<NodeId, Vec<Weak<Mutex<UtxoMessages>>>>,
 }
 
 impl PendingChecksContext {
@@ -154,6 +226,15 @@ impl PendingChecksContext {
                                e.remove();
                        }
                }
+
+               if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_1) {
+                       e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
+                       if e.get().is_empty() { e.remove(); }
+               }
+               if let hash_map::Entry::Occupied(mut e) = self.nodes.entry(msg.node_id_2) {
+                       e.get_mut().retain(|elem| !Weak::ptr_eq(&elem, &completed_state));
+                       if e.get().is_empty() { e.remove(); }
+               }
        }
 }
 
@@ -165,10 +246,98 @@ pub(super) struct PendingChecks {
 impl PendingChecks {
        pub(super) fn new() -> Self {
                PendingChecks { internal: Mutex::new(PendingChecksContext {
-                       channels: HashMap::new(),
+                       channels: HashMap::new(), nodes: HashMap::new(),
                }) }
        }
 
+       /// Checks if there is a pending `channel_update` UTXO validation for the given channel,
+       /// and, if so, stores the channel message for handling later and returns an `Err`.
+       pub(super) fn check_hold_pending_channel_update(
+               &self, msg: &msgs::UnsignedChannelUpdate, full_msg: Option<&msgs::ChannelUpdate>
+       ) -> Result<(), LightningError> {
+               let mut pending_checks = self.internal.lock().unwrap();
+               if let hash_map::Entry::Occupied(e) = pending_checks.channels.entry(msg.short_channel_id) {
+                       let is_from_a = (msg.flags & 1) == 1;
+                       match Weak::upgrade(e.get()) {
+                               Some(msgs_ref) => {
+                                       let mut messages = msgs_ref.lock().unwrap();
+                                       let latest_update = if is_from_a {
+                                                       &mut messages.latest_channel_update_a
+                                               } else {
+                                                       &mut messages.latest_channel_update_b
+                                               };
+                                       if latest_update.is_none() || latest_update.as_ref().unwrap().timestamp() < msg.timestamp {
+                                               // If the messages we got has a higher timestamp, just blindly assume the
+                                               // signatures on the new message are correct and drop the old message. This
+                                               // may cause us to end up dropping valid `channel_update`s if a peer is
+                                               // malicious, but we should get the correct ones when the node updates them.
+                                               *latest_update = Some(
+                                                       if let Some(msg) = full_msg { ChannelUpdate::Full(msg.clone()) }
+                                                       else { ChannelUpdate::Unsigned(msg.clone()) });
+                                       }
+                                       return Err(LightningError {
+                                               err: "Awaiting channel_announcement validation to accept channel_update".to_owned(),
+                                               action: ErrorAction::IgnoreAndLog(Level::Gossip),
+                                       });
+                               },
+                               None => { e.remove(); },
+                       }
+               }
+               Ok(())
+       }
+
+       /// Checks if there is a pending `node_announcement` UTXO validation for a channel with the
+       /// given node and, if so, stores the channel message for handling later and returns an `Err`.
+       pub(super) fn check_hold_pending_node_announcement(
+               &self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>
+       ) -> Result<(), LightningError> {
+               let mut pending_checks = self.internal.lock().unwrap();
+               if let hash_map::Entry::Occupied(mut e) = pending_checks.nodes.entry(msg.node_id) {
+                       let mut found_at_least_one_chan = false;
+                       e.get_mut().retain(|node_msgs| {
+                               match Weak::upgrade(&node_msgs) {
+                                       Some(chan_mtx) => {
+                                               let mut chan_msgs = chan_mtx.lock().unwrap();
+                                               if let Some(chan_announce) = &chan_msgs.channel_announce {
+                                                       let latest_announce =
+                                                               if *chan_announce.node_id_1() == msg.node_id {
+                                                                       &mut chan_msgs.latest_node_announce_a
+                                                               } else {
+                                                                       &mut chan_msgs.latest_node_announce_b
+                                                               };
+                                                       if latest_announce.is_none() ||
+                                                               latest_announce.as_ref().unwrap().timestamp() < msg.timestamp
+                                                       {
+                                                               // If the messages we got has a higher timestamp, just blindly
+                                                               // assume the signatures on the new message are correct and drop
+                                                               // the old message. This may cause us to end up dropping valid
+                                                               // `node_announcement`s if a peer is malicious, but we should get
+                                                               // the correct ones when the node updates them.
+                                                               *latest_announce = Some(
+                                                                       if let Some(msg) = full_msg { NodeAnnouncement::Full(msg.clone()) }
+                                                                       else { NodeAnnouncement::Unsigned(msg.clone()) });
+                                                       }
+                                                       found_at_least_one_chan = true;
+                                                       true
+                                               } else {
+                                                       debug_assert!(false, "channel_announce is set before struct is added to node map");
+                                                       false
+                                               }
+                                       },
+                                       None => false,
+                               }
+                       });
+                       if e.get().is_empty() { e.remove(); }
+                       if found_at_least_one_chan {
+                               return Err(LightningError {
+                                       err: "Awaiting channel_announcement validation to accept node_announcement".to_owned(),
+                                       action: ErrorAction::IgnoreAndLog(Level::Gossip),
+                               });
+                       }
+               }
+               Ok(())
+       }
+
        fn check_replace_previous_entry(msg: &msgs::UnsignedChannelAnnouncement,
                full_msg: Option<&msgs::ChannelAnnouncement>, replacement: Option<Weak<Mutex<UtxoMessages>>>,
                pending_channels: &mut HashMap<u64, Weak<Mutex<UtxoMessages>>>
@@ -282,6 +451,10 @@ impl PendingChecks {
                                                        async_messages.channel_announce = Some(
                                                                if let Some(msg) = full_msg { ChannelAnnouncement::Full(msg.clone()) }
                                                                else { ChannelAnnouncement::Unsigned(msg.clone()) });
+                                                       pending_checks.nodes.entry(msg.node_id_1)
+                                                               .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
+                                                       pending_checks.nodes.entry(msg.node_id_2)
+                                                               .or_insert(Vec::new()).push(Arc::downgrade(&future.state));
                                                        Err(LightningError {
                                                                err: "Channel being checked async".to_owned(),
                                                                action: ErrorAction::IgnoreAndLog(Level::Gossip),