Include old updates when necessary.
[rapid-gossip-sync-server] / src / serialization.rs
1 use std::cmp::max;
2 use std::collections::HashMap;
3 use std::time::{SystemTime, UNIX_EPOCH};
4
5 use bitcoin::BlockHash;
6 use bitcoin::hashes::Hash;
7 use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
8 use lightning::util::ser::{BigSize, Writeable};
9 use crate::config;
10
11 use crate::lookup::{DeltaSet, DirectedUpdateDelta};
12
/// The outcome of `serialize_delta_set`: the announcements and updates selected for a
/// snapshot, together with the metadata that accompanies them.
pub(super) struct SerializationSet {
    /// Announcements of channels whose announcement or first bidirectional updates were
    /// seen at or after the last sync timestamp.
    pub(super) announcements: Vec<UnsignedChannelAnnouncement>,
    /// Directed channel updates, each tagged with the way it is going to be encoded.
    pub(super) updates: Vec<UpdateSerialization>,
    /// For each update property, the most frequent value among the full updates that
    /// were recorded in the histograms (0 when none were recorded).
    pub(super) full_update_defaults: DefaultUpdateValues,
    /// The largest `seen` timestamp among the included announcements and latest updates.
    pub(super) latest_seen: u32,
    /// Chain hash taken from the first announcement encountered; all zeros when the
    /// delta set was empty.
    pub(super) chain_hash: BlockHash,
}
20
21 pub(super) struct DefaultUpdateValues {
22         pub(super) cltv_expiry_delta: u16,
23         pub(super) htlc_minimum_msat: u64,
24         pub(super) fee_base_msat: u32,
25         pub(super) fee_proportional_millionths: u32,
26         pub(super) htlc_maximum_msat: u64,
27 }
28
29 impl Default for DefaultUpdateValues {
30         fn default() -> Self {
31                 Self {
32                         cltv_expiry_delta: 0,
33                         htlc_minimum_msat: 0,
34                         fee_base_msat: 0,
35                         fee_proportional_millionths: 0,
36                         htlc_maximum_msat: 0,
37                 }
38         }
39 }
40
41 pub(super) struct MutatedProperties {
42         pub(super) flags: bool,
43         pub(super) cltv_expiry_delta: bool,
44         pub(super) htlc_minimum_msat: bool,
45         pub(super) fee_base_msat: bool,
46         pub(super) fee_proportional_millionths: bool,
47         pub(super) htlc_maximum_msat: bool,
48 }
49
50 impl Default for MutatedProperties {
51         fn default() -> Self {
52                 Self {
53                         flags: false,
54                         cltv_expiry_delta: false,
55                         htlc_minimum_msat: false,
56                         fee_base_msat: false,
57                         fee_proportional_millionths: false,
58                         htlc_maximum_msat: false,
59                 }
60         }
61 }
62
63 impl MutatedProperties {
64         /// Does not include flags because the flag byte is always sent in full
65         fn len(&self) -> u8 {
66                 let mut mutations = 0;
67                 if self.cltv_expiry_delta { mutations += 1; };
68                 if self.htlc_minimum_msat { mutations += 1; };
69                 if self.fee_base_msat { mutations += 1; };
70                 if self.fee_proportional_millionths { mutations += 1; };
71                 if self.htlc_maximum_msat { mutations += 1; };
72                 mutations
73         }
74 }
75
/// The way a single directed channel update is going to be encoded.
pub(super) enum UpdateSerialization {
    /// The complete update. When serialized, only the properties that differ from the
    /// snapshot's default values are written out.
    Full(UnsignedChannelUpdate),
    /// The latest update along with the set of properties that changed. When serialized,
    /// only the changed properties are written and the incremental bit is set.
    Incremental(UnsignedChannelUpdate, MutatedProperties),
    /// Short channel ID and flag byte only; serialized with the incremental bit set and
    /// no property values.
    Reminder(u64, u8),
}
81 impl UpdateSerialization {
82         pub(super) fn scid(&self) -> u64 {
83                 match self {
84                         UpdateSerialization::Full(latest_update)|
85                         UpdateSerialization::Incremental(latest_update, _) => latest_update.short_channel_id,
86                         UpdateSerialization::Reminder(scid, _) => *scid,
87                 }
88         }
89
90         fn flags(&self) -> u8 {
91                 match self {
92                         UpdateSerialization::Full(latest_update)|
93                         UpdateSerialization::Incremental(latest_update, _) => latest_update.flags,
94                         UpdateSerialization::Reminder(_, flags) => *flags,
95                 }
96         }
97 }
98
/// Per-property tallies (value -> number of occurrences) of the values found in the full
/// updates recorded by `serialize_delta_set`. The most frequent value of each property
/// becomes the corresponding default of the serialization set.
struct FullUpdateValueHistograms {
    cltv_expiry_delta: HashMap<u16, usize>,
    htlc_minimum_msat: HashMap<u64, usize>,
    fee_base_msat: HashMap<u32, usize>,
    fee_proportional_millionths: HashMap<u32, usize>,
    htlc_maximum_msat: HashMap<u64, usize>,
}
106
107 pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
108         let mut serialization_set = SerializationSet {
109                 announcements: vec![],
110                 updates: vec![],
111                 full_update_defaults: Default::default(),
112                 chain_hash: BlockHash::all_zeros(),
113                 latest_seen: 0,
114         };
115
116         let mut chain_hash_set = false;
117
118         let mut full_update_histograms = FullUpdateValueHistograms {
119                 cltv_expiry_delta: Default::default(),
120                 htlc_minimum_msat: Default::default(),
121                 fee_base_msat: Default::default(),
122                 fee_proportional_millionths: Default::default(),
123                 htlc_maximum_msat: Default::default(),
124         };
125
126         let mut record_full_update_in_histograms = |full_update: &UnsignedChannelUpdate| {
127                 *full_update_histograms.cltv_expiry_delta.entry(full_update.cltv_expiry_delta).or_insert(0) += 1;
128                 *full_update_histograms.htlc_minimum_msat.entry(full_update.htlc_minimum_msat).or_insert(0) += 1;
129                 *full_update_histograms.fee_base_msat.entry(full_update.fee_base_msat).or_insert(0) += 1;
130                 *full_update_histograms.fee_proportional_millionths.entry(full_update.fee_proportional_millionths).or_insert(0) += 1;
131                 *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
132         };
133
134         // if the previous seen update happened more than 6 days ago, the client may have pruned it, and an incremental update wouldn't work
135         let non_incremental_previous_update_threshold_timestamp = SystemTime::now().checked_sub(config::CHANNEL_REMINDER_AGE).unwrap().duration_since(UNIX_EPOCH).unwrap().as_secs() as u32;
136
137         for (scid, channel_delta) in delta_set.into_iter() {
138
139                 // any announcement chain hash is gonna be the same value. Just set it from the first one.
140                 let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
141                 if !chain_hash_set {
142                         chain_hash_set = true;
143                         serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone();
144                 }
145
146                 let current_announcement_seen = channel_announcement_delta.seen;
147                 let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
148                 let is_newly_included_announcement = if let Some(first_update_seen) = channel_delta.first_bidirectional_updates_seen {
149                         first_update_seen >= last_sync_timestamp
150                 } else {
151                         false
152                 };
153                 let send_announcement = is_new_announcement || is_newly_included_announcement;
154                 if send_announcement {
155                         serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
156                         serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement);
157                 }
158
159                 let direction_a_updates = channel_delta.updates.0;
160                 let direction_b_updates = channel_delta.updates.1;
161
162                 let mut categorize_directed_update_serialization = |directed_updates: Option<DirectedUpdateDelta>| {
163                         if let Some(updates) = directed_updates {
164                                 if let Some(latest_update_delta) = updates.latest_update_after_seen {
165                                         let latest_update = latest_update_delta.update;
166                                         assert_eq!(latest_update.short_channel_id, scid, "Update in DB had wrong SCID column");
167
168                                         // the returned seen timestamp should be the latest of all the returned
169                                         // announcements and latest updates
170                                         serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
171
172                                         if let Some(update_delta) = updates.last_update_before_seen {
173                                                 let mutated_properties = updates.mutated_properties;
174                                                 if send_announcement || mutated_properties.len() == 5 || update_delta.seen <= non_incremental_previous_update_threshold_timestamp {
175                                                         // all five values have changed, it makes more sense to just
176                                                         // serialize the update as a full update instead of as a change
177                                                         // this way, the default values can be computed more efficiently
178                                                         record_full_update_in_histograms(&latest_update);
179                                                         serialization_set.updates.push(UpdateSerialization::Full(latest_update));
180                                                 } else if mutated_properties.len() > 0 || mutated_properties.flags {
181                                                         // we don't count flags as mutated properties
182                                                         serialization_set.updates.push(
183                                                                 UpdateSerialization::Incremental(latest_update, mutated_properties));
184                                                 }
185                                         } else {
186                                                 // serialize the full update
187                                                 record_full_update_in_histograms(&latest_update);
188                                                 serialization_set.updates.push(UpdateSerialization::Full(latest_update));
189                                         }
190                                 } else if is_newly_included_announcement {
191                                         if let Some(unannounced_update) = updates.last_update_before_seen {
192                                                 serialization_set.updates.push(UpdateSerialization::Full(unannounced_update.update));
193                                         }
194                                 } else if let Some(flags) = updates.serialization_update_flags {
195                                         serialization_set.updates.push(UpdateSerialization::Reminder(scid, flags));
196                                 }
197                         }
198                 };
199
200                 categorize_directed_update_serialization(direction_a_updates);
201                 categorize_directed_update_serialization(direction_b_updates);
202         }
203
204         let default_update_values = DefaultUpdateValues {
205                 cltv_expiry_delta: find_most_common_histogram_entry_with_default(full_update_histograms.cltv_expiry_delta, 0),
206                 htlc_minimum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_minimum_msat, 0),
207                 fee_base_msat: find_most_common_histogram_entry_with_default(full_update_histograms.fee_base_msat, 0),
208                 fee_proportional_millionths: find_most_common_histogram_entry_with_default(full_update_histograms.fee_proportional_millionths, 0),
209                 htlc_maximum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_maximum_msat, 0),
210         };
211
212         serialization_set.full_update_defaults = default_update_values;
213         serialization_set
214 }
215
216 pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnnouncement, node_id_a_index: usize, node_id_b_index: usize, previous_scid: u64) -> Vec<u8> {
217         let mut stripped_announcement = vec![];
218
219         announcement.features.write(&mut stripped_announcement).unwrap();
220
221         if previous_scid > announcement.short_channel_id {
222                 panic!("unsorted scids!");
223         }
224         let scid_delta = BigSize(announcement.short_channel_id - previous_scid);
225         scid_delta.write(&mut stripped_announcement).unwrap();
226
227         // write indices of node ids rather than the node IDs themselves
228         BigSize(node_id_a_index as u64).write(&mut stripped_announcement).unwrap();
229         BigSize(node_id_b_index as u64).write(&mut stripped_announcement).unwrap();
230
231         // println!("serialized CA: {}, \n{:?}\n{:?}\n", announcement.short_channel_id, announcement.node_id_1, announcement.node_id_2);
232         stripped_announcement
233 }
234
235 pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
236         let mut serialized_flags = update.flags();
237
238         if previous_scid > update.scid() {
239                 panic!("unsorted scids!");
240         }
241
242         let mut delta_serialization = Vec::new();
243         let mut prefixed_serialization = Vec::new();
244
245         match update {
246                 UpdateSerialization::Full(latest_update) => {
247                         if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
248                                 serialized_flags |= 0b_0100_0000;
249                                 latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
250                         }
251
252                         if latest_update.htlc_minimum_msat != default_values.htlc_minimum_msat {
253                                 serialized_flags |= 0b_0010_0000;
254                                 latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
255                         }
256
257                         if latest_update.fee_base_msat != default_values.fee_base_msat {
258                                 serialized_flags |= 0b_0001_0000;
259                                 latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
260                         }
261
262                         if latest_update.fee_proportional_millionths != default_values.fee_proportional_millionths {
263                                 serialized_flags |= 0b_0000_1000;
264                                 latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
265                         }
266
267                         if latest_update.htlc_maximum_msat != default_values.htlc_maximum_msat {
268                                 serialized_flags |= 0b_0000_0100;
269                                 latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
270                         }
271                 }
272                 UpdateSerialization::Incremental(latest_update, mutated_properties) => {
273                         // indicate that this update is incremental
274                         serialized_flags |= 0b_1000_0000;
275
276                         if mutated_properties.cltv_expiry_delta {
277                                 serialized_flags |= 0b_0100_0000;
278                                 latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
279                         }
280
281                         if mutated_properties.htlc_minimum_msat {
282                                 serialized_flags |= 0b_0010_0000;
283                                 latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
284                         }
285
286                         if mutated_properties.fee_base_msat {
287                                 serialized_flags |= 0b_0001_0000;
288                                 latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
289                         }
290
291                         if mutated_properties.fee_proportional_millionths {
292                                 serialized_flags |= 0b_0000_1000;
293                                 latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
294                         }
295
296                         if mutated_properties.htlc_maximum_msat {
297                                 serialized_flags |= 0b_0000_0100;
298                                 latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
299                         }
300                 },
301                 UpdateSerialization::Reminder(_, _) => {
302                         // indicate that this update is incremental
303                         serialized_flags |= 0b_1000_0000;
304                 }
305         }
306         let scid_delta = BigSize(update.scid() - previous_scid);
307         scid_delta.write(&mut prefixed_serialization).unwrap();
308
309         serialized_flags.write(&mut prefixed_serialization).unwrap();
310         prefixed_serialization.append(&mut delta_serialization);
311
312         prefixed_serialization
313 }
314
315 pub(super) fn find_most_common_histogram_entry_with_default<T: Copy>(histogram: HashMap<T, usize>, default: T) -> T {
316         let most_frequent_entry = histogram.iter().max_by(|a, b| a.1.cmp(&b.1));
317         if let Some(entry_details) = most_frequent_entry {
318                 // .0 is the value
319                 // .1 is the frequency
320                 return entry_details.0.to_owned();
321         }
322         // the default should pretty much always be a 0 as T
323         // though for htlc maximum msat it could be a u64::max
324         default
325 }