1 use std::collections::{BTreeMap, HashSet};
5 use std::time::{Instant, SystemTime, UNIX_EPOCH};
7 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::routing::gossip::NetworkGraph;
9 use lightning::util::ser::Readable;
10 use tokio_postgres::Client;
12 use futures::StreamExt;
13 use lightning::{log_gossip, log_info};
14 use lightning::util::logger::Logger;
17 use crate::serialization::MutatedProperties;
19 /// The delta set needs to be a BTreeMap so the keys are sorted.
20 /// That way, the scids in the response automatically grow monotonically
21 pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
23 pub(super) struct AnnouncementDelta {
25 pub(super) announcement: UnsignedChannelAnnouncement,
28 pub(super) struct UpdateDelta {
30 pub(super) update: UnsignedChannelUpdate,
33 pub(super) struct DirectedUpdateDelta {
34 /// the last update we saw prior to the user-provided timestamp
35 pub(super) last_update_before_seen: Option<UpdateDelta>,
36 /// the latest update we saw overall
37 pub(super) latest_update_after_seen: Option<UpdateDelta>,
38 /// the set of all mutated properties across all updates between the last seen by the user and
39 /// the latest one known to us
40 pub(super) mutated_properties: MutatedProperties,
41 /// Specifically for reminder updates, the flag-only value to send to the client
42 pub(super) serialization_update_flags: Option<u8>
45 pub(super) struct ChannelDelta {
46 pub(super) announcement: Option<AnnouncementDelta>,
47 pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
48 pub(super) first_bidirectional_updates_seen: Option<u32>,
49 /// The seen timestamp of the older of the two latest directional updates
50 pub(super) requires_reminder: bool,
53 impl Default for ChannelDelta {
54 fn default() -> Self {
57 updates: (None, None),
58 first_bidirectional_updates_seen: None,
59 requires_reminder: false,
64 impl Default for DirectedUpdateDelta {
65 fn default() -> Self {
67 last_update_before_seen: None,
68 mutated_properties: MutatedProperties::default(),
69 latest_update_after_seen: None,
70 serialization_update_flags: None,
75 /// Fetch all the channel announcements that are presently in the network graph, regardless of
76 /// whether they had been seen before.
77 /// Also include all announcements for which the first update was announced
78 /// after `last_sync_timestamp`
79 pub(super) async fn fetch_channel_announcements<L: Deref>(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<L>>, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
80 log_info!(logger, "Obtaining channel ids from network graph");
82 let read_only_graph = network_graph.read_only();
83 log_info!(logger, "Retrieved read-only network graph copy");
84 let channel_iterator = read_only_graph.channels().unordered_iter();
86 .filter(|c| c.1.announcement_message.is_some())
87 .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
91 log_info!(logger, "Channel IDs: {:?}", channel_ids);
92 log_info!(logger, "Last sync timestamp: {}", last_sync_timestamp);
93 let last_sync_timestamp_float = last_sync_timestamp as f64;
95 log_info!(logger, "Obtaining corresponding database entries");
96 // get all the channel announcements that are currently in the network graph
97 let announcement_rows = client.query_raw("SELECT announcement_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", [&channel_ids]).await.unwrap();
98 let mut pinned_rows = Box::pin(announcement_rows);
100 let mut announcement_count = 0;
101 while let Some(row_res) = pinned_rows.next().await {
102 let current_announcement_row = row_res.unwrap();
103 let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
104 let mut readable = Cursor::new(blob);
105 let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
107 let scid = unsigned_announcement.short_channel_id;
108 let current_seen_timestamp = current_announcement_row.get::<_, i64>("seen") as u32;
110 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
111 (*current_channel_delta).announcement = Some(AnnouncementDelta {
112 announcement: unsigned_announcement,
113 seen: current_seen_timestamp,
116 announcement_count += 1;
118 log_info!(logger, "Fetched {} announcement rows", announcement_count);
121 // THIS STEP IS USED TO DETERMINE IF A CHANNEL SHOULD BE OMITTED FROM THE DELTA
123 log_info!(logger, "Annotating channel announcements whose oldest channel update in a given direction occurred after the last sync");
125 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC // to find the oldest update in a given direction
126 // — From those updates, select distinct by (scid), ordered by seen DESC (to obtain the newer one per direction)
127 // This will allow us to mark the first time updates in both directions were seen
129 // here is where the channels whose first update in either direction occurred after
130 // `last_seen_timestamp` are added to the selection
131 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] =
132 [&channel_ids, &last_sync_timestamp_float];
133 let newer_oldest_directional_updates = client.query_raw("
134 SELECT short_channel_id, CAST(EXTRACT('epoch' from distinct_chans.seen) AS BIGINT) AS seen FROM (
135 SELECT DISTINCT ON (short_channel_id) *
137 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, seen
139 WHERE short_channel_id = any($1)
140 ORDER BY short_channel_id ASC, direction ASC, seen ASC
141 ) AS directional_last_seens
142 ORDER BY short_channel_id ASC, seen DESC
144 WHERE distinct_chans.seen >= TO_TIMESTAMP($2)
145 ", params).await.unwrap();
146 let mut pinned_updates = Box::pin(newer_oldest_directional_updates);
148 let mut newer_oldest_directional_update_count = 0;
149 while let Some(row_res) = pinned_updates.next().await {
150 let current_row = row_res.unwrap();
152 let scid: i64 = current_row.get("short_channel_id");
153 let current_seen_timestamp = current_row.get::<_, i64>("seen") as u32;
155 // the newer of the two oldest seen directional updates came after last sync timestamp
156 let current_channel_delta = delta_set.entry(scid as u64).or_insert(ChannelDelta::default());
157 // first time a channel was seen in both directions
158 (*current_channel_delta).first_bidirectional_updates_seen = Some(current_seen_timestamp);
160 newer_oldest_directional_update_count += 1;
162 log_info!(logger, "Fetched {} update rows of the first update in a new direction", newer_oldest_directional_update_count);
166 // THIS STEP IS USED TO DETERMINE IF A REMINDER UPDATE SHOULD BE SENT
168 log_info!(logger, "Annotating channel announcements whose latest channel update in a given direction occurred more than six days ago");
170 // — Obtain all updates, distinct by (scid, direction), ordered by seen DESC
171 // — From those updates, select distinct by (scid), ordered by seen ASC (to obtain the older one per direction)
172 let reminder_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
174 log_info!(logger, "Fetch first time we saw the current value combination for each direction (prior mutations excepted)");
175 let reminder_lookup_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE * 3).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as f64;
176 let params: [&(dyn tokio_postgres::types::ToSql + Sync); 2] = [&channel_ids, &reminder_lookup_threshold_timestamp];
179 What exactly is the below query doing?
181 First, the inner query groups all channel updates by their scid/direction combination,
182 and then sorts those in reverse chronological order by the "seen" column.
184 Then, each row is annotated based on whether its subsequent row for the same scid/direction
185 combination has a different value for any one of these six fields:
186 disable, cltv_expiry_delta, htlc_minimum_msat, fee_base_msat, fee_proportional_millionths, htlc_maximum_msat
187 Those are simply the properties we use to keep track of channel mutations.
189 The outer query takes all of those results and selects the first value that has a distinct
190 successor for each scid/direction combination. That yields the first instance at which
191 a given channel configuration was received after any prior mutations.
193 Knowing that, we can check whether or not there have been any mutations within the
194 reminder requirement window. Because we only care about that window (and potentially the
195 2-week-window), we pre-filter the scanned updates by only those that were received within
196 3x the timeframe that we consider necessitates reminders.
199 let mutated_updates = client.query_raw("
200 SELECT DISTINCT ON (short_channel_id, direction) short_channel_id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen FROM (
201 SELECT short_channel_id, direction, timestamp, seen, blob_signed, COALESCE (
202 disable<>lead(disable) OVER w1
204 cltv_expiry_delta<>lead(cltv_expiry_delta) OVER w1
206 htlc_minimum_msat<>lead(htlc_minimum_msat) OVER w1
208 fee_base_msat<>lead(fee_base_msat) OVER w1
210 fee_proportional_millionths<>lead(fee_proportional_millionths) OVER w1
212 htlc_maximum_msat<>lead(htlc_maximum_msat) OVER w1,
214 ) has_distinct_successor
216 WHERE short_channel_id = any($1) AND seen >= TO_TIMESTAMP($2)
217 WINDOW w1 AS (PARTITION BY short_channel_id, direction ORDER BY seen DESC)
219 WHERE has_distinct_successor
220 ORDER BY short_channel_id ASC, direction ASC, timestamp DESC
221 ", params).await.unwrap();
223 let mut pinned_updates = Box::pin(mutated_updates);
224 let mut older_latest_directional_update_count = 0;
225 while let Some(row_res) = pinned_updates.next().await {
226 let current_row = row_res.unwrap();
227 let seen = current_row.get::<_, i64>("seen") as u32;
229 if seen < reminder_threshold_timestamp as u32 {
230 let blob: Vec<u8> = current_row.get("blob_signed");
231 let mut readable = Cursor::new(blob);
232 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
234 let scid = unsigned_channel_update.short_channel_id;
235 let direction: bool = current_row.get("direction");
237 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
239 // We might be able to get away with not using this
240 (*current_channel_delta).requires_reminder = true;
241 older_latest_directional_update_count += 1;
243 if let Some(current_channel_info) = network_graph.read_only().channel(scid) {
244 if current_channel_info.one_to_two.is_none() || current_channel_info.two_to_one.is_none() {
245 // we don't send reminders if we don't have bidirectional update data
249 if let Some(info) = current_channel_info.one_to_two.as_ref() {
250 let flags: u8 = if info.enabled { 0 } else { 2 };
251 let current_update = (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default());
252 current_update.serialization_update_flags = Some(flags);
255 if let Some(info) = current_channel_info.two_to_one.as_ref() {
256 let flags: u8 = if info.enabled { 1 } else { 3 };
257 let current_update = (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default());
258 current_update.serialization_update_flags = Some(flags);
261 // we don't send reminders if we don't have the channel
265 log_gossip!(logger, "Reminder requirement triggered by update for channel {} in direction {}", scid, direction);
268 log_info!(logger, "Fetched {} update rows of the latest update in the less recently updated direction", older_latest_directional_update_count);
272 pub(super) async fn fetch_channel_updates<L: Deref>(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, logger: L) where L::Target: Logger {
273 let start = Instant::now();
274 let last_sync_timestamp_float = last_sync_timestamp as f64;
276 // get the latest channel update in each direction prior to last_sync_timestamp, provided
277 // there was an update in either direction that happened after the last sync (to avoid
278 // collecting too many reference updates)
279 let reference_rows = client.query_raw("
280 SELECT id, direction, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen, blob_signed FROM channel_updates
282 SELECT DISTINCT ON (short_channel_id, direction) id
284 WHERE seen < TO_TIMESTAMP($1) AND short_channel_id IN (
285 SELECT DISTINCT ON (short_channel_id) short_channel_id
287 WHERE seen >= TO_TIMESTAMP($1)
289 ORDER BY short_channel_id ASC, direction ASC, seen DESC
291 ", [last_sync_timestamp_float]).await.unwrap();
292 let mut pinned_rows = Box::pin(reference_rows);
294 log_info!(logger, "Fetched reference rows in {:?}", start.elapsed());
296 let mut last_seen_update_ids: Vec<i32> = Vec::new();
297 let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
298 let mut reference_row_count = 0;
300 while let Some(row_res) = pinned_rows.next().await {
301 let current_reference = row_res.unwrap();
302 let update_id: i32 = current_reference.get("id");
303 last_seen_update_ids.push(update_id);
304 non_intermediate_ids.insert(update_id);
306 let direction: bool = current_reference.get("direction");
307 let seen = current_reference.get::<_, i64>("seen") as u32;
308 let blob: Vec<u8> = current_reference.get("blob_signed");
309 let mut readable = Cursor::new(blob);
310 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
311 let scid = unsigned_channel_update.short_channel_id;
313 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
314 let update_delta = if !direction {
315 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
317 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
319 log_gossip!(logger, "Channel {} last update before seen: {}/{}/{}", scid, update_id, direction, unsigned_channel_update.timestamp);
320 update_delta.last_update_before_seen = Some(UpdateDelta {
322 update: unsigned_channel_update,
325 reference_row_count += 1;
328 log_info!(logger, "Processed {} reference rows (delta size: {}) in {:?}",
329 reference_row_count, delta_set.len(), start.elapsed());
331 // get all the intermediate channel updates
332 // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
333 // have been omitted)
335 let intermediate_updates = client.query_raw("
336 SELECT id, direction, blob_signed, CAST(EXTRACT('epoch' from seen) AS BIGINT) AS seen
338 WHERE seen >= TO_TIMESTAMP($1)
339 ORDER BY short_channel_id ASC, timestamp DESC
340 ", [last_sync_timestamp_float]).await.unwrap();
341 let mut pinned_updates = Box::pin(intermediate_updates);
342 log_info!(logger, "Fetched intermediate rows in {:?}", start.elapsed());
344 let mut previous_scid = u64::MAX;
345 let mut previously_seen_directions = (false, false);
347 // let mut previously_seen_directions = (false, false);
348 let mut intermediate_update_count = 0;
349 while let Some(row_res) = pinned_updates.next().await {
350 let intermediate_update = row_res.unwrap();
351 let update_id: i32 = intermediate_update.get("id");
352 if non_intermediate_ids.contains(&update_id) {
355 intermediate_update_count += 1;
357 let direction: bool = intermediate_update.get("direction");
358 let current_seen_timestamp = intermediate_update.get::<_, i64>("seen") as u32;
359 let blob: Vec<u8> = intermediate_update.get("blob_signed");
360 let mut readable = Cursor::new(blob);
361 let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
363 let scid = unsigned_channel_update.short_channel_id;
364 if scid != previous_scid {
365 previous_scid = scid;
366 previously_seen_directions = (false, false);
369 // get the write configuration for this particular channel's directional details
370 let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
371 let update_delta = if !direction {
372 (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
374 (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
378 // handle the latest deltas
379 if !direction && !previously_seen_directions.0 {
380 previously_seen_directions.0 = true;
381 update_delta.latest_update_after_seen = Some(UpdateDelta {
382 seen: current_seen_timestamp,
383 update: unsigned_channel_update.clone(),
385 } else if direction && !previously_seen_directions.1 {
386 previously_seen_directions.1 = true;
387 update_delta.latest_update_after_seen = Some(UpdateDelta {
388 seen: current_seen_timestamp,
389 update: unsigned_channel_update.clone(),
394 // determine mutations
395 if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
396 if unsigned_channel_update.flags != last_seen_update.update.flags {
397 update_delta.mutated_properties.flags = true;
399 if unsigned_channel_update.cltv_expiry_delta != last_seen_update.update.cltv_expiry_delta {
400 update_delta.mutated_properties.cltv_expiry_delta = true;
402 if unsigned_channel_update.htlc_minimum_msat != last_seen_update.update.htlc_minimum_msat {
403 update_delta.mutated_properties.htlc_minimum_msat = true;
405 if unsigned_channel_update.fee_base_msat != last_seen_update.update.fee_base_msat {
406 update_delta.mutated_properties.fee_base_msat = true;
408 if unsigned_channel_update.fee_proportional_millionths != last_seen_update.update.fee_proportional_millionths {
409 update_delta.mutated_properties.fee_proportional_millionths = true;
411 if unsigned_channel_update.htlc_maximum_msat != last_seen_update.update.htlc_maximum_msat {
412 update_delta.mutated_properties.htlc_maximum_msat = true;
416 log_info!(logger, "Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
419 pub(super) fn filter_delta_set<L: Deref>(delta_set: &mut DeltaSet, logger: L) where L::Target: Logger {
420 let original_length = delta_set.len();
421 let keys: Vec<u64> = delta_set.keys().cloned().collect();
423 let v = delta_set.get(&k).unwrap();
424 if v.announcement.is_none() {
425 // this channel is not currently in the network graph
426 delta_set.remove(&k);
430 let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
431 if update.is_none() {
434 let update_reference = update.as_ref().unwrap();
435 // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
436 // if there has been an update after the channel was first seen
438 v.requires_reminder || update_reference.latest_update_after_seen.is_some()
441 let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
442 let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
444 if !v.requires_reminder && !direction_a_meets_criteria && !direction_b_meets_criteria {
445 delta_set.remove(&k);
449 let new_length = delta_set.len();
450 if original_length != new_length {
451 log_info!(logger, "length modified!");