1 use std::collections::{BTreeMap, HashSet};
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
17 use crate::serialization::MutatedProperties;
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
23 pub(super) struct AnnouncementDelta {
25 pub(super) announcement: UnsignedChannelAnnouncement,
28 pub(super) struct UpdateDelta {
30 pub(super) update: UnsignedChannelUpdate,
33 pub(super) struct DirectedUpdateDelta {
34 /// the last update we saw prior to the user-provided timestamp
35 pub(super) last_update_before_seen: Option<UpdateDelta>,
36 /// the latest update we saw overall
37 pub(super) latest_update_after_seen: Option<UpdateDelta>,
38 /// the set of all mutated properties across all updates between the last seen by the user and
39 /// the latest one known to us
40 pub(super) mutated_properties: MutatedProperties,
41 /// Specifically for reminder updates, the flag-only value to send to the client
42 pub(super) serialization_update_flags: Option<u8>
45 pub(super) struct ChannelDelta {
46 pub(super) announcement: Option<AnnouncementDelta>,
47 pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48 pub(super) first_bidirectional_updates_seen: Option<u32>,
49 /// The seen timestamp of the older of the two latest directional updates
50 pub(super) requires_reminder: bool,
53 impl Default for ChannelDelta {
54 fn default() -> Self {
57 updates: (None, None),
58 first_bidirectional_updates_seen: None,
59 requires_reminder: false,
64 impl Default for DirectedUpdateDelta {
65 fn default() -> Self {
67 last_update_before_seen: None,
68 mutated_properties: MutatedProperties::default(),
69 latest_update_after_seen: None,
70 serialization_update_flags: None,
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, snapshot_reference_timestamp: Option<u64>, logger: L) where L::Target: Logger {
80 log_info!(logger, "Obtaining channel ids from network graph");
82 let read_only_graph = network_graph.read_only();
83 log_info!(logger, "Retrieved read-only network graph copy");
84 let channel_iterator = read_only_graph.channels().unordered_iter();
86 .filter(|c| c.1.announcement_message.is_some() && c.1.one_to_two.is_some() && c.1.two_to_one.is_some())
87 .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
91 log_info!(logger, "Channel IDs: {:?}", channel_ids);
92 log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93 let last_sync_timestamp_float = last_sync_timestamp as f64;
95 let current_timestamp = snapshot_reference_timestamp.unwrap_or(SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs());
96 log_info!(logger, "Current timestamp: {}", current_timestamp);
98 let include_reminders = {
99 let current_hour = current_timestamp / 3600;
100 let current_day = current_timestamp / (24 * 3600);
102 log_debug!(logger, "Current day index: {}", current_day);
103 log_debug!(logger, "Current hour: {}", current_hour);
105 // every 5th day at midnight
106 let is_reminder_hour = (current_hour % 24) == 0;
107 let is_reminder_day = (current_day % 5) == 0;
109 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
110 let is_reminder_scope = snapshot_scope > (50 * 3600);
111 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
113 (is_reminder_hour && is_reminder_day) || is_reminder_scope
116 log_info!(logger, "Obtaining corresponding database entries");
117 // get all the channel announcements that are currently in the network graph
118 let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
119 let mut pinned_rows = Box::pin(announcement_rows);
121 let mut announcement_count = 0;
122 while let Some(row_res) = pinned_rows.next().await {
123 let current_announcement_row = row_res.unwrap();
124 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
125 let mut readable = Cursor::new(blob);
126 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
128 let scid = unsigned_announcement.short_channel_id;
129 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
131 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
132 (*current_channel_delta).announcement = Some(AnnouncementDelta {
133 announcement: unsigned_announcement,
134 seen: current_seen_timestamp,
137 announcement_count += 1;
139 log_info!(logger, "Fetched {} announcement rows", announcement_count);
142 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
144 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
146 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
147 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
148 // This will allow us to mark the first time updates in both directions were seen
150 // here is where the channels whose first update in either direction occurred after
151 // `last_seen_timestamp` are added to the selection
152 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
153 [&channel_ids, &last_sync_timestamp_float];
154 let newer_oldest_directional_updates = client.query_raw("
155 SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
156 SELECT DISTINCT ON (short_channel_id) *
158 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
160 WHERE short_channel_id = any($1)
161 ORDER BY short_channel_id ASC, direction ASC, seen ASC
162 ) AS directional_last_seens
163 ORDER BY short_channel_id ASC, seen DESC
165 WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
166 ", params).await.unwrap();
167 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
169 let mut newer_oldest_directional_update_count = 0;
170 while let Some(row_res) = pinned_updates.next().await {
171 let current_row = row_res.unwrap();
173 let scid: i64 = current_row.get("short_channel_id");
174 let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
176 // the newer of the two oldest seen directional updates came after last sync timestamp
177 let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
178 // first time a channel was seen in both directions
179 (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
181 newer_oldest_directional_update_count += 1;
183 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
186 if include_reminders {
187 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
189 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
191 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
192 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
193 let reminder_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs()).unwrap() as f64;
195 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
196 let reminder_lookup_threshold_timestamp = current_timestamp.checked_sub(config::CHANNEL_REMINDER_AGE.as_secs() * 3).unwrap() as f64;
197 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
200 What exactly is the below query doing?
202 First, the inner query groups all channel updates by their scid/direction combination,
203 and then sorts those in reverse chronological order by the "seen" column.
205 Then, each row is annotated based on whether its subsequent row for the same scid/direction
206 combination has a different value for any one of these six fields:
207 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
208 Those are simply the properties we use to keep track of channel mutations.
210 The outer query takes all of those results and selects the first value that has a distinct
211 successor for each scid/direction combination. That yields the first instance at which
212 a given channel configuration was received after any prior mutations.
214 Knowing that, we can check whether or not there have been any mutations within the
215 reminder requirement window. Because we only care about that window (and potentially the
216 2-week-window), we pre-filter the scanned updates by only those that were received within
217 3x the timeframe that we consider necessitates reminders.
220 let mutated_updates = client.query_raw("
221 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
222 SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
223 disable<>lead(disable) OVER w1
225 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
227 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
229 fee_base_msat<>lead(fee_base_msat) OVER w1
231 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
233 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
235 ) has_distinct_successor
237 WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
238 WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
240 WHERE has_distinct_successor
241 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
242 ", params).await.unwrap();
244 let mut pinned_updates = Box::pin(mutated_updates);
245 let mut older_latest_directional_update_count = 0;
246 while let Some(row_res) = pinned_updates.next().await {
247 let current_row = row_res.unwrap();
248 let seen = current_row.get::<_, i64>("seen") as u32;
250 if seen < reminder_threshold_timestamp as u32 {
251 let blob: Vec<u8> = current_row.get("blob_signed");
252 let mut readable = Cursor::new(blob);
253 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
255 let scid = unsigned_channel_update.short_channel_id;
256 let direction: bool = current_row.get("direction");
258 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
260 // We might be able to get away with not using this
261 (*current_channel_delta).requires_reminder = true;
262 older_latest_directional_update_count += 1;
264 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
265 if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
266 // we don't send reminders if we don't have bidirectional update data
270 if let Some(info) = current_channel_info.one_to_two.as_ref() {
271 let flags: u8 = if info.enabled { 0 } else { 2 };
272 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
273 current_update.serialization_update_flags = Some(flags);
276 if let Some(info) = current_channel_info.two_to_one.as_ref() {
277 let flags: u8 = if info.enabled { 1 } else { 3 };
278 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
279 current_update.serialization_update_flags = Some(flags);
282 // we don't send reminders if we don't have the channel
286 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
289 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
293 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
294 let start = Instant::now();
295 let last_sync_timestamp_float = last_sync_timestamp as f64;
297 // get the latest channel update in each direction prior to last_sync_timestamp, provided
298 // there was an update in either direction that happened after the last sync (to avoid
299 // collecting too many reference updates)
300 let reference_rows = client.query_raw("
301 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
303 SELECT DISTINCT ON (short_channel_id, direction) id
305 WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
306 SELECT DISTINCT ON (short_channel_id) short_channel_id
308 WHERE seen >= TO_TIMESTAMP($1)
310 ORDER BY short_channel_id ASC, direction ASC, seen DESC
312 ", [last_sync_timestamp_float]).await.unwrap();
313 let mut pinned_rows = Box::pin(reference_rows);
315 log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
317 let mut last_seen_update_ids: Vec<i32> = Vec::new();
318 let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
319 let mut reference_row_count = 0;
321 while let Some(row_res) = pinned_rows.next().await {
322 let current_reference = row_res.unwrap();
323 let update_id: i32 = current_reference.get("id");
324 last_seen_update_ids.push(update_id);
325 non_intermediate_ids.insert(update_id);
327 let direction: bool = current_reference.get("direction");
328 let seen = current_reference.get::<_, i64>("seen") as u32;
329 let blob: Vec<u8> = current_reference.get("blob_signed");
330 let mut readable = Cursor::new(blob);
331 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
332 let scid = unsigned_channel_update.short_channel_id;
334 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
335 let update_delta = if !direction {
336 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
338 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
340 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
341 update_delta.last_update_before_seen = Some(UpdateDelta {
343 update: unsigned_channel_update,
346 reference_row_count += 1;
349 log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
350 reference_row_count, delta_set.len(), start.elapsed());
352 // get all the intermediate channel updates
353 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
354 // have been omitted)
356 let intermediate_updates = client.query_raw("
357 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
359 WHERE seen >= TO_TIMESTAMP($1)
360 ORDER BY short_channel_id ASC, timestamp DESC
361 ", [last_sync_timestamp_float]).await.unwrap();
362 let mut pinned_updates = Box::pin(intermediate_updates);
363 log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
365 let mut previous_scid = u64::MAX;
366 let mut previously_seen_directions = (false, false);
368 let mut intermediate_update_count = 0;
369 while let Some(row_res) = pinned_updates.next().await {
370 let intermediate_update = row_res.unwrap();
371 let update_id: i32 = intermediate_update.get("id");
372 if non_intermediate_ids.contains(&update_id) {
375 intermediate_update_count += 1;
377 let direction: bool = intermediate_update.get("direction");
378 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
379 let blob: Vec<u8> = intermediate_update.get("blob_signed");
380 let mut readable = Cursor::new(blob);
381 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
383 let scid = unsigned_channel_update.short_channel_id;
384 if scid != previous_scid {
385 previous_scid = scid;
386 previously_seen_directions = (false, false);
389 // get the write configuration for this particular channel's directional details
390 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
391 let update_delta = if !direction {
392 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
394 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
398 // handle the latest deltas
399 if !direction && !previously_seen_directions.0 {
400 previously_seen_directions.0 = true;
401 update_delta.latest_update_after_seen = Some(UpdateDelta {
402 seen: current_seen_timestamp,
403 update: unsigned_channel_update.clone(),
405 } else if direction && !previously_seen_directions.1 {
406 previously_seen_directions.1 = true;
407 update_delta.latest_update_after_seen = Some(UpdateDelta {
408 seen: current_seen_timestamp,
409 update: unsigned_channel_update.clone(),
414 // determine mutations
415 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
416 if unsigned_channel_update.flags != last_seen_update.update.flags {
417 update_delta.mutated_properties.flags = true;
419 if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
420 update_delta.mutated_properties.cltv_expiry_delta = true;
422 if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
423 update_delta.mutated_properties.htlc_minimum_msat = true;
425 if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
426 update_delta.mutated_properties.fee_base_msat = true;
428 if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
429 update_delta.mutated_properties.fee_proportional_millionths = true;
431 if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
432 update_delta.mutated_properties.htlc_maximum_msat = true;
436 log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
439 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
440 let original_length = delta_set.len();
441 let keys: Vec<u64> = delta_set.keys().cloned().collect();
443 let v = delta_set.get(&k).unwrap();
444 if v.announcement.is_none() {
445 // this channel is not currently in the network graph
446 delta_set.remove(&k);
450 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
451 if update.is_none() {
454 let update_reference = update.as_ref().unwrap();
455 // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
456 // if there has been an update after the channel was first seen
458 v.requires_reminder || update_reference.latest_update_after_seen.is_some()
461 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
462 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
464 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
465 delta_set.remove(&k);
469 let new_length = delta_set.len();
470 if original_length != new_length {
471 log_info!(logger, "length modified!");