use std::collections::{BTreeMap, HashSet};
use std::io::Cursor;
-use std::ops::{Add, Deref};
+use std::ops::Deref;
use std::sync::Arc;
-use std::time::{Duration, Instant, SystemTime};
+use std::time::{Instant, SystemTime, UNIX_EPOCH};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
use lightning::routing::gossip::NetworkGraph;
use lightning::util::ser::Readable;
-use tokio_postgres::{Client, Connection, NoTls, Socket};
-use tokio_postgres::tls::NoTlsStream;
+use tokio_postgres::Client;
use futures::StreamExt;
-use lightning::log_info;
+use lightning::{log_gossip, log_info};
use lightning::util::logger::Logger;
use crate::config;
}
pub(super) struct DirectedUpdateDelta {
- pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
+ pub(super) last_update_before_seen: Option<UpdateDelta>,
pub(super) mutated_properties: MutatedProperties,
pub(super) latest_update_after_seen: Option<UpdateDelta>,
pub(super) serialization_update_flags: Option<u8>,
}
}
-pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>) {
- let connection_config = config::db_connection_config();
- connection_config.connect(NoTls).await.unwrap()
-}
-
/// Fetch all the channel announcements that are presently in the network graph, regardless of
/// whether they had been seen before.
/// Also include all announcements for which the first update was announced
/// after `last_sync_timestamp`
pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
log_info!(logger, "Obtaining channel ids from network graph");
- let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
let channel_ids = {
let read_only_graph = network_graph.read_only();
log_info!(logger, "Retrieved read-only network graph copy");
.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
.collect::<Vec<_>>()
};
+ #[cfg(test)]
+ log_info!(logger, "Channel IDs: {:?}", channel_ids);
+ log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
+ let last_sync_timestamp_float = last_sync_timestamp as f64;
log_info!(logger, "Obtaining corresponding database entries");
// get all the channel announcements that are currently in the network graph
- let announcement_rows = client.query_raw("SELECT announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
+ let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
let mut pinned_rows = Box::pin(announcement_rows);
+ let mut announcement_count = 0;
while let Some(row_res) = pinned_rows.next().await {
let current_announcement_row = row_res.unwrap();
let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
let scid = unsigned_announcement.short_channel_id;
- let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen");
- let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
(*current_channel_delta).announcement = Some(AnnouncementDelta {
announcement: unsigned_announcement,
seen: current_seen_timestamp,
});
+
+ announcement_count += 1;
}
+ log_info!(logger, "Fetched {} announcement rows", announcement_count);
{
// THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
// here is where the channels whose first update in either direction occurred after
	// `last_sync_timestamp` are added to the selection
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
- [&channel_ids, &last_sync_timestamp_object];
+ [&channel_ids, &last_sync_timestamp_float];
let newer_oldest_directional_updates = client.query_raw("
- SELECT * FROM (
+ SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
SELECT DISTINCT ON (short_channel_id) *
FROM (
SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
) AS directional_last_seens
ORDER BY short_channel_id ASC, seen DESC
) AS distinct_chans
- WHERE distinct_chans.seen >= $2
+ WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
", params).await.unwrap();
let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
+ let mut newer_oldest_directional_update_count = 0;
while let Some(row_res) = pinned_updates.next().await {
let current_row = row_res.unwrap();
let scid: i64 = current_row.get("short_channel_id");
- let current_seen_timestamp_object: SystemTime = current_row.get("seen");
- let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
// the newer of the two oldest seen directional updates came after last sync timestamp
let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
// first time a channel was seen in both directions
(*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
+
+ newer_oldest_directional_update_count += 1;
}
+ log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
}
{
// Steps:
// — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
// — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
- let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap();
+ let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
[&channel_ids, &reminder_threshold_timestamp];
) AS directional_last_seens
ORDER BY short_channel_id ASC, seen ASC
) AS distinct_chans
- WHERE distinct_chans.seen <= $2
+ WHERE distinct_chans.seen <= TO_TIMESTAMP($2)
", params).await.unwrap();
let mut pinned_updates = Box::pin(older_latest_directional_updates);
+ let mut older_latest_directional_update_count = 0;
while let Some(row_res) = pinned_updates.next().await {
let current_row = row_res.unwrap();
let scid: i64 = current_row.get("short_channel_id");
// we don't send reminders if we don't have the channel
continue;
}
+ older_latest_directional_update_count += 1;
}
+ log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
}
}
pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
let start = Instant::now();
- let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
+ let last_sync_timestamp_float = last_sync_timestamp as f64;
// get the latest channel update in each direction prior to last_sync_timestamp, provided
// there was an update in either direction that happened after the last sync (to avoid
// collecting too many reference updates)
let reference_rows = client.query_raw("
- SELECT id, direction, blob_signed FROM channel_updates
+ SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
WHERE id IN (
SELECT DISTINCT ON (short_channel_id, direction) id
FROM channel_updates
- WHERE seen < $1 AND short_channel_id IN (
+ WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
SELECT DISTINCT ON (short_channel_id) short_channel_id
FROM channel_updates
- WHERE seen >= $1
+ WHERE seen >= TO_TIMESTAMP($1)
)
ORDER BY short_channel_id ASC, direction ASC, seen DESC
)
- ", [last_sync_timestamp_object]).await.unwrap();
+ ", [last_sync_timestamp_float]).await.unwrap();
let mut pinned_rows = Box::pin(reference_rows);
log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
non_intermediate_ids.insert(update_id);
let direction: bool = current_reference.get("direction");
+ let seen = current_reference.get::<_, i64>("seen") as u32;
let blob: Vec<u8> = current_reference.get("blob_signed");
let mut readable = Cursor::new(blob);
let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
} else {
(*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
};
- update_delta.last_update_before_seen = Some(unsigned_channel_update);
+ log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
+ update_delta.last_update_before_seen = Some(UpdateDelta {
+ seen,
+ update: unsigned_channel_update,
+ });
+
reference_row_count += 1;
}
// have been omitted)
let intermediate_updates = client.query_raw("
- SELECT id, direction, blob_signed, seen
+ SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
FROM channel_updates
- WHERE seen >= $1
- ", [last_sync_timestamp_object]).await.unwrap();
+ WHERE seen >= TO_TIMESTAMP($1)
+ ORDER BY timestamp DESC
+ ", [last_sync_timestamp_float]).await.unwrap();
let mut pinned_updates = Box::pin(intermediate_updates);
log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
intermediate_update_count += 1;
let direction: bool = intermediate_update.get("direction");
- let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen");
- let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
let blob: Vec<u8> = intermediate_update.get("blob_signed");
let mut readable = Cursor::new(blob);
let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
// determine mutations
if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
- if unsigned_channel_update.flags != last_seen_update.flags {
+ if unsigned_channel_update.flags != last_seen_update.update.flags {
update_delta.mutated_properties.flags = true;
}
- if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta {
+ if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
update_delta.mutated_properties.cltv_expiry_delta = true;
}
- if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat {
+ if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
update_delta.mutated_properties.htlc_minimum_msat = true;
}
- if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat {
+ if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
update_delta.mutated_properties.fee_base_msat = true;
}
- if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths {
+ if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
update_delta.mutated_properties.fee_proportional_millionths = true;
}
- if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat {
+ if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
update_delta.mutated_properties.htlc_maximum_msat = true;
}
}