From: Matt Corallo Date: Thu, 7 Dec 2023 04:12:04 +0000 (+0000) Subject: Track a `counter` for each node in our network graph X-Git-Tag: v0.0.124-beta~55^2~7 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=6d714b8cbd467b2a0efe39e5973994857149a233;p=rust-lightning Track a `counter` for each node in our network graph These counters are simply a unique number describing each node. They have no specific meaning, but start at 0 and count up, with counters being reused after a node has been deleted. --- diff --git a/lightning/src/routing/gossip.rs b/lightning/src/routing/gossip.rs index c7908d004..02f8a8c32 100644 --- a/lightning/src/routing/gossip.rs +++ b/lightning/src/routing/gossip.rs @@ -42,7 +42,6 @@ use crate::sync::Mutex; use crate::sync::{LockTestExt, RwLock, RwLockReadGuard}; use core::ops::{Bound, Deref}; use core::str::FromStr; -#[cfg(feature = "std")] use core::sync::atomic::{AtomicUsize, Ordering}; use core::{cmp, fmt}; @@ -184,6 +183,8 @@ pub struct NetworkGraph where L::Target: Logger { // Lock order: channels -> nodes channels: RwLock>, nodes: RwLock>, + removed_node_counters: Mutex>, + next_node_counter: AtomicUsize, // Lock order: removed_channels -> removed_nodes // // NOTE: In the following `removed_*` maps, we use seconds since UNIX epoch to track time instead @@ -1368,7 +1369,7 @@ impl Readable for NodeAlias { } } -#[derive(Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, Eq)] /// Details about a node in the network, known from the network announcement. pub struct NodeInfo { /// All valid channels a node has announced @@ -1376,7 +1377,19 @@ pub struct NodeInfo { /// More information about a node from node_announcement. /// Optional because we store a Node entry after learning about it from /// a channel announcement, but before receiving a node announcement. - pub announcement_info: Option + pub announcement_info: Option, + /// In memory, each node is assigned a unique ID. They are eagerly reused, ensuring they remain + /// relatively dense. + /// + /// These IDs allow the router to avoid a `HashMap` lookup by simply using this value as an + /// index in a `Vec`, skipping a big step in some of the hottest code when routing. + pub(crate) node_counter: u32, +} + +impl PartialEq for NodeInfo { + fn eq(&self, o: &NodeInfo) -> bool { + self.channels == o.channels && self.announcement_info == o.announcement_info + } } impl NodeInfo { @@ -1446,6 +1459,7 @@ impl Readable for NodeInfo { Ok(NodeInfo { announcement_info: announcement_info_wrap.map(|w| w.0), channels, + node_counter: u32::max_value(), }) } } @@ -1455,6 +1469,8 @@ const MIN_SERIALIZATION_VERSION: u8 = 1; impl Writeable for NetworkGraph where L::Target: Logger { fn write(&self, writer: &mut W) -> Result<(), io::Error> { + self.test_node_counter_consistency(); + write_ver_prefix!(writer, SERIALIZATION_VERSION, MIN_SERIALIZATION_VERSION); self.chain_hash.write(writer)?; @@ -1493,11 +1509,15 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { channels.insert(chan_id, chan_info); } let nodes_count: u64 = Readable::read(reader)?; + // There shouldn't be anywhere near `u32::MAX` nodes, and we need some headroom to insert + // new nodes during sync, so reject any graphs claiming more than `u32::MAX / 2` nodes. + if nodes_count > u32::max_value() as u64 / 2 { return Err(DecodeError::InvalidValue); } // In Nov, 2023 there were about 69K channels; we cap allocations to 1.5x that. let mut nodes = IndexedMap::with_capacity(cmp::min(nodes_count as usize, 103500)); - for _ in 0..nodes_count { + for i in 0..nodes_count { let node_id = Readable::read(reader)?; - let node_info = Readable::read(reader)?; + let mut node_info: NodeInfo = Readable::read(reader)?; + node_info.node_counter = i as u32; nodes.insert(node_id, node_info); } @@ -1512,6 +1532,8 @@ impl ReadableArgs for NetworkGraph where L::Target: Logger { logger, channels: RwLock::new(channels), nodes: RwLock::new(nodes), + removed_node_counters: Mutex::new(Vec::new()), + next_node_counter: AtomicUsize::new(nodes_count as usize), last_rapid_gossip_sync_timestamp: Mutex::new(last_rapid_gossip_sync_timestamp), removed_nodes: Mutex::new(new_hash_map()), removed_channels: Mutex::new(new_hash_map()), @@ -1557,6 +1579,8 @@ impl NetworkGraph where L::Target: Logger { logger, channels: RwLock::new(IndexedMap::new()), nodes: RwLock::new(IndexedMap::new()), + next_node_counter: AtomicUsize::new(0), + removed_node_counters: Mutex::new(Vec::new()), last_rapid_gossip_sync_timestamp: Mutex::new(None), removed_channels: Mutex::new(new_hash_map()), removed_nodes: Mutex::new(new_hash_map()), @@ -1564,8 +1588,33 @@ impl NetworkGraph where L::Target: Logger { } } + fn test_node_counter_consistency(&self) { + #[cfg(debug_assertions)] { + let nodes = self.nodes.read().unwrap(); + let removed_node_counters = self.removed_node_counters.lock().unwrap(); + let next_counter = self.next_node_counter.load(Ordering::Acquire); + assert!(next_counter < (u32::max_value() as usize) / 2); + let mut used_node_counters = vec![0u8; next_counter / 8 + 1]; + + for counter in removed_node_counters.iter() { + let pos = (*counter as usize) / 8; + let bit = 1 << (counter % 8); + assert_eq!(used_node_counters[pos] & bit, 0); + used_node_counters[pos] |= bit; + } + for (_, node) in nodes.unordered_iter() { + assert!((node.node_counter as usize) < next_counter); + let pos = (node.node_counter as usize) / 8; + let bit = 1 << (node.node_counter % 8); + assert_eq!(used_node_counters[pos] & bit, 0); + used_node_counters[pos] |= bit; + } + } + } + /// Returns a read-only view of the network graph. pub fn read_only(&'_ self) -> ReadOnlyNetworkGraph<'_> { + self.test_node_counter_consistency(); let channels = self.channels.read().unwrap(); let nodes = self.nodes.read().unwrap(); ReadOnlyNetworkGraph { @@ -1752,7 +1801,7 @@ impl NetworkGraph where L::Target: Logger { // b) we don't track UTXOs of channels we know about and remove them if they // get reorg'd out. // c) it's unclear how to do so without exposing ourselves to massive DoS risk. - Self::remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id); + self.remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id); *entry.get_mut() = channel_info; } else { return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip}); @@ -1769,9 +1818,13 @@ impl NetworkGraph where L::Target: Logger { node_entry.into_mut().channels.push(short_channel_id); }, IndexedMapEntry::Vacant(node_entry) => { + let mut removed_node_counters = self.removed_node_counters.lock().unwrap(); + let node_counter = removed_node_counters.pop() + .unwrap_or(self.next_node_counter.fetch_add(1, Ordering::Relaxed) as u32); node_entry.insert(NodeInfo { channels: vec!(short_channel_id), announcement_info: None, + node_counter, }); } }; @@ -1890,7 +1943,7 @@ impl NetworkGraph where L::Target: Logger { if let Some(chan) = channels.remove(&short_channel_id) { let mut nodes = self.nodes.write().unwrap(); self.removed_channels.lock().unwrap().insert(short_channel_id, current_time_unix); - Self::remove_channel_in_nodes(&mut nodes, &chan, short_channel_id); + self.remove_channel_in_nodes(&mut nodes, &chan, short_channel_id); } } @@ -1909,6 +1962,7 @@ impl NetworkGraph where L::Target: Logger { let mut removed_nodes = self.removed_nodes.lock().unwrap(); if let Some(node) = nodes.remove(&node_id) { + let mut removed_node_counters = self.removed_node_counters.lock().unwrap(); for scid in node.channels.iter() { if let Some(chan_info) = channels.remove(scid) { let other_node_id = if node_id == chan_info.node_one { chan_info.node_two } else { chan_info.node_one }; @@ -1917,12 +1971,14 @@ impl NetworkGraph where L::Target: Logger { *scid != *chan_id }); if other_node_entry.get().channels.is_empty() { + removed_node_counters.push(other_node_entry.get().node_counter); other_node_entry.remove_entry(); } } removed_channels.insert(*scid, current_time_unix); } } + removed_node_counters.push(node.node_counter); removed_nodes.insert(node_id, current_time_unix); } } @@ -1998,7 +2054,7 @@ impl NetworkGraph where L::Target: Logger { let mut nodes = self.nodes.write().unwrap(); for scid in scids_to_remove { let info = channels.remove(&scid).expect("We just accessed this scid, it should be present"); - Self::remove_channel_in_nodes(&mut nodes, &info, scid); + self.remove_channel_in_nodes(&mut nodes, &info, scid); self.removed_channels.lock().unwrap().insert(scid, Some(current_time_unix)); } } @@ -2180,7 +2236,7 @@ impl NetworkGraph where L::Target: Logger { Ok(()) } - fn remove_channel_in_nodes(nodes: &mut IndexedMap, chan: &ChannelInfo, short_channel_id: u64) { + fn remove_channel_in_nodes(&self, nodes: &mut IndexedMap, chan: &ChannelInfo, short_channel_id: u64) { macro_rules! remove_from_node { ($node_id: expr) => { if let IndexedMapEntry::Occupied(mut entry) = nodes.entry($node_id) { @@ -2188,6 +2244,7 @@ impl NetworkGraph where L::Target: Logger { short_channel_id != *chan_id }); if entry.get().channels.is_empty() { + self.removed_node_counters.lock().unwrap().push(entry.get().node_counter); entry.remove_entry(); } } else { @@ -3604,6 +3661,7 @@ pub(crate) mod tests { let valid_node_info = NodeInfo { channels: Vec::new(), announcement_info: Some(valid_node_ann_info), + node_counter: 0, }; let mut encoded_valid_node_info = Vec::new();