/// resync them from gossip. Each `NodeId` is mapped to the time (in seconds) it was removed so
/// that once some time passes, we can potentially resync it from gossip again.
removed_nodes: Mutex<HashMap<NodeId, Option<u64>>>,
+ /// Announcement messages which are awaiting an on-chain lookup to be processed.
+ pub(super) pending_checks: utxo::PendingChecks,
}
/// A read-only view of [`NetworkGraph`].
false
}
}
+
+ /// Used to broadcast forward gossip messages which were validated async.
+ ///
+ /// Note that this will ignore events other than `Broadcast*` or messages with too much excess
+ /// data.
+ pub(super) fn forward_gossip_msg(&self, mut ev: MessageSendEvent) {
+ match &mut ev {
+ MessageSendEvent::BroadcastChannelAnnouncement { msg, ref mut update_msg } => {
+ if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
+ if update_msg.as_ref()
+ .map(|msg| msg.contents.excess_data.len()).unwrap_or(0) > MAX_EXCESS_BYTES_FOR_RELAY
+ {
+ *update_msg = None;
+ }
+ },
+ MessageSendEvent::BroadcastChannelUpdate { msg } => {
+ if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY { return; }
+ },
+ MessageSendEvent::BroadcastNodeAnnouncement { msg } => {
+ if msg.contents.excess_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
+ msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY ||
+ msg.contents.excess_data.len() + msg.contents.excess_address_data.len() > MAX_EXCESS_BYTES_FOR_RELAY
+ {
+ return;
+ }
+ },
+ _ => return,
+ }
+ self.pending_events.lock().unwrap().push(ev);
+ }
}
impl<L: Deref> NetworkGraph<L> where L::Target: Logger {
features.set_gossip_queries_optional();
features
}
+
+ fn processing_queue_high(&self) -> bool {
+ self.network_graph.pending_checks.too_many_checks_pending()
+ }
}
impl<G: Deref<Target=NetworkGraph<L>>, U: Deref, L: Deref> MessageSendEventsProvider for P2PGossipSync<G, U, L>
last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp),
removed_nodes: Mutex::new(HashMap::new()),
removed_channels: Mutex::new(HashMap::new()),
+ pending_checks: utxo::PendingChecks::new(),
})
}
}
last_rapid_gossip_sync_timestamp: Mutex::new(None),
removed_channels: Mutex::new(HashMap::new()),
removed_nodes: Mutex::new(HashMap::new()),
+ pending_checks: utxo::PendingChecks::new(),
}
}
}
fn update_node_from_announcement_intern(&self, msg: &msgs::UnsignedNodeAnnouncement, full_msg: Option<&msgs::NodeAnnouncement>) -> Result<(), LightningError> {
- match self.nodes.write().unwrap().get_mut(&msg.node_id) {
- None => Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError}),
+ let mut nodes = self.nodes.write().unwrap();
+ match nodes.get_mut(&msg.node_id) {
+ None => {
+ core::mem::drop(nodes);
+ self.pending_checks.check_hold_pending_node_announcement(msg, full_msg)?;
+ Err(LightningError{err: "No existing channels for node_announcement".to_owned(), action: ErrorAction::IgnoreError})
+ },
Some(node) => {
if let Some(node_info) = node.announcement_info.as_ref() {
// The timestamp field is somewhat of a misnomer - the BOLTs use it to order
}
}
- let utxo_value = utxo::check_channel_announcement(utxo_lookup, msg)?;
+ let utxo_value = self.pending_checks.check_channel_announcement(
+ utxo_lookup, msg, full_msg)?;
#[allow(unused_mut, unused_assignments)]
let mut announcement_received_time = 0;
let mut channels = self.channels.write().unwrap();
match channels.get_mut(&msg.short_channel_id) {
- None => return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError}),
+ None => {
+ core::mem::drop(channels);
+ self.pending_checks.check_hold_pending_channel_update(msg, full_msg)?;
+ return Err(LightningError{err: "Couldn't find channel for update".to_owned(), action: ErrorAction::IgnoreError});
+ },
Some(channel) => {
if msg.htlc_maximum_msat > MAX_VALUE_MSAT {
return Err(LightningError{err: