From: Arik Sosman Date: Tue, 2 Jul 2024 09:58:58 +0000 (-0700) Subject: Introduce node serialization strategy. X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=c9aaf09d04116f631029b6e43252407103d44b70;p=rapid-gossip-sync-server Introduce node serialization strategy. With the addition of reminders, we may encounter scenarios where either a bit flip may suffice, instructing the client to look up its latest data, or we may need to serialize all announcement details a new if the client may have already purged the old data. To better distinguish between these scenarios, we introduce a serialization strategy enum that allows serializing either the full announcement, just the mutations, or serve solely as a reminder and serialize nothing at all. --- diff --git a/src/lib.rs b/src/lib.rs index 21e21d6..00a5ed5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,7 +26,7 @@ use crate::config::SYMLINK_GRANULARITY_INTERVAL; use crate::lookup::DeltaSet; use crate::persistence::GossipPersister; -use crate::serialization::{SerializationSet, UpdateSerialization}; +use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization}; use crate::snapshot::Snapshotter; use crate::types::RGSSLogger; @@ -191,7 +191,7 @@ async fn calculate_delta(network_graph: Arc>, log_info!(logger, "announcement channel count: {}", delta_set.len()); lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await; log_info!(logger, "update-fetched channel count: {}", delta_set.len()); - let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await; + let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await; log_info!(logger, "update-fetched node count: {}", node_delta_set.len()); lookup::filter_delta_set(&mut delta_set, logger.clone()); log_info!(logger, "update-filtered channel count: {}", delta_set.len()); @@ -306,6 +306,9 @@ fn serialize_delta(serialization_details: &SerializationSet, s if serialization_version >= 2 { if let Some(node_delta) = serialization_details.node_mutations.get(¤t_node_id) { + let strategy = node_delta.strategy.as_ref().unwrap(); + let mut node_has_update = false; + /* Bitmap: 7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes) @@ -317,51 +320,60 @@ fn serialize_delta(serialization_details: &SerializationSet, s 0: used for odd keys */ - if node_delta.has_address_set_changed { - node_address_update_count += 1; - - let address_set = &node_delta.latest_details.as_ref().unwrap().addresses; - let mut address_serialization = Vec::new(); - - // we don't know a priori how many are <= 255 bytes - let mut total_address_count = 0u8; - - for address in address_set.iter() { - if total_address_count == u8::MAX { - // don't serialize more than 255 addresses - break; + match strategy { + NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => { + let address_set = &node_delta.latest_details.as_ref().unwrap().addresses; + let mut address_serialization = Vec::new(); + + // we don't know a priori how many are <= 255 bytes + let mut total_address_count = 0u8; + + for address in address_set.iter() { + if total_address_count == u8::MAX { + // don't serialize more than 255 addresses + break; + } + if let Ok(serialized_length) = u8::try_from(address.serialized_length()) { + total_address_count += 1; + serialized_length.write(&mut address_serialization).unwrap(); + address.write(&mut address_serialization).unwrap(); + }; } - if let Ok(serialized_length) = u8::try_from(address.serialized_length()) { - total_address_count += 1; - serialized_length.write(&mut address_serialization).unwrap(); - address.write(&mut address_serialization).unwrap(); - }; - } - - // signal the presence of node addresses - current_node_delta_serialization[0] |= 1 << 2; - // serialize the actual addresses and count - total_address_count.write(&mut current_node_delta_serialization).unwrap(); - current_node_delta_serialization.append(&mut address_serialization); - } - if node_delta.has_feature_set_changed { - node_feature_update_count += 1; + node_address_update_count += 1; + node_has_update = true; - let latest_features = &node_delta.latest_details.as_ref().unwrap().features; + // signal the presence of node addresses + current_node_delta_serialization[0] |= 1 << 2; + // serialize the actual addresses and count + total_address_count.write(&mut current_node_delta_serialization).unwrap(); + current_node_delta_serialization.append(&mut address_serialization); + }, + _ => {} + } - // are these features among the most common ones? - if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { - // this feature set is among the 6 defaults - current_node_delta_serialization[0] |= ((index + 1) as u8) << 3; - } else { - current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3 - latest_features.write(&mut current_node_delta_serialization).unwrap(); - } + match strategy { + NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => { + let latest_features = &node_delta.latest_details.as_ref().unwrap().features; + node_feature_update_count += 1; + node_has_update = true; + + // are these features among the most common ones? + if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) { + // this feature set is among the 6 defaults + current_node_delta_serialization[0] |= ((index + 1) as u8) << 3; + } else { + current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3 + latest_features.write(&mut current_node_delta_serialization).unwrap(); + } + }, + _ => {} } - if node_delta.has_address_set_changed || node_delta.has_feature_set_changed { + if node_has_update { node_update_count += 1; + } else if let NodeSerializationStrategy::Reminder = strategy { + current_node_delta_serialization[0] |= 1 << 6; } } } diff --git a/src/lookup.rs b/src/lookup.rs index 026bdeb..87a35ae 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -17,7 +17,7 @@ use lightning::ln::features::NodeFeatures; use lightning::util::logger::Logger; use crate::config; -use crate::serialization::MutatedProperties; +use crate::serialization::{MutatedNodeProperties, MutatedProperties, NodeSerializationStrategy}; /// The delta set needs to be a BTreeMap so the keys are sorted. /// That way, the scids in the response automatically grow monotonically @@ -58,23 +58,15 @@ pub(super) struct NodeDelta { /// The most recently received, but new-to-the-client, node details pub(super) latest_details: Option, - /// Between last_details_before_seen and latest_details_after_seen, including any potential - /// intermediate updates that are not kept track of here, has the set of features this node - /// supports changed? - pub(super) has_feature_set_changed: bool, - - /// Between last_details_before_seen and latest_details_after_seen, including any potential - /// intermediate updates that are not kept track of here, has the set of socket addresses this - /// node listens on changed? - pub(super) has_address_set_changed: bool, + /// How should this delta be serialized? + pub(super) strategy: Option, /// The most recent node details that the client would have seen already pub(super) last_details_before_seen: Option } pub(super) struct NodeDetails { - #[allow(unused)] - pub(super) seen: u32, + pub(super) seen: Option, pub(super) features: NodeFeatures, pub(super) addresses: HashSet } @@ -94,9 +86,8 @@ impl Default for NodeDelta { fn default() -> Self { Self { latest_details: None, - has_feature_set_changed: false, - has_address_set_changed: false, last_details_before_seen: None, + strategy: None, } } } @@ -478,7 +469,7 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, cl log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); } -pub(super) async fn fetch_node_updates(network_graph: Arc>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger { +pub(super) async fn fetch_node_updates(network_graph: Arc>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option, logger: L) -> NodeDeltaSet where L::Target: Logger { let start = Instant::now(); let last_sync_timestamp_float = last_sync_timestamp as f64; @@ -487,7 +478,7 @@ pub(super) async fn fetch_node_updates(network_graph: Arc(network_graph: Arc(network_graph: Arc = unsigned_node_announcement.addresses.into_iter().collect(); NodeDetails { - seen, + seen: Some(seen), features: unsigned_node_announcement.features, addresses: address_set, } @@ -550,10 +540,29 @@ pub(super) async fn fetch_node_updates(network_graph: Arc(network_graph: Arc = None; let mut intermediate_update_count = 0; + let mut has_address_set_changed = false; + let mut has_feature_set_changed = false; + let mut latest_mutation_timestamp = None; while let Some(row_res) = pinned_updates.next().await { let intermediate_update = row_res.unwrap(); intermediate_update_count += 1; @@ -578,37 +590,56 @@ pub(super) async fn fetch_node_updates(network_graph: Arc = unsigned_node_announcement.addresses.into_iter().collect(); - // determine mutations + if previous_node_id != Some(node_id) { + // we're traversing a new node id, initialize the values + has_address_set_changed = false; + has_feature_set_changed = false; + latest_mutation_timestamp = None; + + // this is the highest timestamp value, so set the seen timestamp accordingly + current_node_delta.latest_details.as_mut().map(|mut d| d.seen.replace(current_seen_timestamp)); + } + if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() { - if unsigned_node_announcement.features != last_seen_update.features { - current_node_delta.has_feature_set_changed = true; - } - if address_set != last_seen_update.addresses { - current_node_delta.has_address_set_changed = true; - } - } else if !is_previously_processed_node_id { - if current_node_delta.last_details_before_seen.is_none() { - if !address_set.is_empty() { - current_node_delta.has_address_set_changed = true; + { // determine the latest mutation timestamp + if address_set != last_seen_update.addresses { + has_address_set_changed = true; + if latest_mutation_timestamp.is_none() { + latest_mutation_timestamp = Some(current_seen_timestamp); + } } - if unsigned_node_announcement.features != NodeFeatures::empty() { - current_node_delta.has_feature_set_changed = true; + if unsigned_node_announcement.features != last_seen_update.features { + has_feature_set_changed = true; + if latest_mutation_timestamp.is_none() { + latest_mutation_timestamp = Some(current_seen_timestamp); + } } } - } - if !is_previously_processed_node_id { - (*current_node_delta).latest_details.get_or_insert(NodeDetails { - seen: current_seen_timestamp, - features: unsigned_node_announcement.features, - addresses: address_set, - }); + if current_seen_timestamp >= last_sync_timestamp { + if has_address_set_changed || has_feature_set_changed { + // if the last mutation occurred since the last sync, send the mutation variant + current_node_delta.strategy = Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties { + addresses: has_address_set_changed, + features: has_feature_set_changed, + })); + } + } else if include_reminders && latest_mutation_timestamp.unwrap_or(u32::MAX) <= reminder_inclusion_threshold_timestamp { + // only send a reminder if the latest mutation occurred at least 6 days ago + current_node_delta.strategy = Some(NodeSerializationStrategy::Reminder); + } + + // Note that we completely ignore the case when the last mutation occurred less than + // 6 days ago, but prior to the last sync. In that scenario, we send nothing. + + } else { + // absent any update that was seen prior to the last sync, send the full version + current_node_delta.strategy = Some(NodeSerializationStrategy::Full); } previous_node_id = Some(node_id); diff --git a/src/serialization.rs b/src/serialization.rs index 74c92ca..2cbde46 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -99,6 +99,22 @@ impl UpdateSerialization { } } +pub(super) struct MutatedNodeProperties { + pub(super) addresses: bool, + pub(super) features: bool, +} + +pub(super) enum NodeSerializationStrategy { + /// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been + Mutated(MutatedNodeProperties), + /// Whether or not the addresses or features have been mutated, serialize this node in full. It + /// may have been purged from the client. + Full, + /// This node ID has been seen recently enough to not have been pruned, and this update serves + /// solely the purpose of delaying any pruning, without applying any mutations + Reminder +} + struct FullUpdateValueHistograms { cltv_expiry_delta: HashMap, htlc_minimum_msat: HashMap, @@ -222,12 +238,18 @@ pub(super) fn serialize_delta_set(channel_delta_set: DeltaSet, node_delta_set: N serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| { // either something changed, or this node is new - delta.has_feature_set_changed || delta.has_address_set_changed || delta.last_details_before_seen.is_none() + delta.strategy.is_some() }).collect(); let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default(); for (_id, delta) in serialization_set.node_mutations.iter() { - if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() { + // consider either full or feature-mutating serializations for histogram + let mut should_add_to_histogram = matches!(delta.strategy, Some(NodeSerializationStrategy::Full)); + if let Some(NodeSerializationStrategy::Mutated(mutation)) = delta.strategy.as_ref() { + should_add_to_histogram = mutation.features; + } + + if should_add_to_histogram { if let Some(latest_details) = delta.latest_details.as_ref() { *node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1; }; diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 2e0e0c1..9a5e22e 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -347,19 +347,41 @@ async fn test_node_announcement_delta_detection() { let timestamp = current_time() - 10; { // seed the db + + { // necessary for the node announcements to be considered relevant + let announcement = generate_channel_announcement(1); + let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0); + let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0); + + network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); + network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); + network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); + + receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap(); + } + let mut announcement = generate_node_announcement(None); - receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 10))).await.unwrap(); - receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 8))).await.unwrap(); + announcement.contents.timestamp = timestamp - 10; + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); + receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap(); + announcement.contents.timestamp = timestamp - 8; + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); + receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap(); { let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[2; 32]).unwrap())); current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]); + current_announcement.contents.timestamp = timestamp; + network_graph_arc.update_node_from_unsigned_announcement(¤t_announcement.contents).unwrap(); receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap(); } { let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[3; 32]).unwrap())); current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![22, 49]); + current_announcement.contents.timestamp = timestamp; receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap(); } @@ -379,23 +401,11 @@ async fn test_node_announcement_delta_detection() { version: 3, port: 4, }); + announcement.contents.timestamp = timestamp; } + network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap(); receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(timestamp))).await.unwrap(); - { // necessary for the node announcements to be considered relevant - let announcement = generate_channel_announcement(1); - let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0); - let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0); - - network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap(); - network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); - network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - - receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap(); - } - drop(receiver); persister.persist_gossip().await; @@ -409,10 +419,10 @@ async fn test_node_announcement_delta_detection() { clean_test_db().await; assert_eq!(serialization.message_count, 3); - assert_eq!(serialization.node_announcement_count, 3); - assert_eq!(serialization.node_update_count, 3); - assert_eq!(serialization.node_feature_update_count, 3); - assert_eq!(serialization.node_address_update_count, 1); + assert_eq!(serialization.node_announcement_count, 2); + assert_eq!(serialization.node_update_count, 2); + assert_eq!(serialization.node_feature_update_count, 2); + assert_eq!(serialization.node_address_update_count, 2); } /// If a channel has only seen updates in one direction, it should not be announced