1 use std::collections::{BTreeMap, HashSet};
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
17 use crate::serialization::MutatedProperties;
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
23 pub(super) struct AnnouncementDelta {
25 pub(super) announcement: UnsignedChannelAnnouncement,
28 pub(super) struct UpdateDelta {
30 pub(super) update: UnsignedChannelUpdate,
33 pub(super) struct DirectedUpdateDelta {
34 /// the last update we saw prior to the user-provided timestamp
35 pub(super) last_update_before_seen: Option<UpdateDelta>,
36 /// the latest update we saw overall
37 pub(super) latest_update_after_seen: Option<UpdateDelta>,
38 /// the set of all mutated properties across all updates between the last seen by the user and
39 /// the latest one known to us
40 pub(super) mutated_properties: MutatedProperties,
41 /// Specifically for reminder updates, the flag-only value to send to the client
42 pub(super) serialization_update_flags: Option<u8>
45 pub(super) struct ChannelDelta {
46 pub(super) announcement: Option<AnnouncementDelta>,
47 pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48 pub(super) first_bidirectional_updates_seen: Option<u32>,
49 /// The seen timestamp of the older of the two latest directional updates
50 pub(super) requires_reminder: bool,
53 impl Default for ChannelDelta {
54 fn default() -> Self {
57 updates: (None, None),
58 first_bidirectional_updates_seen: None,
59 requires_reminder: false,
64 impl Default for DirectedUpdateDelta {
65 fn default() -> Self {
67 last_update_before_seen: None,
68 mutated_properties: MutatedProperties::default(),
69 latest_update_after_seen: None,
70 serialization_update_flags: None,
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_calculation_time: Option<SystemTime>, logger: L) where L::Target: Logger {
80 log_info!(logger, "Obtaining channel ids from network graph");
82 let read_only_graph = network_graph.read_only();
83 log_info!(logger, "Retrieved read-only network graph copy");
84 let channel_iterator = read_only_graph.channels().unordered_iter();
86 .filter(|c| c.1.announcement_message.is_some())
87 .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
91 log_info!(logger, "Channel IDs: {:?}", channel_ids);
92 log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93 let last_sync_timestamp_float = last_sync_timestamp as f64;
95 let current_time = snapshot_calculation_time.unwrap_or(SystemTime::now());
96 let current_timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
97 log_info!(logger, "Current timestamp: {}", current_timestamp);
99 let include_reminders = {
100 let current_hour = current_timestamp / 3600;
101 let current_day = current_timestamp / (24 * 3600);
103 log_debug!(logger, "Current day index: {}", current_day);
104 log_debug!(logger, "Current hour: {}", current_hour);
106 // anytime between 11pm and 1am
107 let is_reminder_hour = current_hour < 2 || current_hour > 22;
108 let is_reminder_day = (current_day % 5) == 0;
110 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
111 let is_reminder_scope = snapshot_scope > (40 * 3600);
112 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
114 (is_reminder_hour && is_reminder_day) || is_reminder_scope
117 log_info!(logger, "Obtaining corresponding database entries");
118 // get all the channel announcements that are currently in the network graph
119 let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
120 let mut pinned_rows = Box::pin(announcement_rows);
122 let mut announcement_count = 0;
123 while let Some(row_res) = pinned_rows.next().await {
124 let current_announcement_row = row_res.unwrap();
125 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
126 let mut readable = Cursor::new(blob);
127 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
129 let scid = unsigned_announcement.short_channel_id;
130 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
132 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
133 (*current_channel_delta).announcement = Some(AnnouncementDelta {
134 announcement: unsigned_announcement,
135 seen: current_seen_timestamp,
138 announcement_count += 1;
140 log_info!(logger, "Fetched {} announcement rows", announcement_count);
143 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
145 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
147 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
148 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
149 // This will allow us to mark the first time updates in both directions were seen
151 // here is where the channels whose first update in either direction occurred after
152 // `last_seen_timestamp` are added to the selection
153 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
154 [&channel_ids, &last_sync_timestamp_float];
155 let newer_oldest_directional_updates = client.query_raw("
156 SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
157 SELECT DISTINCT ON (short_channel_id) *
159 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
161 WHERE short_channel_id = any($1)
162 ORDER BY short_channel_id ASC, direction ASC, seen ASC
163 ) AS directional_last_seens
164 ORDER BY short_channel_id ASC, seen DESC
166 WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
167 ", params).await.unwrap();
168 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
170 let mut newer_oldest_directional_update_count = 0;
171 while let Some(row_res) = pinned_updates.next().await {
172 let current_row = row_res.unwrap();
174 let scid: i64 = current_row.get("short_channel_id");
175 let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
177 // the newer of the two oldest seen directional updates came after last sync timestamp
178 let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
179 // first time a channel was seen in both directions
180 (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
182 newer_oldest_directional_update_count += 1;
184 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
187 if include_reminders {
188 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
190 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
192 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
193 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
194 let reminder_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
196 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
197 let reminder_lookup_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
198 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
201 What exactly is the below query doing?
203 First, the inner query groups all channel updates by their scid/direction combination,
204 and then sorts those in reverse chronological order by the "seen" column.
206 Then, each row is annotated based on whether its subsequent row for the same scid/direction
207 combination has a different value for any one of these six fields:
208 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
209 Those are simply the properties we use to keep track of channel mutations.
211 The outer query takes all of those results and selects the first value that has a distinct
212 successor for each scid/direction combination. That yields the first instance at which
213 a given channel configuration was received after any prior mutations.
215 Knowing that, we can check whether or not there have been any mutations within the
216 reminder requirement window. Because we only care about that window (and potentially the
217 2-week-window), we pre-filter the scanned updates by only those that were received within
218 3x the timeframe that we consider necessitates reminders.
221 let mutated_updates = client.query_raw("
222 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
223 SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
224 disable<>lead(disable) OVER w1
226 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
228 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
230 fee_base_msat<>lead(fee_base_msat) OVER w1
232 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
234 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
236 ) has_distinct_successor
238 WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
239 WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
241 WHERE has_distinct_successor
242 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
243 ", params).await.unwrap();
245 let mut pinned_updates = Box::pin(mutated_updates);
246 let mut older_latest_directional_update_count = 0;
247 while let Some(row_res) = pinned_updates.next().await {
248 let current_row = row_res.unwrap();
249 let seen = current_row.get::<_, i64>("seen") as u32;
251 if seen < reminder_threshold_timestamp as u32 {
252 let blob: Vec<u8> = current_row.get("blob_signed");
253 let mut readable = Cursor::new(blob);
254 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
256 let scid = unsigned_channel_update.short_channel_id;
257 let direction: bool = current_row.get("direction");
259 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
261 // We might be able to get away with not using this
262 (*current_channel_delta).requires_reminder = true;
263 older_latest_directional_update_count += 1;
265 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
266 if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
267 // we don't send reminders if we don't have bidirectional update data
271 if let Some(info) = current_channel_info.one_to_two.as_ref() {
272 let flags: u8 = if info.enabled { 0 } else { 2 };
273 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
274 current_update.serialization_update_flags = Some(flags);
277 if let Some(info) = current_channel_info.two_to_one.as_ref() {
278 let flags: u8 = if info.enabled { 1 } else { 3 };
279 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
280 current_update.serialization_update_flags = Some(flags);
283 // we don't send reminders if we don't have the channel
287 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
290 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
294 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
295 let start = Instant::now();
296 let last_sync_timestamp_float = last_sync_timestamp as f64;
298 // get the latest channel update in each direction prior to last_sync_timestamp, provided
299 // there was an update in either direction that happened after the last sync (to avoid
300 // collecting too many reference updates)
301 let reference_rows = client.query_raw("
302 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
304 SELECT DISTINCT ON (short_channel_id, direction) id
306 WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
307 SELECT DISTINCT ON (short_channel_id) short_channel_id
309 WHERE seen >= TO_TIMESTAMP($1)
311 ORDER BY short_channel_id ASC, direction ASC, seen DESC
313 ", [last_sync_timestamp_float]).await.unwrap();
314 let mut pinned_rows = Box::pin(reference_rows);
316 log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
318 let mut last_seen_update_ids: Vec<i32> = Vec::new();
319 let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
320 let mut reference_row_count = 0;
322 while let Some(row_res) = pinned_rows.next().await {
323 let current_reference = row_res.unwrap();
324 let update_id: i32 = current_reference.get("id");
325 last_seen_update_ids.push(update_id);
326 non_intermediate_ids.insert(update_id);
328 let direction: bool = current_reference.get("direction");
329 let seen = current_reference.get::<_, i64>("seen") as u32;
330 let blob: Vec<u8> = current_reference.get("blob_signed");
331 let mut readable = Cursor::new(blob);
332 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
333 let scid = unsigned_channel_update.short_channel_id;
335 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
336 let update_delta = if !direction {
337 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
339 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
341 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
342 update_delta.last_update_before_seen = Some(UpdateDelta {
344 update: unsigned_channel_update,
347 reference_row_count += 1;
350 log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
351 reference_row_count, delta_set.len(), start.elapsed());
353 // get all the intermediate channel updates
354 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
355 // have been omitted)
357 let intermediate_updates = client.query_raw("
358 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
360 WHERE seen >= TO_TIMESTAMP($1)
361 ORDER BY short_channel_id ASC, timestamp DESC
362 ", [last_sync_timestamp_float]).await.unwrap();
363 let mut pinned_updates = Box::pin(intermediate_updates);
364 log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
366 let mut previous_scid = u64::MAX;
367 let mut previously_seen_directions = (false, false);
369 // let mut previously_seen_directions = (false, false);
370 let mut intermediate_update_count = 0;
371 while let Some(row_res) = pinned_updates.next().await {
372 let intermediate_update = row_res.unwrap();
373 let update_id: i32 = intermediate_update.get("id");
374 if non_intermediate_ids.contains(&update_id) {
377 intermediate_update_count += 1;
379 let direction: bool = intermediate_update.get("direction");
380 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
381 let blob: Vec<u8> = intermediate_update.get("blob_signed");
382 let mut readable = Cursor::new(blob);
383 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
385 let scid = unsigned_channel_update.short_channel_id;
386 if scid != previous_scid {
387 previous_scid = scid;
388 previously_seen_directions = (false, false);
391 // get the write configuration for this particular channel's directional details
392 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
393 let update_delta = if !direction {
394 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
396 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
400 // handle the latest deltas
401 if !direction && !previously_seen_directions.0 {
402 previously_seen_directions.0 = true;
403 update_delta.latest_update_after_seen = Some(UpdateDelta {
404 seen: current_seen_timestamp,
405 update: unsigned_channel_update.clone(),
407 } else if direction && !previously_seen_directions.1 {
408 previously_seen_directions.1 = true;
409 update_delta.latest_update_after_seen = Some(UpdateDelta {
410 seen: current_seen_timestamp,
411 update: unsigned_channel_update.clone(),
416 // determine mutations
417 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
418 if unsigned_channel_update.flags != last_seen_update.update.flags {
419 update_delta.mutated_properties.flags = true;
421 if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
422 update_delta.mutated_properties.cltv_expiry_delta = true;
424 if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
425 update_delta.mutated_properties.htlc_minimum_msat = true;
427 if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
428 update_delta.mutated_properties.fee_base_msat = true;
430 if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
431 update_delta.mutated_properties.fee_proportional_millionths = true;
433 if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
434 update_delta.mutated_properties.htlc_maximum_msat = true;
438 log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
441 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
442 let original_length = delta_set.len();
443 let keys: Vec<u64> = delta_set.keys().cloned().collect();
445 let v = delta_set.get(&k).unwrap();
446 if v.announcement.is_none() {
447 // this channel is not currently in the network graph
448 delta_set.remove(&k);
452 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
453 if update.is_none() {
456 let update_reference = update.as_ref().unwrap();
457 // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
458 // if there has been an update after the channel was first seen
460 v.requires_reminder || update_reference.latest_update_after_seen.is_some()
463 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
464 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
466 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
467 delta_set.remove(&k);
471 let new_length = delta_set.len();
472 if original_length != new_length {
473 log_info!(logger, "length modified!");