Merge pull request #42 from TheBlueMatt/main
authorArik <arik-so@users.noreply.github.com>
Sun, 2 Jul 2023 19:43:01 +0000 (12:43 -0700)
committerGitHub <noreply@github.com>
Sun, 2 Jul 2023 19:43:01 +0000 (12:43 -0700)
#39 Followups

src/config.rs
src/lib.rs
src/lookup.rs
src/serialization.rs

index acf7ac98a1956ffdb71459a4b278e776ba7a8435..6728312badc4621d532c692744b15ede3d8de742 100644 (file)
@@ -4,6 +4,7 @@ use std::convert::TryInto;
 use std::env;
 use std::io::Cursor;
 use std::net::{SocketAddr, ToSocketAddrs};
+use std::time::Duration;
 
 use bitcoin::Network;
 use bitcoin::hashes::hex::FromHex;
@@ -14,12 +15,12 @@ use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
 
-pub(crate) const SCHEMA_VERSION: i32 = 8;
+pub(crate) const SCHEMA_VERSION: i32 = 9;
 pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
 /// If the last update in either direction was more than six days ago, we send a reminder
 /// That reminder may be either in the form of a channel announcement, or in the form of empty
 /// updates in both directions.
-pub(crate) const CHANNEL_REMINDER_AGE: u32 = 6 * 24 * 3600;
+pub(crate) const CHANNEL_REMINDER_AGE: Duration = Duration::from_secs(6 * 24 * 60 * 60);
 pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
 
 pub(crate) fn network() -> Network {
@@ -103,10 +104,9 @@ pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
 
 pub(crate) fn db_index_creation_query() -> &'static str {
        "
-       CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen, short_channel_id, direction) INCLUDE (id, blob_signed);
-       CREATE INDEX IF NOT EXISTS channel_updates_scid_seen ON channel_updates(short_channel_id, seen) INCLUDE (blob_signed);
        CREATE INDEX IF NOT EXISTS channel_updates_seen_scid ON channel_updates(seen, short_channel_id);
        CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen ON channel_updates(short_channel_id ASC, direction ASC, seen DESC) INCLUDE (id, blob_signed);
+       CREATE INDEX IF NOT EXISTS channel_updates_scid_dir_seen_asc ON channel_updates(short_channel_id, direction, seen);
        CREATE UNIQUE INDEX IF NOT EXISTS channel_updates_key ON channel_updates (short_channel_id, direction, timestamp);
        "
 }
@@ -221,6 +221,13 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
                tx.execute("UPDATE config SET db_schema = 8 WHERE id = 1", &[]).await.unwrap();
                tx.commit().await.unwrap();
        }
+       if schema >= 1 && schema <= 8 {
+               let tx = client.transaction().await.unwrap();
+               tx.execute("DROP INDEX channel_updates_seen", &[]).await.unwrap();
+               tx.execute("DROP INDEX channel_updates_scid_seen", &[]).await.unwrap();
+               tx.execute("UPDATE config SET db_schema = 9 WHERE id = 1", &[]).await.unwrap();
+               tx.commit().await.unwrap();
+       }
        if schema <= 1 || schema > SCHEMA_VERSION {
                panic!("Unknown schema in db: {}, we support up to {}", schema, SCHEMA_VERSION);
        }
index c6f266db0590fbf0e327a28fe019d3b6364e75f4..1115568025e213885af30372a93cc652b8e40494 100644 (file)
@@ -20,7 +20,7 @@ use tokio::sync::mpsc;
 use crate::lookup::DeltaSet;
 
 use crate::persistence::GossipPersister;
-use crate::serialization::UpdateSerializationMechanism;
+use crate::serialization::UpdateSerialization;
 use crate::snapshot::Snapshotter;
 use crate::types::TestLogger;
 
