1 use std::collections::{BTreeMap, HashSet};
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
12 use futures::StreamExt;
13 use lightning::{log_debug, log_gossip, log_info};
14 use lightning::util::logger::Logger;
17 use crate::serialization::MutatedProperties;
/// Map from short channel id (scid) to everything collected about that channel.
///
19 /// The delta set needs to be a `BTreeMap` so that the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically.
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
/// A channel announcement as stored in a `ChannelDelta`.
23 pub(super) struct AnnouncementDelta {
/// The unsigned contents of the announcement, deserialized from the database blob.
25 pub(super) announcement: UnsignedChannelAnnouncement,
/// A single channel update as stored in a `DirectedUpdateDelta`.
28 pub(super) struct UpdateDelta {
/// The unsigned contents of the update, deserialized from the database blob.
30 pub(super) update: UnsignedChannelUpdate,
/// Everything tracked for one direction of a channel.
33 pub(super) struct DirectedUpdateDelta {
34 /// The last update we saw prior to the user-provided timestamp.
35 pub(super) last_update_before_seen: Option<UpdateDelta>,
36 /// The most recent update among those seen at or after the user-provided timestamp.
37 pub(super) latest_update_after_seen: Option<UpdateDelta>,
38 /// The set of all mutated properties across all updates between the last seen by the user and
39 /// the latest one known to us (each one is compared against `last_update_before_seen`).
40 pub(super) mutated_properties: MutatedProperties,
41 /// Specifically for reminder updates, the flag-only value to send to the client.
42 pub(super) serialization_update_flags: Option<u8>
/// Everything collected about a single channel while assembling a `DeltaSet`.
45 pub(super) struct ChannelDelta {
/// The channel's announcement, if one was fetched. `filter_delta_set` drops entries without one.
46 pub(super) announcement: Option<AnnouncementDelta>,
/// Per-direction update state: `.0` is used for rows whose `direction` is `false`,
/// `.1` for rows whose `direction` is `true`.
47 pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
/// Seen timestamp of the newer of the two oldest directional updates, recorded only when it
/// is at or after the last sync timestamp.
48 pub(super) first_bidirectional_updates_seen: Option<u32>,
49 /// Set when an update row older than the reminder threshold was found for this channel.
/// Channels with this flag are kept by `filter_delta_set` even without newer updates.
50 pub(super) requires_reminder: bool,
/// An empty channel delta, used as the starting value for `delta_set.entry(..).or_insert(..)`.
53 impl Default for ChannelDelta {
/// Starts with no update state in either direction, no bidirectional-seen timestamp and no
/// reminder requirement.
54 fn default() -> Self {
57 updates: (None, None),
58 first_bidirectional_updates_seen: None,
59 requires_reminder: false,
/// An empty directional delta, used as the starting value for `get_or_insert(..)`.
64 impl Default for DirectedUpdateDelta {
/// Starts with no reference update, no newer update, default (unset) mutated properties and
/// no reminder flags.
65 fn default() -> Self {
67 last_update_before_seen: None,
68 mutated_properties: MutatedProperties::default(),
69 latest_update_after_seen: None,
70 serialization_update_flags: None,
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
///
/// `delta_set` is filled in up to three passes:
/// 1. the announcement of every graph channel that carries an announcement message,
/// 2. `first_bidirectional_updates_seen` for rows returned by the "newer of the two oldest
///    directional updates" query,
/// 3. only when reminders are included: `requires_reminder` plus the flag-only
///    `serialization_update_flags` for each direction.
///
/// # Panics
///
/// Every database query, row result, blob deserialization and time computation is
/// `unwrap()`ed, so any failure there panics.
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
80 log_info!(logger, "Obtaining channel ids from network graph");
82 let read_only_graph = network_graph.read_only();
83 log_info!(logger, "Retrieved read-only network graph copy");
// Only channels that still carry their announcement message are considered. The scid is
// cast to i64 for use as a query parameter (presumably because the column is a signed
// BIGINT; verify against the schema).
84 let channel_iterator = read_only_graph.channels().unordered_iter();
86 .filter(|c| c.1.announcement_message.is_some())
87 .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
// NOTE(review): this logs the complete id list at info level, which can be very large.
91 log_info!(logger, "Channel IDs: {:?}", channel_ids);
92 log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
// TO_TIMESTAMP($n) is fed a float, hence the f64 copy of the timestamp.
93 let last_sync_timestamp_float = last_sync_timestamp as f64;
95 let current_time = SystemTime::now();
96 let current_timestamp = current_time.duration_since(UNIX_EPOCH).unwrap().as_secs();
97 log_info!(logger, "Current timestamp: {}", current_timestamp);
// Day index since the Unix epoch, and the time span covered by this snapshot.
99 let current_day = current_timestamp / (24 * 3600);
100 let snapshot_scope = current_timestamp.saturating_sub(last_sync_timestamp as u64);
101 log_debug!(logger, "Current day index: {}", current_day);
102 log_debug!(logger, "Snapshot scope: {}s", snapshot_scope);
// Reminders are included when the day index is a multiple of 5, or when the snapshot
// covers more than 40 hours.
103 let include_reminders = ((current_day % 5) == 0 || snapshot_scope > (40 * 3600));
105 log_info!(logger, "Obtaining corresponding database entries");
106 // get all the channel announcements that are currently in the network graph
107 let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
108 let mut pinned_rows = Box::pin(announcement_rows);
110 let mut announcement_count = 0;
111 while let Some(row_res) = pinned_rows.next().await {
112 let current_announcement_row = row_res.unwrap();
113 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
114 let mut readable = Cursor::new(blob);
// Only the unsigned contents are kept; the signatures are discarded.
115 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
117 let scid = unsigned_announcement.short_channel_id;
// NOTE(review): `as u32` silently truncates epoch seconds that do not fit into 32 bits.
118 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
120 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
121 (*current_channel_delta).announcement = Some(AnnouncementDelta {
122 announcement: unsigned_announcement,
123 seen: current_seen_timestamp,
126 announcement_count += 1;
128 log_info!(logger, "Fetched {} announcement rows", announcement_count);
131 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
133 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
135 // — Obtain all updates, distinct by (scid, direction), ordered by seen ASC (to find the oldest update in a given direction)
136 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
137 // This will allow us to mark the first time updates in both directions were seen
139 // here is where the channels whose first update in either direction occurred after
140 // `last_sync_timestamp` are added to the selection
141 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
142 [&channel_ids, &last_sync_timestamp_float];
143 let newer_oldest_directional_updates = client.query_raw("
144 SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
145 SELECT DISTINCT ON (short_channel_id) *
147 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
149 WHERE short_channel_id = any($1)
150 ORDER BY short_channel_id ASC, direction ASC, seen ASC
151 ) AS directional_last_seens
152 ORDER BY short_channel_id ASC, seen DESC
154 WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
155 ", params).await.unwrap();
156 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
158 let mut newer_oldest_directional_update_count = 0;
159 while let Some(row_res) = pinned_updates.next().await {
160 let current_row = row_res.unwrap();
162 let scid: i64 = current_row.get("short_channel_id");
163 let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
165 // the newer of the two oldest seen directional updates came after last sync timestamp
166 let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
167 // first time a channel was seen in both directions
168 (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
170 newer_oldest_directional_update_count += 1;
172 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
175 if include_reminders {
176 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
178 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
180 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
181 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
// NOTE(review): the two bullets above do not obviously describe the window-function query
// issued below; they may be left over from an earlier version (confirm).
// Rows seen before this threshold trigger a reminder.
182 let reminder_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
184 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
// The query only scans updates seen within three times the reminder age.
185 let reminder_lookup_threshold_timestamp = current_time.checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
186 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
189 What exactly is the below query doing?
191 First, the inner query groups all channel updates by their scid/direction combination,
192 and then sorts those in reverse chronological order by the "seen" column.
194 Then, each row is annotated based on whether its subsequent row for the same scid/direction
195 combination has a different value for any one of these six fields:
196 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
197 Those are simply the properties we use to keep track of channel mutations.
199 The outer query takes all of those results and selects the first value that has a distinct
200 successor for each scid/direction combination. That yields the first instance at which
201 a given channel configuration was received after any prior mutations.
203 Knowing that, we can check whether or not there have been any mutations within the
204 reminder requirement window. Because we only care about that window (and potentially the
205 2-week-window), we pre-filter the scanned updates by only those that were received within
206 3x the timeframe that we consider necessitates reminders.
209 let mutated_updates = client.query_raw("
210 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
211 SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
212 disable<>lead(disable) OVER w1
214 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
216 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
218 fee_base_msat<>lead(fee_base_msat) OVER w1
220 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
222 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
224 ) has_distinct_successor
226 WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
227 WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
229 WHERE has_distinct_successor
230 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
231 ", params).await.unwrap();
233 let mut pinned_updates = Box::pin(mutated_updates);
234 let mut older_latest_directional_update_count = 0;
235 while let Some(row_res) = pinned_updates.next().await {
236 let current_row = row_res.unwrap();
237 let seen = current_row.get::<_, i64>("seen") as u32;
// Only a configuration first seen before the reminder threshold warrants a reminder.
239 if seen < reminder_threshold_timestamp as u32 {
240 let blob: Vec<u8> = current_row.get("blob_signed");
241 let mut readable = Cursor::new(blob);
242 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
244 let scid = unsigned_channel_update.short_channel_id;
245 let direction: bool = current_row.get("direction");
247 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
249 // We might be able to get away with not using this
250 (*current_channel_delta).requires_reminder = true;
251 older_latest_directional_update_count += 1;
// The current enabled state is read from the live graph rather than from the row's blob.
// NOTE(review): a fresh read-only view is requested for every matching row (confirm intended).
253 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
254 if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
255 // we don't send reminders if we don't have bidirectional update data
// Flag values used below: 0 = one_to_two enabled, 2 = one_to_two disabled,
// 1 = two_to_one enabled, 3 = two_to_one disabled (presumably bit 0 is the direction
// and bit 1 the disable bit; TODO confirm against the serialization code).
259 if let Some(info) = current_channel_info.one_to_two.as_ref() {
260 let flags: u8 = if info.enabled { 0 } else { 2 };
261 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
262 current_update.serialization_update_flags = Some(flags);
265 if let Some(info) = current_channel_info.two_to_one.as_ref() {
266 let flags: u8 = if info.enabled { 1 } else { 3 };
267 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
268 current_update.serialization_update_flags = Some(flags);
271 // we don't send reminders if we don't have the channel
275 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
278 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
/// Fill in the per-direction update state of every channel in `delta_set`.
///
/// Two queries are issued against `client`:
/// 1. reference rows: the latest update per (scid, direction) seen before
///    `last_sync_timestamp`, restricted to channels that also have an update seen at or after
///    it; these become `last_update_before_seen`,
/// 2. all updates seen at or after `last_sync_timestamp`; the first row encountered per
///    direction becomes `latest_update_after_seen`, and every row is compared against the
///    reference update to accumulate `mutated_properties`.
///
/// Channels not yet present in `delta_set` are inserted with a default `ChannelDelta`.
///
/// # Panics
///
/// Query results, row results and blob deserialization are all `unwrap()`ed.
282 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
// `start` is only used for the elapsed-time log messages.
283 let start = Instant::now();
// TO_TIMESTAMP($1) is fed a float, hence the f64 copy of the timestamp.
284 let last_sync_timestamp_float = last_sync_timestamp as f64;
286 // get the latest channel update in each direction prior to last_sync_timestamp, provided
287 // there was an update in either direction that happened after the last sync (to avoid
288 // collecting too many reference updates)
289 let reference_rows = client.query_raw("
290 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
292 SELECT DISTINCT ON (short_channel_id, direction) id
294 WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
295 SELECT DISTINCT ON (short_channel_id) short_channel_id
297 WHERE seen >= TO_TIMESTAMP($1)
299 ORDER BY short_channel_id ASC, direction ASC, seen DESC
301 ", [last_sync_timestamp_float]).await.unwrap();
302 let mut pinned_rows = Box::pin(reference_rows);
304 log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
// NOTE(review): `last_seen_update_ids` is only pushed to in the visible code; it may be unused.
306 let mut last_seen_update_ids: Vec<i32> = Vec::new();
// Ids of the reference rows, consulted while processing the second query's rows.
307 let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
308 let mut reference_row_count = 0;
310 while let Some(row_res) = pinned_rows.next().await {
311 let current_reference = row_res.unwrap();
312 let update_id: i32 = current_reference.get("id");
313 last_seen_update_ids.push(update_id);
314 non_intermediate_ids.insert(update_id);
316 let direction: bool = current_reference.get("direction");
// NOTE(review): `as u32` silently truncates epoch seconds that do not fit into 32 bits.
317 let seen = current_reference.get::<_, i64>("seen") as u32;
318 let blob: Vec<u8> = current_reference.get("blob_signed");
319 let mut readable = Cursor::new(blob);
320 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
321 let scid = unsigned_channel_update.short_channel_id;
323 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
// direction == false selects `updates.0`, direction == true selects `updates.1`
324 let update_delta = if !direction {
325 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
327 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
329 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
330 update_delta.last_update_before_seen = Some(UpdateDelta {
332 update: unsigned_channel_update,
335 reference_row_count += 1;
338 log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
339 reference_row_count, delta_set.len(), start.elapsed());
341 // get all the intermediate channel updates
342 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
343 // have been omitted)
345 let intermediate_updates = client.query_raw("
346 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
348 WHERE seen >= TO_TIMESTAMP($1)
349 ORDER BY short_channel_id ASC, timestamp DESC
350 ", [last_sync_timestamp_float]).await.unwrap();
351 let mut pinned_updates = Box::pin(intermediate_updates);
352 log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
// Rows arrive grouped by scid with the newest timestamp first, so the first row seen per
// direction within a scid group is that direction's latest update. `previous_scid` detects
// group boundaries; u64::MAX serves as the "no group yet" start value.
354 let mut previous_scid = u64::MAX;
355 let mut previously_seen_directions = (false, false);
357 // let mut previously_seen_directions = (false, false);
358 let mut intermediate_update_count = 0;
359 while let Some(row_res) = pinned_updates.next().await {
360 let intermediate_update = row_res.unwrap();
361 let update_id: i32 = intermediate_update.get("id");
// Rows already handled as reference rows are detected here (presumably they are skipped;
// the branch body is not visible).
362 if non_intermediate_ids.contains(&update_id) {
365 intermediate_update_count += 1;
367 let direction: bool = intermediate_update.get("direction");
368 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
369 let blob: Vec<u8> = intermediate_update.get("blob_signed");
370 let mut readable = Cursor::new(blob);
371 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
373 let scid = unsigned_channel_update.short_channel_id;
// A new scid group starts: forget which directions were already seen.
374 if scid != previous_scid {
375 previous_scid = scid;
376 previously_seen_directions = (false, false);
379 // get the write configuration for this particular channel's directional details
380 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
381 let update_delta = if !direction {
382 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
384 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
388 // handle the latest deltas
// Both branches store the same value; they differ only in which "seen" marker they set.
389 if !direction && !previously_seen_directions.0 {
390 previously_seen_directions.0 = true;
391 update_delta.latest_update_after_seen = Some(UpdateDelta {
392 seen: current_seen_timestamp,
393 update: unsigned_channel_update.clone(),
395 } else if direction && !previously_seen_directions.1 {
396 previously_seen_directions.1 = true;
397 update_delta.latest_update_after_seen = Some(UpdateDelta {
398 seen: current_seen_timestamp,
399 update: unsigned_channel_update.clone(),
404 // determine mutations
// Each row is compared with the reference update, not with its neighbour, and the flags
// are only ever set to true. Without a reference update nothing is marked as mutated.
405 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
406 if unsigned_channel_update.flags != last_seen_update.update.flags {
407 update_delta.mutated_properties.flags = true;
409 if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
410 update_delta.mutated_properties.cltv_expiry_delta = true;
412 if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
413 update_delta.mutated_properties.htlc_minimum_msat = true;
415 if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
416 update_delta.mutated_properties.fee_base_msat = true;
418 if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
419 update_delta.mutated_properties.fee_proportional_millionths = true;
421 if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
422 update_delta.mutated_properties.htlc_maximum_msat = true;
426 log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
429 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
430 let original_length = delta_set.len();
431 let keys: Vec<u64> = delta_set.keys().cloned().collect();
433 let v = delta_set.get(&k).unwrap();
434 if v.announcement.is_none() {
435 // this channel is not currently in the network graph
436 delta_set.remove(&k);
440 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
441 if update.is_none() {
444 let update_reference = update.as_ref().unwrap();
445 // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
446 // if there has been an update after the channel was first seen
448 v.requires_reminder || update_reference.latest_update_after_seen.is_some()
451 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
452 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
454 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
455 delta_set.remove(&k);
459 let new_length = delta_set.len();
460 if original_length != new_length {
461 log_info!(logger, "length modified!");