pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
pub(super) mutated_properties: MutatedProperties,
pub(super) latest_update_after_seen: Option<UpdateDelta>,
+ pub(super) serialization_update_flags: Option<u8>,
}
pub(super) struct ChannelDelta {
pub(super) announcement: Option<AnnouncementDelta>,
pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
- pub(super) first_update_seen: Option<u32>,
+ pub(super) first_bidirectional_updates_seen: Option<u32>,
+ /// The seen timestamp of the older of the two latest directional updates
+ pub(super) requires_reminder: bool,
}
impl Default for ChannelDelta {
fn default() -> Self {
- Self { announcement: None, updates: (None, None), first_update_seen: None }
+ Self {
+ announcement: None,
+ updates: (None, None),
+ first_bidirectional_updates_seen: None,
+ requires_reminder: false,
+ }
}
}
last_update_before_seen: None,
mutated_properties: MutatedProperties::default(),
latest_update_after_seen: None,
+ serialization_update_flags: None
}
}
}
/// Fetch all the channel announcements that are presently in the network graph, regardless of
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
-/// after `last_syc_timestamp`
+/// after `last_sync_timestamp`
pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
- let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
println!("Obtaining channel ids from network graph");
let channel_ids = {
let read_only_graph = network_graph.read_only();
let current_seen_timestamp_object: SystemTime = current_row.get("seen");
let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
- if (current_seen_timestamp > last_sync_timestamp) {
+ if current_seen_timestamp > last_sync_timestamp {
// the newer of the two oldest seen directional updates came after last sync timestamp
let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
// first time a channel was seen in both directions
(*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
}
}
+
+ println!("Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
+ /// Steps:
+ /// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
+ /// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
+ let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE);
+ let mut channels_requiring_reminders: Vec<i64> = vec![];
+
+ let older_latest_directional_updates = client.query("
+ SELECT DISTINCT ON (short_channel_id) *
+ FROM (
+ SELECT DISTINCT ON (short_channel_id, direction) *
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ) AS directional_last_seens
+ ORDER BY short_channel_id ASC, seen ASC
+ ", &[&channel_ids]).await.unwrap();
+
+ for current_row in older_latest_directional_updates {
+ let blob: Vec<u8> = current_row.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_update.short_channel_id;
+ let current_seen_timestamp_object: SystemTime = current_row.get("seen");
+ let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+ if current_seen_timestamp <= reminder_threshold_timestamp {
+ // annotate this channel as requiring that reminders be sent to the client
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+
+ // way might be able to get away with not using this
+ (*current_channel_delta).requires_reminder = true;
+
+ // get the latest seen update in both directions for this channel
+ channels_requiring_reminders.push(scid as i64);
+ }
+ }
+
+ println!("Fetching latest update data for channels requiring reminders");
+ let latest_reminder_updates = client.query("
+ SELECT DISTINCT ON (short_channel_id, direction) *
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY short_channel_id ASC, direction ASC, seen DESC
+ ", &[&channels_requiring_reminders]).await.unwrap();
+
+ for current_update in latest_reminder_updates {
+ let blob: Vec<u8> = current_update.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_update.short_channel_id;
+ let direction: bool = current_update.get("direction");
+
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ let update_delta = if !direction {
+ (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+ } else {
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+ };
+ update_delta.serialization_update_flags = Some(unsigned_update.flags);
+ }
}
pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) {
}
// determine mutations
- if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref(){
+ if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
if unsigned_channel_update.flags != last_seen_update.flags {
update_delta.mutated_properties.flags = true;
}
update_delta.mutated_properties.htlc_maximum_msat = true;
}
}
-
}
println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
}
let update_reference = update.as_ref().unwrap();
// update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
// if there has been an update after the channel was first seen
- update_reference.latest_update_after_seen.is_some()
+
+ v.requires_reminder || update_reference.latest_update_after_seen.is_some()
};
let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
- if !direction_a_meets_criteria && !direction_b_meets_criteria {
+ if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
delta_set.remove(&k);
}
}
pub(super) enum UpdateSerializationMechanism {
Full,
Incremental(MutatedProperties),
+ Reminder,
}
struct FullUpdateValueHistograms {
let current_announcement_seen = channel_announcement_delta.seen;
let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
- let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_update_seen {
+ let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
first_update_seen >= last_sync_timestamp
} else {
false
mechanism: UpdateSerializationMechanism::Full,
});
}
+ } else if let Some(flags) = updates.serialization_update_flags {
+ // we need to insert a fake channel update where the only information
+ let fake_update = UnsignedChannelUpdate {
+ flags,
+ chain_hash: BlockHash::all_zeros(),
+ short_channel_id: 0,
+ cltv_expiry_delta: 0,
+ fee_base_msat: 0,
+ fee_proportional_millionths: 0,
+ htlc_maximum_msat: 0,
+ htlc_minimum_msat: 0,
+ timestamp: 0,
+ excess_data: Vec::with_capacity(0),
+ };
+ serialization_set.updates.push(UpdateSerialization {
+ update: fake_update,
+ mechanism: UpdateSerializationMechanism::Reminder
+ })
}
- };
+ }
};
categorize_directed_update_serialization(direction_a_updates);
serialized_flags |= 0b_0000_0100;
latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
}
+ },
+
+ UpdateSerializationMechanism::Reminder => {
+ // indicate that this update is incremental
+ serialized_flags |= 0b_1000_0000;
}
}
let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);