@@ -209,11 +209,11 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
        let mut update_count_full = 0;
        let mut update_count_incremental = 0;
        for current_update in serialization_details.updates {
-               match &current_update.mechanism {
-                       UpdateSerializationMechanism::Full => {
+               match &current_update {
+                       UpdateSerialization::Full(_) => {
                                update_count_full += 1;
                        }
-                       UpdateSerializationMechanism::Incremental(_) | UpdateSerializationMechanism::Reminder => {
+                       UpdateSerialization::Incremental(_, _) | UpdateSerialization::Reminder(_, _) => {
                                update_count_incremental += 1;
                        }
                };
@@ -221,7 +221,7 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
                let mut stripped_update = serialization::serialize_stripped_channel_update(&current_update, &default_update_values, previous_update_scid);
                output.append(&mut stripped_update);
 
-               previous_update_scid = current_update.update.short_channel_id;
+               previous_update_scid = current_update.scid();
        }
 
        // some stats
index c96ef771e5cef7bb9e9875ab16c9fe97ebfb9f12..8018e5ca19d62a5b564b9f40a777f2aad18100b3 100644 (file)
@@ -75,6 +75,7 @@ pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>)
 /// after `last_sync_timestamp`
 pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<TestLogger>>, client: &Client, last_sync_timestamp: u32) {
        println!("Obtaining channel ids from network graph");
+       let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
        let channel_ids = {
                let read_only_graph = network_graph.read_only();
                println!("Retrieved read-only network graph copy");
@@ -117,30 +118,28 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
                // here is where the channels whose first update in either direction occurred after
                // `last_seen_timestamp` are added to the selection
                let newer_oldest_directional_updates = client.query("
-               SELECT DISTINCT ON (short_channel_id) *
-               FROM (
-                       SELECT DISTINCT ON (short_channel_id, direction) blob_signed
-                       FROM channel_updates
-                       WHERE short_channel_id = any($1)
-                       ORDER BY seen ASC, short_channel_id ASC, direction ASC
-               ) AS directional_last_seens
-               ORDER BY short_channel_id ASC, seen DESC
-       ", &[&channel_ids]).await.unwrap();
+                       SELECT * FROM (
+                               SELECT DISTINCT ON (short_channel_id) *
+                               FROM (
+                                       SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
+                                       FROM channel_updates
+                                       WHERE short_channel_id = any($1)
+                                       ORDER BY short_channel_id ASC, direction ASC, seen ASC
+                               ) AS directional_last_seens
+                               ORDER BY short_channel_id ASC, seen DESC
+                       ) AS distinct_chans
+                       WHERE distinct_chans.seen >= $2
+                       ", &[&channel_ids, &last_sync_timestamp_object]).await.unwrap();
 
                for current_row in newer_oldest_directional_updates {
-                       let blob: Vec<u8> = current_row.get("blob_signed");
-                       let mut readable = Cursor::new(blob);
-                       let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
-                       let scid = unsigned_update.short_channel_id;
+                       let scid: i64 = current_row.get("short_channel_id");
                        let current_seen_timestamp_object: SystemTime = current_row.get("seen");
                        let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
 
-                       if current_seen_timestamp > last_sync_timestamp {
-                               // the newer of the two oldest seen directional updates came after last sync timestamp
-                               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
-                               // first time a channel was seen in both directions
-                               (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
-                       }
+                       // the newer of the two oldest seen directional updates came after last sync timestamp
+                       let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
+                       // first time a channel was seen in both directions
+                       (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
                }
        }
 
@@ -151,57 +150,52 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
                // Steps:
                // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
                // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
-               let current_timestamp = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
-               let reminder_threshold_timestamp = current_timestamp.saturating_sub(config::CHANNEL_REMINDER_AGE);
+               let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap();
                let read_only_graph = network_graph.read_only();
 
                let older_latest_directional_updates = client.query("
-               SELECT DISTINCT ON (short_channel_id) *
-               FROM (
-                       SELECT DISTINCT ON (short_channel_id, direction) *
-                       FROM channel_updates
-                       WHERE short_channel_id = any($1)
-                       ORDER BY short_channel_id ASC, direction ASC, seen DESC
-               ) AS directional_last_seens
-               ORDER BY short_channel_id ASC, seen ASC
-       ", &[&channel_ids]).await.unwrap();
+                       SELECT short_channel_id FROM (
+                               SELECT DISTINCT ON (short_channel_id) *
+                               FROM (
+                                       SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
+                                       FROM channel_updates
+                                       WHERE short_channel_id = any($1)
+                                       ORDER BY short_channel_id ASC, direction ASC, seen DESC
+                               ) AS directional_last_seens
+                               ORDER BY short_channel_id ASC, seen ASC
+                       ) AS distinct_chans
+                       WHERE distinct_chans.seen <= $2
+                       ", &[&channel_ids, &reminder_threshold_timestamp]).await.unwrap();
 
                for current_row in older_latest_directional_updates {
-                       let blob: Vec<u8> = current_row.get("blob_signed");
-                       let mut readable = Cursor::new(blob);
-                       let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
-                       let scid = unsigned_update.short_channel_id;
-                       let current_seen_timestamp_object: SystemTime = current_row.get("seen");
-                       let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+                       let scid: i64 = current_row.get("short_channel_id");
+
+                       // annotate this channel as requiring that reminders be sent to the client
+                       let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
 
-                       if current_seen_timestamp <= reminder_threshold_timestamp {
-                               // annotate this channel as requiring that reminders be sent to the client
-                               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
-
-                               // way might be able to get away with not using this
-                               (*current_channel_delta).requires_reminder = true;
-
-                               if let Some(current_channel_info) = read_only_graph.channel(scid) {
-                                       if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
-                                               // we don't send reminders if we don't have bidirectional update data
-                                               continue;
-                                       }
-
-                                       if let Some(info) = current_channel_info.one_to_two.as_ref() {
-                                               let flags: u8 = if info.enabled { 0 } else { 2 };
-                                               let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
-                                               current_update.serialization_update_flags = Some(flags);
-                                       }
-
-                                       if let Some(info) = current_channel_info.two_to_one.as_ref() {
-                                               let flags: u8 = if info.enabled { 1 } else { 3 };
-                                               let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
-                                               current_update.serialization_update_flags = Some(flags);
-                                       }
-                               } else {
-                                       // we don't send reminders if we don't have the channel
+                       // way might be able to get away with not using this
+                       (*current_channel_delta).requires_reminder = true;
+
+                       if let Some(current_channel_info) = read_only_graph.channel(scid as u64) {
+                               if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
+                                       // we don't send reminders if we don't have bidirectional update data
                                        continue;
                                }
+
+                               if let Some(info) = current_channel_info.one_to_two.as_ref() {
+                                       let flags: u8 = if info.enabled { 0 } else { 2 };
+                                       let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
+                                       current_update.serialization_update_flags = Some(flags);
+                               }
+
+                               if let Some(info) = current_channel_info.two_to_one.as_ref() {
+                                       let flags: u8 = if info.enabled { 1 } else { 3 };
+                                       let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
+                                       current_update.serialization_update_flags = Some(flags);
+                               }
+                       } else {
+                               // we don't send reminders if we don't have the channel
+                               continue;
                        }
                }
        }
@@ -214,7 +208,16 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
        // get the latest channel update in each direction prior to last_sync_timestamp, provided
        // there was an update in either direction that happened after the last sync (to avoid
        // collecting too many reference updates)
-       let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
+       let reference_rows = client.query("
+               SELECT DISTINCT ON (short_channel_id, direction) id, direction, blob_signed
+               FROM channel_updates
+               WHERE seen < $1 AND short_channel_id IN (
+                       SELECT DISTINCT ON (short_channel_id) short_channel_id
+                       FROM channel_updates
+                       WHERE seen >= $1
+               )
+               ORDER BY short_channel_id ASC, direction ASC, seen DESC
+               ", &[&last_sync_timestamp_object]).await.unwrap();
 
        println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed());
 
@@ -252,7 +255,12 @@ pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Cli
                intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
        }
 
-       let query_string = format!("SELECT {} id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
+       let query_string = format!("
+               SELECT {} id, direction, blob_signed, seen
+               FROM channel_updates
+               WHERE seen >= $1
+               ORDER BY short_channel_id ASC, direction ASC, seen DESC
+               ", intermediate_update_prefix);
        let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
        println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
 
index 1ab410a57b7bb19fee6a4fda06117de316887280..4c1d82879d9b7f281eb2a1c3eaa9ca6ce6f06dd4 100644 (file)
@@ -58,11 +58,6 @@ impl Default for MutatedProperties {
        }
 }
 
-pub(super) struct UpdateSerialization {
-       pub(super) update: UnsignedChannelUpdate,
-       pub(super) mechanism: UpdateSerializationMechanism,
-}
-
 impl MutatedProperties {
        /// Does not include flags because the flag byte is always sent in full
        fn len(&self) -> u8 {
@@ -76,10 +71,27 @@ impl MutatedProperties {
        }
 }
 
-pub(super) enum UpdateSerializationMechanism {
-       Full,
-       Incremental(MutatedProperties),
-       Reminder,
+pub(super) enum UpdateSerialization {
+       Full(UnsignedChannelUpdate),
+       Incremental(UnsignedChannelUpdate, MutatedProperties),
+       Reminder(u64, u8),
+}
+impl UpdateSerialization {
+       pub(super) fn scid(&self) -> u64 {
+               match self {
+                       UpdateSerialization::Full(latest_update)|
+                       UpdateSerialization::Incremental(latest_update, _) => latest_update.short_channel_id,
+                       UpdateSerialization::Reminder(scid, _) => *scid,
+               }
+       }
+
+       fn flags(&self) -> u8 {
+               match self {
+                       UpdateSerialization::Full(latest_update)|
+                       UpdateSerialization::Incremental(latest_update, _) => latest_update.flags,
+                       UpdateSerialization::Reminder(_, flags) => *flags,
+               }
+       }
 }
 
 struct FullUpdateValueHistograms {
@@ -117,7 +129,7 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
        };
 
-       for (_scid, channel_delta) in delta_set.into_iter() {
+       for (scid, channel_delta) in delta_set.into_iter() {
 
                // any announcement chain hash is gonna be the same value. Just set it from the first one.
                let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
@@ -146,6 +158,7 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                        if let Some(updates) = directed_updates {
                                if let Some(latest_update_delta) = updates.latest_update_after_seen {
                                        let latest_update = latest_update_delta.update;
+                                       assert_eq!(latest_update.short_channel_id, scid, "Update in DB had wrong SCID column");
 
                                        // the returned seen timestamp should be the latest of all the returned
                                        // announcements and latest updates
@@ -158,43 +171,19 @@ pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32)
                                                        // serialize the update as a full update instead of as a change
                                                        // this way, the default values can be computed more efficiently
                                                        record_full_update_in_histograms(&latest_update);
-                                                       serialization_set.updates.push(UpdateSerialization {
-                                                               update: latest_update,
-                                                               mechanism: UpdateSerializationMechanism::Full,
-                                                       });
+                                                       serialization_set.updates.push(UpdateSerialization::Full(latest_update));
                                                } else if mutated_properties.len() > 0 || mutated_properties.flags {
                                                        // we don't count flags as mutated properties
-                                                       serialization_set.updates.push(UpdateSerialization {
-                                                               update: latest_update,
-                                                               mechanism: UpdateSerializationMechanism::Incremental(mutated_properties),
-                                                       });
+                                                       serialization_set.updates.push(
+                                                               UpdateSerialization::Incremental(latest_update, mutated_properties));
                                                }
                                        } else {
                                                // serialize the full update
                                                record_full_update_in_histograms(&latest_update);
-                                               serialization_set.updates.push(UpdateSerialization {
-                                                       update: latest_update,
-                                                       mechanism: UpdateSerializationMechanism::Full,
-                                               });
+                                               serialization_set.updates.push(UpdateSerialization::Full(latest_update));
                                        }
                                } else if let Some(flags) = updates.serialization_update_flags {
-                                       // we need to insert a fake channel update where the only information
-                                       let fake_update = UnsignedChannelUpdate {
-                                               flags,
-                                               chain_hash: BlockHash::all_zeros(),
-                                               short_channel_id: 0,
-                                               cltv_expiry_delta: 0,
-                                               fee_base_msat: 0,
-                                               fee_proportional_millionths: 0,
-                                               htlc_maximum_msat: 0,
-                                               htlc_minimum_msat: 0,
-                                               timestamp: 0,
-                                               excess_data: Vec::with_capacity(0),
-                                       };
-                                       serialization_set.updates.push(UpdateSerialization {
-                                               update: fake_update,
-                                               mechanism: UpdateSerializationMechanism::Reminder
-                                       })
+                                       serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags));
                                }
                        }
                };
@@ -235,18 +224,17 @@ pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnn
 }
 
 pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
-       let latest_update = &update.update;
-       let mut serialized_flags = latest_update.flags;
+       let mut serialized_flags = update.flags();
 
-       if previous_scid > latest_update.short_channel_id {
+       if previous_scid > update.scid() {
                panic!("unsorted scids!");
        }
 
        let mut delta_serialization = Vec::new();
        let mut prefixed_serialization = Vec::new();
 
-       match &update.mechanism {
-               UpdateSerializationMechanism::Full => {
+       match update {
+               UpdateSerialization::Full(latest_update) => {
                        if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
                                serialized_flags |= 0b_0100_0000;
                                latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
@@ -272,8 +260,7 @@ pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, de
                                latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
                        }
                }
-
-               UpdateSerializationMechanism::Incremental(mutated_properties) => {
+               UpdateSerialization::Incremental(latest_update, mutated_properties) => {
                        // indicate that this update is incremental
                        serialized_flags |= 0b_1000_0000;
 
@@ -302,13 +289,12 @@ pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, de
                                latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
                        }
                },
-
-               UpdateSerializationMechanism::Reminder => {
+               UpdateSerialization::Reminder(_, _) => {
                        // indicate that this update is incremental
                        serialized_flags |= 0b_1000_0000;
                }
        }
-       let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);
+       let scid_delta = BigSize(update.scid() - previous_scid);
        scid_delta.write(&mut prefixed_serialization).unwrap();
 
        serialized_flags.write(&mut prefixed_serialization).unwrap();