use crate::lookup::DeltaSet;
use crate::persistence::GossipPersister;
-use crate::serialization::{SerializationSet, UpdateSerialization};
+use crate::serialization::{MutatedNodeProperties, NodeSerializationStrategy, SerializationSet, UpdateSerialization};
use crate::snapshot::Snapshotter;
use crate::types::RGSSLogger;
log_info!(logger, "announcement channel count: {}", delta_set.len());
lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched channel count: {}", delta_set.len());
- let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, logger.clone()).await;
+ let node_delta_set = lookup::fetch_node_updates(network_graph, &client, last_sync_timestamp, snapshot_reference_timestamp, logger.clone()).await;
log_info!(logger, "update-fetched node count: {}", node_delta_set.len());
lookup::filter_delta_set(&mut delta_set, logger.clone());
log_info!(logger, "update-filtered channel count: {}", delta_set.len());
if serialization_version >= 2 {
if let Some(node_delta) = serialization_details.node_mutations.get(¤t_node_id) {
+ let strategy = node_delta.strategy.as_ref().unwrap();
+ let mut node_has_update = false;
+
/*
Bitmap:
7: expect extra data after the pubkey (a u16 for the count, and then that number of bytes)
0: used for odd keys
*/
- if node_delta.has_address_set_changed {
- node_address_update_count += 1;
-
- let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
- let mut address_serialization = Vec::new();
-
- // we don't know a priori how many are <= 255 bytes
- let mut total_address_count = 0u8;
-
- for address in address_set.iter() {
- if total_address_count == u8::MAX {
- // don't serialize more than 255 addresses
- break;
+ match strategy {
+ NodeSerializationStrategy::Mutated(MutatedNodeProperties { addresses: true, .. }) | NodeSerializationStrategy::Full => {
+ let address_set = &node_delta.latest_details.as_ref().unwrap().addresses;
+ let mut address_serialization = Vec::new();
+
+ // we don't know a priori how many are <= 255 bytes
+ let mut total_address_count = 0u8;
+
+ for address in address_set.iter() {
+ if total_address_count == u8::MAX {
+ // don't serialize more than 255 addresses
+ break;
+ }
+ if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
+ total_address_count += 1;
+ serialized_length.write(&mut address_serialization).unwrap();
+ address.write(&mut address_serialization).unwrap();
+ };
}
- if let Ok(serialized_length) = u8::try_from(address.serialized_length()) {
- total_address_count += 1;
- serialized_length.write(&mut address_serialization).unwrap();
- address.write(&mut address_serialization).unwrap();
- };
- }
-
- // signal the presence of node addresses
- current_node_delta_serialization[0] |= 1 << 2;
- // serialize the actual addresses and count
- total_address_count.write(&mut current_node_delta_serialization).unwrap();
- current_node_delta_serialization.append(&mut address_serialization);
- }
- if node_delta.has_feature_set_changed {
- node_feature_update_count += 1;
+ node_address_update_count += 1;
+ node_has_update = true;
- let latest_features = &node_delta.latest_details.as_ref().unwrap().features;
+ // signal the presence of node addresses
+ current_node_delta_serialization[0] |= 1 << 2;
+ // serialize the actual addresses and count
+ total_address_count.write(&mut current_node_delta_serialization).unwrap();
+ current_node_delta_serialization.append(&mut address_serialization);
+ },
+ _ => {}
+ }
- // are these features among the most common ones?
- if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
- // this feature set is among the 6 defaults
- current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
- } else {
- current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
- latest_features.write(&mut current_node_delta_serialization).unwrap();
- }
+ match strategy {
+ NodeSerializationStrategy::Mutated(MutatedNodeProperties { features: true, .. }) | NodeSerializationStrategy::Full => {
+ let latest_features = &node_delta.latest_details.as_ref().unwrap().features;
+ node_feature_update_count += 1;
+ node_has_update = true;
+
+ // are these features among the most common ones?
+ if let Some(index) = serialization_details.node_announcement_feature_defaults.iter().position(|f| f == latest_features) {
+ // this feature set is among the 6 defaults
+ current_node_delta_serialization[0] |= ((index + 1) as u8) << 3;
+ } else {
+ current_node_delta_serialization[0] |= 0b_0011_1000; // 7 << 3
+ latest_features.write(&mut current_node_delta_serialization).unwrap();
+ }
+ },
+ _ => {}
}
- if node_delta.has_address_set_changed || node_delta.has_feature_set_changed {
+ if node_has_update {
node_update_count += 1;
+ } else if let NodeSerializationStrategy::Reminder = strategy {
+ current_node_delta_serialization[0] |= 1 << 6;
}
}
}
use lightning::util::logger::Logger;
use crate::config;
-use crate::serialization::MutatedProperties;
+use crate::serialization::{MutatedNodeProperties, MutatedProperties, NodeSerializationStrategy};
/// The delta set needs to be a BTreeMap so the keys are sorted.
/// That way, the scids in the response automatically grow monotonically
/// The most recently received, but new-to-the-client, node details
pub(super) latest_details: Option<NodeDetails>,
- /// Between last_details_before_seen and latest_details_after_seen, including any potential
- /// intermediate updates that are not kept track of here, has the set of features this node
- /// supports changed?
- pub(super) has_feature_set_changed: bool,
-
- /// Between last_details_before_seen and latest_details_after_seen, including any potential
- /// intermediate updates that are not kept track of here, has the set of socket addresses this
- /// node listens on changed?
- pub(super) has_address_set_changed: bool,
+ /// How should this delta be serialized?
+ pub(super) strategy: Option<NodeSerializationStrategy>,
/// The most recent node details that the client would have seen already
pub(super) last_details_before_seen: Option<NodeDetails>
}
pub(super) struct NodeDetails {
- #[allow(unused)]
- pub(super) seen: u32,
+ pub(super) seen: Option<u32>,
pub(super) features: NodeFeatures,
pub(super) addresses: HashSet<SocketAddress>
}
fn default() -> Self {
Self {
latest_details: None,
- has_feature_set_changed: false,
- has_address_set_changed: false,
last_details_before_seen: None,
+ strategy: None,
}
}
}
log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}
-pub(super) async fn fetch_node_updates<L: Deref>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) -> NodeDeltaSet where L::Target: Logger {
+pub(super) async fn fetch_node_updates<L: Deref + Clone>(network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) -> NodeDeltaSet where L::Target: Logger {
let start = Instant::now();
let last_sync_timestamp_float = last_sync_timestamp as f64;
read_only_graph.nodes().unordered_iter().flat_map(|(node_id, node_info)| {
let details: NodeDetails = if let Some(details) = node_info.announcement_info.as_ref() {
NodeDetails {
- seen: 0,
+ seen: None,
features: details.features().clone(),
addresses: details.addresses().into_iter().cloned().collect(),
}
};
Some((node_id.clone(), NodeDelta {
latest_details: Some(details),
- has_feature_set_changed: false,
- has_address_set_changed: false,
+ strategy: None,
last_details_before_seen: None,
}))
}).collect()
(*current_node_delta).last_details_before_seen.get_or_insert_with(|| {
let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
NodeDetails {
- seen,
+ seen: Some(seen),
features: unsigned_node_announcement.features,
addresses: address_set,
}
log_info!(logger, "Processed {} node announcement reference rows (delta size: {}) in {:?}",
reference_row_count, delta_set.len(), start.elapsed());
+ let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
+ let reminder_inclusion_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as u32;
+ let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::PRUNE_INTERVAL.as_secs()).unwrap() as u32;
+
+ // this is the timestamp we need to fetch all relevant updates
+ let include_reminders = should_snapshot_include_reminders(last_sync_timestamp, current_timestamp, &logger);
+ let effective_threshold_timestamp = if include_reminders {
+ std::cmp::min(last_sync_timestamp, reminder_lookup_threshold_timestamp) as f64
+ } else {
+ // If we include reminders, the decision logic is as follows:
+ // If the pre-sync update was more than 6 days ago, serialize in full.
+ // Otherwise:
+ // If the last mutation occurred after the last sync, serialize the mutated properties.
+ // Otherwise:
+ // If the last mutation occurred more than 6 days ago, serialize as a reminder.
+ // Otherwise, don't serialize at all.
+ last_sync_timestamp as f64
+ };
+
// get all the intermediate node updates
// (to calculate the set of mutated fields for snapshotting, where intermediate updates may
// have been omitted)
- let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &last_sync_timestamp_float];
+ let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&node_ids, &effective_threshold_timestamp];
let intermediate_updates = client.query_raw("
SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM node_announcements
let mut previous_node_id: Option<NodeId> = None;
let mut intermediate_update_count = 0;
+ let mut has_address_set_changed = false;
+ let mut has_feature_set_changed = false;
+ let mut latest_mutation_timestamp = None;
while let Some(row_res) = pinned_updates.next().await {
let intermediate_update = row_res.unwrap();
intermediate_update_count += 1;
let unsigned_node_announcement = NodeAnnouncement::read(&mut readable).unwrap().contents;
let node_id = unsigned_node_announcement.node_id;
- let is_previously_processed_node_id = Some(node_id) == previous_node_id;
// get this node's address set
let current_node_delta = delta_set.entry(node_id).or_insert(NodeDelta::default());
let address_set: HashSet<SocketAddress> = unsigned_node_announcement.addresses.into_iter().collect();
- // determine mutations
+ if previous_node_id != Some(node_id) {
+ // we're traversing a new node id, initialize the values
+ has_address_set_changed = false;
+ has_feature_set_changed = false;
+ latest_mutation_timestamp = None;
+
+ // this is the highest timestamp value, so set the seen timestamp accordingly
+ current_node_delta.latest_details.as_mut().map(|mut d| d.seen.replace(current_seen_timestamp));
+ }
+
if let Some(last_seen_update) = current_node_delta.last_details_before_seen.as_ref() {
- if unsigned_node_announcement.features != last_seen_update.features {
- current_node_delta.has_feature_set_changed = true;
- }
- if address_set != last_seen_update.addresses {
- current_node_delta.has_address_set_changed = true;
- }
- } else if !is_previously_processed_node_id {
- if current_node_delta.last_details_before_seen.is_none() {
- if !address_set.is_empty() {
- current_node_delta.has_address_set_changed = true;
+ { // determine the latest mutation timestamp
+ if address_set != last_seen_update.addresses {
+ has_address_set_changed = true;
+ if latest_mutation_timestamp.is_none() {
+ latest_mutation_timestamp = Some(current_seen_timestamp);
+ }
}
- if unsigned_node_announcement.features != NodeFeatures::empty() {
- current_node_delta.has_feature_set_changed = true;
+ if unsigned_node_announcement.features != last_seen_update.features {
+ has_feature_set_changed = true;
+ if latest_mutation_timestamp.is_none() {
+ latest_mutation_timestamp = Some(current_seen_timestamp);
+ }
}
}
- }
- if !is_previously_processed_node_id {
- (*current_node_delta).latest_details.get_or_insert(NodeDetails {
- seen: current_seen_timestamp,
- features: unsigned_node_announcement.features,
- addresses: address_set,
- });
+ if current_seen_timestamp >= last_sync_timestamp {
+ if has_address_set_changed || has_feature_set_changed {
+ // if the last mutation occurred since the last sync, send the mutation variant
+ current_node_delta.strategy = Some(NodeSerializationStrategy::Mutated(MutatedNodeProperties {
+ addresses: has_address_set_changed,
+ features: has_feature_set_changed,
+ }));
+ }
+ } else if include_reminders && latest_mutation_timestamp.unwrap_or(u32::MAX) <= reminder_inclusion_threshold_timestamp {
+ // only send a reminder if the latest mutation occurred at least 6 days ago
+ current_node_delta.strategy = Some(NodeSerializationStrategy::Reminder);
+ }
+
+ // Note that we completely ignore the case when the last mutation occurred less than
+ // 6 days ago, but prior to the last sync. In that scenario, we send nothing.
+
+ } else {
+ // absent any update that was seen prior to the last sync, send the full version
+ current_node_delta.strategy = Some(NodeSerializationStrategy::Full);
}
previous_node_id = Some(node_id);
}
}
+pub(super) struct MutatedNodeProperties {
+ pub(super) addresses: bool,
+ pub(super) features: bool,
+}
+
+pub(super) enum NodeSerializationStrategy {
+ /// Only serialize the aspects of the node ID that have been mutated. Skip if they haven't been
+ Mutated(MutatedNodeProperties),
+ /// Whether or not the addresses or features have been mutated, serialize this node in full. It
+ /// may have been purged from the client.
+ Full,
+ /// This node ID has been seen recently enough to not have been pruned, and this update serves
+ /// solely the purpose of delaying any pruning, without applying any mutations
+ Reminder
+}
+
struct FullUpdateValueHistograms {
cltv_expiry_delta: HashMap<u16, usize>,
htlc_minimum_msat: HashMap<u64, usize>,
serialization_set.node_mutations = node_delta_set.into_iter().filter(|(_id, delta)| {
// either something changed, or this node is new
- delta.has_feature_set_changed || delta.has_address_set_changed || delta.last_details_before_seen.is_none()
+ delta.strategy.is_some()
}).collect();
let mut node_feature_histogram: HashMap<&NodeFeatures, usize> = Default::default();
for (_id, delta) in serialization_set.node_mutations.iter() {
- if delta.has_feature_set_changed || delta.last_details_before_seen.is_none() {
+ // consider either full or feature-mutating serializations for histogram
+ let mut should_add_to_histogram = matches!(delta.strategy, Some(NodeSerializationStrategy::Full));
+ if let Some(NodeSerializationStrategy::Mutated(mutation)) = delta.strategy.as_ref() {
+ should_add_to_histogram = mutation.features;
+ }
+
+ if should_add_to_histogram {
if let Some(latest_details) = delta.latest_details.as_ref() {
*node_feature_histogram.entry(&latest_details.features).or_insert(0) += 1;
};
let timestamp = current_time() - 10;
{ // seed the db
+
+ { // necessary for the node announcements to be considered relevant
+ let announcement = generate_channel_announcement(1);
+ let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0);
+ let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0);
+
+ network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
+ network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
+ network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
+
+ receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
+ receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
+ receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
+ }
+
let mut announcement = generate_node_announcement(None);
- receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 10))).await.unwrap();
- receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 8))).await.unwrap();
+ announcement.contents.timestamp = timestamp - 10;
+ network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
+ receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap();
+ announcement.contents.timestamp = timestamp - 8;
+ network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
+ receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(announcement.contents.timestamp))).await.unwrap();
{
let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[2; 32]).unwrap()));
current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]);
+ current_announcement.contents.timestamp = timestamp;
+ network_graph_arc.update_node_from_unsigned_announcement(¤t_announcement.contents).unwrap();
receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
}
{
let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[3; 32]).unwrap()));
current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![22, 49]);
+ current_announcement.contents.timestamp = timestamp;
receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
}
version: 3,
port: 4,
});
+ announcement.contents.timestamp = timestamp;
}
+ network_graph_arc.update_node_from_unsigned_announcement(&announcement.contents).unwrap();
receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(timestamp))).await.unwrap();
- { // necessary for the node announcements to be considered relevant
- let announcement = generate_channel_announcement(1);
- let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0);
- let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0);
-
- network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
- network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
- network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
-
- receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
- receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
- receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
- }
-
drop(receiver);
persister.persist_gossip().await;
clean_test_db().await;
assert_eq!(serialization.message_count, 3);
- assert_eq!(serialization.node_announcement_count, 3);
- assert_eq!(serialization.node_update_count, 3);
- assert_eq!(serialization.node_feature_update_count, 3);
- assert_eq!(serialization.node_address_update_count, 1);
+ assert_eq!(serialization.node_announcement_count, 2);
+ assert_eq!(serialization.node_update_count, 2);
+ assert_eq!(serialization.node_feature_update_count, 2);
+ assert_eq!(serialization.node_address_update_count, 2);
}
/// If a channel has only seen updates in one direction, it should not be announced