From f525858b51ca7c8ad9b855efd92e5e04efa53ed1 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Fri, 13 Oct 2023 18:39:27 -0700 Subject: [PATCH] Allow setting custom gossip seen timestamps. --- src/downloader.rs | 4 ++-- src/persistence.rs | 34 +++++++++++++++++++++++++++++----- src/tests/mod.rs | 6 +++--- src/types.rs | 5 +++-- 4 files changed, 37 insertions(+), 12 deletions(-) diff --git a/src/downloader.rs b/src/downloader.rs index 8d5cdb0..af854c8 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -62,7 +62,7 @@ impl GossipRouter where L::Target: Logger { counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg); + let gossip_message = GossipMessage::ChannelAnnouncement(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { @@ -73,7 +73,7 @@ impl GossipRouter where L::Target: Logger { fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; - let gossip_message = GossipMessage::ChannelUpdate(msg); + let gossip_message = GossipMessage::ChannelUpdate(msg, None); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; diff --git a/src/persistence.rs b/src/persistence.rs index 8bb7b18..7b451b7 100644 --- a/src/persistence.rs +++ b/src/persistence.rs @@ -111,7 +111,7 @@ impl GossipPersister where L::Target: Logger { } match &gossip_message { - GossipMessage::ChannelAnnouncement(announcement) => { + GossipMessage::ChannelAnnouncement(announcement, _) => { let scid = announcement.contents.short_channel_id as i64; // start with the type prefix, which is already known a priori @@ -127,7 +127,7 @@ impl GossipPersister where L::Target: Logger { &announcement_signed ])).await.unwrap().unwrap(); } - GossipMessage::ChannelUpdate(update) => { + GossipMessage::ChannelUpdate(update, seen_override) => { let scid = update.contents.short_channel_id as i64; let timestamp = update.contents.timestamp as i64; @@ -146,10 +146,11 @@ impl GossipPersister where L::Target: Logger { let mut update_signed = Vec::new(); update.write(&mut update_signed).unwrap(); - tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client - .execute("INSERT INTO channel_updates (\ + let insertion_statement = if cfg!(test) { + "INSERT INTO channel_updates (\ short_channel_id, \ timestamp, \ + seen, \ channel_flags, \ direction, \ disable, \ @@ -159,9 +160,32 @@ impl GossipPersister where L::Target: Logger { fee_proportional_millionths, \ htlc_maximum_msat, \ blob_signed \ - ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING", &[ + ) VALUES ($1, $2, TO_TIMESTAMP($3), $4, $5, $6, $7, $8, $9, $10, $11, $12) ON CONFLICT DO NOTHING" + } else { + "INSERT INTO channel_updates (\ + short_channel_id, \ + timestamp, \ + channel_flags, \ + direction, \ + disable, \ + cltv_expiry_delta, \ + htlc_minimum_msat, \ + fee_base_msat, \ + fee_proportional_millionths, \ + htlc_maximum_msat, \ + blob_signed \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ON CONFLICT DO NOTHING" + }; + + // this may not be used outside test cfg + let _seen_timestamp = seen_override.unwrap_or(timestamp as u32) as f64; + + tokio::time::timeout(POSTGRES_INSERT_TIMEOUT, client + .execute(insertion_statement, &[ &scid, ×tamp, + #[cfg(test)] + &_seen_timestamp, &(update.contents.flags as i16), &direction, &disable, diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 09147e9..f26cea8 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -168,9 +168,9 @@ async fn test_trivial_setup() { network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap(); network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap(); - receiver.send(GossipMessage::ChannelAnnouncement(announcement)).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_1)).await.unwrap(); - receiver.send(GossipMessage::ChannelUpdate(update_2)).await.unwrap(); + receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap(); + receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap(); drop(receiver); persister.persist_gossip().await; } diff --git a/src/types.rs b/src/types.rs index f530bd4..93527a4 100644 --- a/src/types.rs +++ b/src/types.rs @@ -14,8 +14,9 @@ pub(crate) type GossipPeerManager = Arc), + ChannelUpdate(ChannelUpdate, Option), } #[derive(Clone, Copy)] -- 2.39.5