From 73e7727588cb0cd4b6cbf5489b8092fa0560ca75 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 7 Dec 2023 04:12:04 +0000 Subject: [PATCH] Track a `counter` for each node in our network graph These counters are simply a unique number describing each node. They have no specific meaning, but start at 0 and count up, with counters being reused after a node has been deleted. --- lightning/src/routing/gossip.rs | 74 +++++++++++++++++++++++++++++---- 1 file changed, 65 insertions(+), 9 deletions(-) diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index 11deb058..49ffbee5 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -40,7 +40,6 @@ use crate::prelude::*; use core::{cmp, fmt}; use core::convert::TryFrom; use crate::sync::{RwLock, RwLockReadGuard, LockTestExt}; -#[cfg(feature = "std")] use core::sync::atomic::{AtomicUsize, Ordering}; use crate::sync::Mutex; use core::ops::{Bound, Deref}; @@ -174,6 +173,8 @@ pub struct NetworkGraph where L::Target: Logger { // Lock order: channels -> nodes channels: RwLock>, nodes: RwLock>, + removed_node_counters: Mutex>, + next_node_counter: AtomicUsize, // Lock order: removed_channels -> removed_nodes // // NOTE: In the following `removed_*` maps, we use seconds since UNIX epoch to track time instead @@ -1220,7 +1221,7 @@ impl Readable for NodeAlias { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq)] /// Details about a node in the network, known from the network announcement. pub struct NodeInfo { /// All valid channels a node has announced @@ -1228,7 +1229,19 @@ pub struct NodeInfo { /// More information about a node from node_announcement. /// Optional because we store a Node entry after learning about it from /// a channel announcement, but before receiving a node announcement. - pub announcement_info: Option + pub announcement_info: Option, + /// In memory, each node is assigned a unique ID. They are eagerly reused, ensuring they remain + /// relatively dense. + /// + /// These IDs allow the router to avoid a `HashMap` lookup by simply using this value as an + /// index in a `Vec`, skipping a big step in some of the hottest code when routing. + pub(crate) node_counter: u32, +} + +impl PartialEq for NodeInfo { + fn eq(&self, o: &NodeInfo) -> bool { + self.channels == o.channels && self.announcement_info == o.announcement_info + } } impl fmt::Display for NodeInfo { @@ -1286,6 +1299,7 @@ impl Readable for NodeInfo { Ok(NodeInfo { announcement_info: announcement_info_wrap.map(|w| w.0), channels, + node_counter: u32::max_value(), }) } } @@ -1295,6 +1309,8 @@ const MIN_SERIALIZATION_VERSION: u8 = 1; impl Writeable for NetworkGraph where L::Target: Logger { fn write(&self, writer: &mut W) -> Result<(), io::Error> { + self.test_node_counter_consistency(); + write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); self.chain_hash.write(writer)?; @@ -1333,11 +1349,13 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { channels.insert(chan_id, chan_info); } let nodes_count: u64 = Readable::read(reader)?; + if nodes_count > u32::max_value() as u64 / 2 { return Err(DecodeError::InvalidValue); } // In Nov, 2023 there were about 69K channels; we cap allocations to 1.5x that. let mut nodes = IndexedMap::with_capacity(cmp::min(nodes_count as usize, 103500)); - for _ in 0..nodes_count { + for i in 0..nodes_count { let node_id = Readable::read(reader)?; - let node_info = Readable::read(reader)?; + let mut node_info: NodeInfo = Readable::read(reader)?; + node_info.node_counter = i as u32; nodes.insert(node_id, node_info); } @@ -1352,6 +1370,8 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { logger, channels: RwLock::new(channels), nodes: RwLock::new(nodes), + removed_node_counters: Mutex::new(Vec::new()), + next_node_counter: AtomicUsize::new(nodes_count as usize), last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp), removed_nodes: Mutex::new(HashMap::new()), removed_channels: Mutex::new(HashMap::new()), @@ -1397,6 +1417,8 @@ impl NetworkGraph where L::Target: Logger { logger, channels: RwLock::new(IndexedMap::new()), nodes: RwLock::new(IndexedMap::new()), + next_node_counter: AtomicUsize::new(0), + removed_node_counters: Mutex::new(Vec::new()), last_rapid_gossip_sync_timestamp: Mutex::new(None), removed_channels: Mutex::new(HashMap::new()), removed_nodes: Mutex::new(HashMap::new()), @@ -1404,8 +1426,33 @@ impl NetworkGraph where L::Target: Logger { } } + fn test_node_counter_consistency(&self) { + #[cfg(debug_assertions)] { + let nodes = self.nodes.read().unwrap(); + let removed_node_counters = self.removed_node_counters.lock().unwrap(); + let next_counter = self.next_node_counter.load(Ordering::Acquire); + assert!(next_counter < (u32::max_value() as usize) / 2); + let mut used_node_counters = vec![0u8; next_counter / 8 + 1]; + + for counter in removed_node_counters.iter() { + let pos = (*counter as usize) / 8; + let bit = 1 << (counter % 8); + assert_eq!(used_node_counters[pos] & bit, 0); + used_node_counters[pos] |= bit; + } + for (_, node) in nodes.unordered_iter() { + assert!((node.node_counter as usize) < next_counter); + let pos = (node.node_counter as usize) / 8; + let bit = 1 << (node.node_counter % 8); + assert_eq!(used_node_counters[pos] & bit, 0); + used_node_counters[pos] |= bit; + } + } + } + /// Returns a read-only view of the network graph. pub fn read_only(&'_ self) -> ReadOnlyNetworkGraph<'_> { + self.test_node_counter_consistency(); let channels = self.channels.read().unwrap(); let nodes = self.nodes.read().unwrap(); ReadOnlyNetworkGraph { @@ -1587,7 +1634,7 @@ impl NetworkGraph where L::Target: Logger { // b) we don't track UTXOs of channels we know about and remove them if they // get reorg'd out. // c) it's unclear how to do so without exposing ourselves to massive DoS risk. - Self::remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id); + self.remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id); *entry.get_mut() = channel_info; } else { return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip}); @@ -1604,9 +1651,13 @@ impl NetworkGraph where L::Target: Logger { node_entry.into_mut().channels.push(short_channel_id); }, IndexedMapEntry::Vacant(node_entry) => { + let mut removed_node_counters = self.removed_node_counters.lock().unwrap(); + let node_counter = removed_node_counters.pop() + .unwrap_or(self.next_node_counter.fetch_add(1, Ordering::Relaxed) as u32); node_entry.insert(NodeInfo { channels: vec!(short_channel_id), announcement_info: None, + node_counter, }); } }; @@ -1725,7 +1776,7 @@ impl NetworkGraph where L::Target: Logger { if let Some(chan) = channels.remove(&short_channel_id) { let mut nodes = self.nodes.write().unwrap(); self.removed_channels.lock().unwrap().insert(short_channel_id, current_time_unix); - Self::remove_channel_in_nodes(&mut nodes, &chan, short_channel_id); + self.remove_channel_in_nodes(&mut nodes, &chan, short_channel_id); } } @@ -1744,6 +1795,7 @@ impl NetworkGraph where L::Target: Logger { let mut removed_nodes = self.removed_nodes.lock().unwrap(); if let Some(node) = nodes.remove(&node_id) { + let mut removed_node_counters = self.removed_node_counters.lock().unwrap(); for scid in node.channels.iter() { if let Some(chan_info) = channels.remove(scid) { let other_node_id = if node_id == chan_info.node_one { chan_info.node_two } else { chan_info.node_one }; @@ -1752,12 +1804,14 @@ impl NetworkGraph where L::Target: Logger { *scid != *chan_id }); if other_node_entry.get().channels.is_empty() { + removed_node_counters.push(other_node_entry.get().node_counter); other_node_entry.remove_entry(); } } removed_channels.insert(*scid, current_time_unix); } } + removed_node_counters.push(node.node_counter); removed_nodes.insert(node_id, current_time_unix); } } @@ -1833,7 +1887,7 @@ impl NetworkGraph where L::Target: Logger { let mut nodes = self.nodes.write().unwrap(); for scid in scids_to_remove { let info = channels.remove(&scid).expect("We just accessed this scid, it should be present"); - Self::remove_channel_in_nodes(&mut nodes, &info, scid); + self.remove_channel_in_nodes(&mut nodes, &info, scid); self.removed_channels.lock().unwrap().insert(scid, Some(current_time_unix)); } } @@ -2012,7 +2066,7 @@ impl NetworkGraph where L::Target: Logger { Ok(()) } - fn remove_channel_in_nodes(nodes: &mut IndexedMap, chan: &ChannelInfo, short_channel_id: u64) { + fn remove_channel_in_nodes(&self, nodes: &mut IndexedMap, chan: &ChannelInfo, short_channel_id: u64) { macro_rules! remove_from_node { ($node_id: expr) => { if let IndexedMapEntry::Occupied(mut entry) = nodes.entry($node_id) { @@ -2020,6 +2074,7 @@ impl NetworkGraph where L::Target: Logger { short_channel_id != *chan_id }); if entry.get().channels.is_empty() { + self.removed_node_counters.lock().unwrap().push(entry.get().node_counter); entry.remove_entry(); } } else { @@ -3445,6 +3500,7 @@ pub(crate) mod tests { let valid_node_info = NodeInfo { channels: Vec::new(), announcement_info: Some(valid_node_ann_info), + node_counter: 0, }; let mut encoded_valid_node_info = Vec::new(); -- 2.30.2