From 0da7bbd5ecc578ee13d75a31c4627070c69c875f Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Sun, 22 Jan 2023 04:14:58 +0000 Subject: [PATCH] Forward gossip messages which were verified asynchronously Gossip messages which were verified against the chain asynchronously should still be forwarded to peers, but must now go out via a new `P2PGossipSync` parameter in the `AccessResolver::resolve` method, allowing us to wire them up to the `P2PGossipSync`'s `MessageSendEventsProvider` implementation. --- fuzz/src/router.rs | 4 +-- lightning/src/routing/gossip.rs | 30 +++++++++++++++++ lightning/src/routing/utxo.rs | 57 +++++++++++++++++++++++++++++---- 3 files changed, 83 insertions(+), 8 deletions(-) diff --git a/fuzz/src/router.rs b/fuzz/src/router.rs index a30c7d28a..a7c50de4a 100644 --- a/fuzz/src/router.rs +++ b/fuzz/src/router.rs @@ -99,12 +99,12 @@ impl UtxoLookup for FuzzChainSource<'_, '_, Out> { &[1, _] => UtxoResult::Sync(Err(UtxoLookupError::UnknownTx)), &[2, _] => { let future = UtxoFuture::new(); - future.resolve(self.net_graph, Ok(txo_res)); + future.resolve_without_forwarding(self.net_graph, Ok(txo_res)); UtxoResult::Async(future.clone()) }, &[3, _] => { let future = UtxoFuture::new(); - future.resolve(self.net_graph, Err(UtxoLookupError::UnknownTx)); + future.resolve_without_forwarding(self.net_graph, Err(UtxoLookupError::UnknownTx)); UtxoResult::Async(future.clone()) }, &[4, _] => { diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 8a54e63e2..2d3cda3f4 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -272,6 +272,36 @@ where U::Target: UtxoLookup, L::Target: Logger false } } + + /// Used to broadcast forward gossip messages which were validated async. + /// + /// Note that this will ignore events other than `Broadcast*` or messages with too much excess + /// data. + pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) { + match &mut ev { + MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + if update_msg.as_ref() + .map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY + { + *update_msg = None; + } + }, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; } + }, + MessageSendEvent::BroadcastNodeAnnouncement { msg } => { + if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY || + msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY + { + return; + } + }, + _ => return, + } + self.pending_events.lock().unwrap().push(ev); + } } impl NetworkGraph where L::Target: Logger { diff --git a/lightning/src/routing/utxo.rs b/lightning/src/routing/utxo.rs index 56b671845..eb8e2a7c6 100644 --- a/lightning/src/routing/utxo.rs +++ b/lightning/src/routing/utxo.rs @@ -18,7 +18,8 @@ use bitcoin::hashes::hex::ToHex; use crate::ln::chan_utils::make_funding_redeemscript_from_slices; use crate::ln::msgs::{self, LightningError, ErrorAction}; -use crate::routing::gossip::{NetworkGraph, NodeId}; +use crate::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; +use crate::util::events::MessageSendEvent; use crate::util::logger::{Level, Logger}; use crate::util::ser::Writeable; @@ -144,8 +145,32 @@ impl UtxoFuture { } /// Resolves this future against the given `graph` and with the given `result`. - pub fn resolve(&self, graph: &NetworkGraph, result: Result) + /// + /// This is identical to calling [`UtxoFuture::resolve`] with a dummy `gossip`, disabling + /// forwarding the validated gossip message onwards to peers. + pub fn resolve_without_forwarding(&self, + graph: &NetworkGraph, result: Result) where L::Target: Logger { + self.do_resolve(graph, result); + } + + /// Resolves this future against the given `graph` and with the given `result`. + /// + /// The given `gossip` is used to broadcast any validated messages onwards to all peers which + /// have available buffer space. + pub fn resolve>, U: Deref, GS: Deref>>(&self, + graph: &NetworkGraph, gossip: GS, result: Result + ) where L::Target: Logger, U::Target: UtxoLookup { + let mut res = self.do_resolve(graph, result); + for msg_opt in res.iter_mut() { + if let Some(msg) = msg_opt.take() { + gossip.forward_gossip_msg(msg); + } + } + } + + fn do_resolve(&self, graph: &NetworkGraph, result: Result) + -> [Option; 5] where L::Target: Logger { let (announcement, node_a, node_b, update_a, update_b) = { let mut pending_checks = graph.pending_checks.internal.lock().unwrap(); let mut async_messages = self.state.lock().unwrap(); @@ -155,7 +180,7 @@ impl UtxoFuture { // `channel_announce` yet. That's okay, we can set the `complete` field which it will // check once it gets control again. async_messages.complete = Some(result); - return; + return [None, None, None, None, None]; } let announcement_msg = match async_messages.channel_announce.as_ref().unwrap() { @@ -172,6 +197,9 @@ impl UtxoFuture { async_messages.latest_channel_update_b.take()) }; + let mut res = [None, None, None, None, None]; + let mut res_idx = 0; + // Now that we've updated our internal state, pass the pending messages back through the // network graph with a different `UtxoLookup` which will resolve immediately. // Note that we ignore errors as we don't disconnect peers anyway, so there's nothing to do @@ -179,7 +207,12 @@ impl UtxoFuture { let resolver = UtxoResolver(result); match announcement { ChannelAnnouncement::Full(signed_msg) => { - let _ = graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)); + if graph.update_channel_from_announcement(&signed_msg, &Some(&resolver)).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelAnnouncement { + msg: signed_msg, update_msg: None, + }); + res_idx += 1; + } }, ChannelAnnouncement::Unsigned(msg) => { let _ = graph.update_channel_from_unsigned_announcement(&msg, &Some(&resolver)); @@ -189,7 +222,12 @@ impl UtxoFuture { for announce in core::iter::once(node_a).chain(core::iter::once(node_b)) { match announce { Some(NodeAnnouncement::Full(signed_msg)) => { - let _ = graph.update_node_from_announcement(&signed_msg); + if graph.update_node_from_announcement(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastNodeAnnouncement { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(NodeAnnouncement::Unsigned(msg)) => { let _ = graph.update_node_from_unsigned_announcement(&msg); @@ -201,7 +239,12 @@ impl UtxoFuture { for update in core::iter::once(update_a).chain(core::iter::once(update_b)) { match update { Some(ChannelUpdate::Full(signed_msg)) => { - let _ = graph.update_channel(&signed_msg); + if graph.update_channel(&signed_msg).is_ok() { + res[res_idx] = Some(MessageSendEvent::BroadcastChannelUpdate { + msg: signed_msg, + }); + res_idx += 1; + } }, Some(ChannelUpdate::Unsigned(msg)) => { let _ = graph.update_channel_unsigned(&msg); @@ -209,6 +252,8 @@ impl UtxoFuture { None => {}, } } + + res } } -- 2.39.5