use tokio_postgres::{Client, Connection, NoTls, Socket};
use tokio_postgres::tls::NoTlsStream;
-use crate::{config, hex_utils, TestLogger};
+use crate::{config, TestLogger};
use crate::serialization::MutatedProperties;
/// The delta set needs to be a BTreeMap so the keys are sorted.
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_syc_timestamp`
-pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, client: &Client, last_sync_timestamp: u32) {
+pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
println!("Obtaining channel ids from network graph");
let channel_ids = {
let read_only_graph = network_graph.read_only();
println!("Retrieved read-only network graph copy");
- let channel_iterator = read_only_graph.channels().into_iter();
+ let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
- .map(|c| hex_utils::hex_str(&c.1.announcement_message.clone().unwrap().contents.short_channel_id.to_be_bytes()))
- .collect::<Vec<String>>()
+ .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
+ .collect::<Vec<_>>()
};
println!("Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
- let announcement_rows = client.query("SELECT short_channel_id, announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", &[&channel_ids]).await.unwrap();
+ let announcement_rows = client.query("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", &[&channel_ids]).await.unwrap();
for current_announcement_row in announcement_rows {
let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen");
let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
- let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
(*current_channel_delta).announcement = Some(AnnouncementDelta {
announcement: unsigned_announcement,
seen: current_seen_timestamp,
});
}
- println!("Obtaining channel announcements whose first channel updates had not been seen yet");
+ println!("Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
+ /// Steps:
+ /// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
+ /// — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
+ /// This will allow us to mark the first time updates in both directions were seen
// here is where the channels whose first update in either direction occurred after
// `last_seen_timestamp` are added to the selection
- let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
- for current_row in unannounced_rows {
-
+ let newer_oldest_directional_updates = client.query("
+ SELECT DISTINCT ON (short_channel_id) *
+ FROM (
+ SELECT DISTINCT ON (short_channel_id, direction) blob_signed
+ FROM channel_updates
+ WHERE short_channel_id = any($1)
+ ORDER BY seen ASC, short_channel_id ASC, direction ASC
+ ) AS directional_last_seens
+ ORDER BY short_channel_id ASC, seen DESC
+ ", &[&channel_ids]).await.unwrap();
+
+ for current_row in newer_oldest_directional_updates {
let blob: Vec<u8> = current_row.get("blob_signed");
let mut readable = Cursor::new(blob);
let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
let current_seen_timestamp_object: SystemTime = current_row.get("seen");
let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
- let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
- (*current_channel_delta).first_update_seen = Some(current_seen_timestamp);
+ if (current_seen_timestamp > last_sync_timestamp) {
+ // the newer of the two oldest seen directional updates came after last sync timestamp
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ // first time a channel was seen in both directions
+ (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
+ }
}
}
// get the latest channel update in each direction prior to last_sync_timestamp, provided
// there was an update in either direction that happened after the last sync (to avoid
// collecting too many reference updates)
- let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, short_channel_id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
+ let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed());
last_seen_update_ids.push(update_id);
non_intermediate_ids.insert(update_id);
- let direction: i32 = current_reference.get("direction");
+ let direction: bool = current_reference.get("direction");
let blob: Vec<u8> = current_reference.get("blob_signed");
let mut readable = Cursor::new(blob);
let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
let scid = unsigned_channel_update.short_channel_id;
let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
- let mut update_delta = if direction == 0 {
+ let update_delta = if !direction {
(*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
- } else if direction == 1 {
- (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
} else {
- panic!("Channel direction must be binary!")
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
};
update_delta.last_update_before_seen = Some(unsigned_channel_update);
-
-
}
println!("Processed reference rows (delta size: {}): {:?}", delta_set.len(), start.elapsed());
intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
}
- let query_string = format!("SELECT {} id, short_channel_id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
+ let query_string = format!("SELECT {} id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
}
intermediate_update_count += 1;
- let direction: i32 = intermediate_update.get("direction");
+ let direction: bool = intermediate_update.get("direction");
let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen");
let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
let blob: Vec<u8> = intermediate_update.get("blob_signed");
let scid = unsigned_channel_update.short_channel_id;
if scid != previous_scid {
- previous_scid = scid.clone();
+ previous_scid = scid;
previously_seen_directions = (false, false);
}
// get the write configuration for this particular channel's directional details
- let current_channel_delta = delta_set.entry(scid.clone()).or_insert(ChannelDelta::default());
- let update_delta = if direction == 0 {
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ let update_delta = if !direction {
(*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
- } else if direction == 1 {
- (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
} else {
- panic!("Channel direction must be binary!")
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
};
{
// handle the latest deltas
- if direction == 0 && !previously_seen_directions.0 {
+ if !direction && !previously_seen_directions.0 {
previously_seen_directions.0 = true;
update_delta.latest_update_after_seen = Some(UpdateDelta {
seen: current_seen_timestamp,
update: unsigned_channel_update.clone(),
});
- } else if direction == 1 && !previously_seen_directions.1 {
+ } else if direction && !previously_seen_directions.1 {
previously_seen_directions.1 = true;
update_delta.latest_update_after_seen = Some(UpdateDelta {
seen: current_seen_timestamp,