bump to ldk 118
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
8 use bitcoin::Network;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::hex::ToHex;
13 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
23
/// Number of seconds by which the client backdates RGS data: one week.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 3600;

thread_local! {
	/// Name of the database schema used by the test running on this thread.
	/// Starts out as `None`; `SchemaSanitizer::new` fills it in.
	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::default();
	/// Cleanup state of this thread's test schema: `None` before a sanitizer exists,
	/// `Some(false)` once one was created, `Some(true)` after `clean_test_db` ran.
	static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::default();
}
30
31 fn blank_signature() -> Signature {
32         Signature::from_compact(&[0u8; 64]).unwrap()
33 }
34
35 fn genesis_hash() -> ChainHash {
36         ChainHash::using_genesis_block(Network::Bitcoin)
37 }
38
/// Returns the current Unix time in whole seconds, narrowed to `u32`.
///
/// Panics if the system clock reports a time before the Unix epoch.
fn current_time() -> u32 {
	let since_epoch = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("Time must be > 1970");
	since_epoch.as_secs() as u32
}
42
43 pub(crate) fn db_test_schema() -> String {
44         DB_TEST_SCHEMA.with(|suffix_reference| {
45                 let suffix_option = suffix_reference.borrow();
46                 suffix_option.as_ref().unwrap().clone()
47         })
48 }
49
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51         let secp_context = Secp256k1::new();
52
53         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
56
57         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
60
61         let announcement = UnsignedChannelAnnouncement {
62                 features: ChannelFeatures::empty(),
63                 chain_hash: genesis_hash(),
64                 short_channel_id,
65                 node_id_1,
66                 node_id_2,
67                 bitcoin_key_1: node_id_1,
68                 bitcoin_key_2: node_id_2,
69                 excess_data: vec![],
70         };
71
72         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
75
76         ChannelAnnouncement {
77                 node_signature_1,
78                 node_signature_2,
79                 bitcoin_signature_1: node_signature_1,
80                 bitcoin_signature_2: node_signature_2,
81                 contents: announcement,
82         }
83 }
84
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
86         let flag_mask = if direction { 1 } else { 0 };
87         ChannelUpdate {
88                 signature: blank_signature(),
89                 contents: UnsignedChannelUpdate {
90                         chain_hash: genesis_hash(),
91                         short_channel_id: scid,
92                         timestamp,
93                         flags: 0 | flag_mask,
94                         cltv_expiry_delta: expiry_delta,
95                         htlc_minimum_msat: min_msat,
96                         htlc_maximum_msat: max_msat,
97                         fee_base_msat: base_msat,
98                         fee_proportional_millionths: fee_rate,
99                         excess_data: vec![],
100                 },
101         }
102 }
103
104 struct SchemaSanitizer {}
105
106 impl SchemaSanitizer {
107         fn new() -> Self {
108                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109                         let mut is_clean_option = cleanliness_reference.borrow_mut();
110                         assert!(is_clean_option.is_none());
111                         *is_clean_option = Some(false);
112                 });
113
114                 DB_TEST_SCHEMA.with(|suffix_reference| {
115                         let mut suffix_option = suffix_reference.borrow_mut();
116                         let current_time = SystemTime::now();
117                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118                         let timestamp_seconds = unix_time.as_secs();
119                         let timestamp_nanos = unix_time.as_nanos();
120                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121                         let thread_id = thread::current().id();
122                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123                         println!("test schema preimage: {}", preimage);
124                         let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
125                         // the schema must start with a letter
126                         let schema = format!("test_{}_{}", timestamp_seconds, suffix);
127                         *suffix_option = Some(schema);
128                 });
129
130                 return Self {};
131         }
132 }
133
134 impl Drop for SchemaSanitizer {
135         fn drop(&mut self) {
136                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137                         let is_clean_option = cleanliness_reference.borrow();
138                         if let Some(is_clean) = *is_clean_option {
139                                 assert_eq!(is_clean, true);
140                         }
141                 });
142         }
143 }
144
145 struct CacheSanitizer {}
146
147 impl CacheSanitizer {
148         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
149         fn new(_: &SchemaSanitizer) -> Self {
150                 Self {}
151         }
152
153         fn cache_path(&self) -> String {
154                 format!("./res/{}/", db_test_schema())
155         }
156 }
157
158 impl Drop for CacheSanitizer {
159         fn drop(&mut self) {
160                 let cache_path = self.cache_path();
161                 fs::remove_dir_all(cache_path).unwrap();
162         }
163 }
164
165
166 async fn clean_test_db() {
167         let client = crate::connect_to_db().await;
168         let schema = db_test_schema();
169         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
170         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
171                 let mut is_clean_option = cleanliness_reference.borrow_mut();
172                 *is_clean_option = Some(true);
173         });
174 }
175
176 #[tokio::test]
177 async fn test_trivial_setup() {
178         let _sanitizer = SchemaSanitizer::new();
179         let logger = Arc::new(TestLogger::new());
180         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
181         let network_graph_arc = Arc::new(network_graph);
182         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
183
184         let short_channel_id = 1;
185         let timestamp = current_time() - 10;
186         println!("timestamp: {}", timestamp);
187
188         { // seed the db
189                 let announcement = generate_announcement(short_channel_id);
190                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
191                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
192
193                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
194                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
195                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
196
197                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
198                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
199                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
200                 drop(receiver);
201                 persister.persist_gossip().await;
202         }
203
204         let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
205         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
206         clean_test_db().await;
207
208         let channel_count = network_graph_arc.read_only().channels().len();
209
210         assert_eq!(channel_count, 1);
211         assert_eq!(serialization.message_count, 3);
212         assert_eq!(serialization.announcement_count, 1);
213         assert_eq!(serialization.update_count, 2);
214
215         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
216         let client_graph_arc = Arc::new(client_graph);
217         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
218         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
219         println!("update result: {}", update_result);
220         // the update result must be a multiple of our snapshot granularity
221         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
222         assert!(update_result < timestamp);
223
224         let timestamp_delta = timestamp - update_result;
225         println!("timestamp delta: {}", timestamp_delta);
226         assert!(timestamp_delta < config::snapshot_generation_interval());
227
228         let readonly_graph = client_graph_arc.read_only();
229         let channels = readonly_graph.channels();
230         let client_channel_count = channels.len();
231         assert_eq!(client_channel_count, 1);
232
233         let first_channel = channels.get(&short_channel_id).unwrap();
234         assert!(&first_channel.announcement_message.is_none());
235         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
236         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
237         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
238         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
239         println!("last update a: {}", last_update_seen_a);
240         println!("last update b: {}", last_update_seen_b);
241         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
242         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
243 }
244
245 /// If a channel has only seen updates in one direction, it should not be announced
246 #[tokio::test]
247 async fn test_unidirectional_intermediate_update_consideration() {
248         let _sanitizer = SchemaSanitizer::new();
249
250         let logger = Arc::new(TestLogger::new());
251         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
252         let network_graph_arc = Arc::new(network_graph);
253         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
254
255         let short_channel_id = 1;
256         let timestamp = current_time() - 10;
257         println!("timestamp: {}", timestamp);
258
259         { // seed the db
260                 let announcement = generate_announcement(short_channel_id);
261                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
262                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
263                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
264
265                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
266                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
267                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
268                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
269
270                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
271                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
272                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
273                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
274                 drop(receiver);
275                 persister.persist_gossip().await;
276         }
277
278         let channel_count = network_graph_arc.read_only().channels().len();
279         assert_eq!(channel_count, 1);
280
281         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
282         let client_graph_arc = Arc::new(client_graph);
283         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
284
285         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
286
287         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
288         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
289         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
290
291         assert_eq!(serialization.message_count, 3);
292         assert_eq!(serialization.announcement_count, 1);
293         assert_eq!(serialization.update_count, 2);
294         assert_eq!(serialization.update_count_full, 2);
295         assert_eq!(serialization.update_count_incremental, 0);
296
297         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
298         println!("update result: {}", update_result);
299         // the update result must be a multiple of our snapshot granularity
300
301         let readonly_graph = client_graph_arc.read_only();
302         let channels = readonly_graph.channels();
303         let client_channel_count = channels.len();
304         assert_eq!(client_channel_count, 1);
305
306         clean_test_db().await;
307 }
308
309 /// If a channel has only seen updates in one direction, it should not be announced
310 #[tokio::test]
311 async fn test_bidirectional_intermediate_update_consideration() {
312         let _sanitizer = SchemaSanitizer::new();
313
314         let logger = Arc::new(TestLogger::new());
315         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
316         let network_graph_arc = Arc::new(network_graph);
317         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
318
319         let short_channel_id = 1;
320         let timestamp = current_time() - 10;
321         println!("timestamp: {}", timestamp);
322
323         { // seed the db
324                 let announcement = generate_announcement(short_channel_id);
325                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
326                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
327                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
328                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
329
330                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
331                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
332                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
333                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
334                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
335
336                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
337                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
338                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
339                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
340                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
341                 drop(receiver);
342                 persister.persist_gossip().await;
343         }
344
345         let channel_count = network_graph_arc.read_only().channels().len();
346         assert_eq!(channel_count, 1);
347
348         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
349
350         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
351         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
352         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
353
354         assert_eq!(serialization.message_count, 1);
355         assert_eq!(serialization.announcement_count, 0);
356         assert_eq!(serialization.update_count, 1);
357         assert_eq!(serialization.update_count_full, 0);
358         assert_eq!(serialization.update_count_incremental, 1);
359
360         clean_test_db().await;
361 }
362
363 #[tokio::test]
364 async fn test_full_snapshot_recency() {
365         let _sanitizer = SchemaSanitizer::new();
366         let logger = Arc::new(TestLogger::new());
367         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
368         let network_graph_arc = Arc::new(network_graph);
369
370         let short_channel_id = 1;
371         let timestamp = current_time();
372         println!("timestamp: {}", timestamp);
373
374         { // seed the db
375                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
376                 let announcement = generate_announcement(short_channel_id);
377                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
378                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
379
380                 { // direction false
381                         { // first update
382                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
383                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
384                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
385                         }
386                         { // second update
387                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
388                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
389                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
390                         }
391                 }
392                 { // direction true
393                         { // first and only update
394                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
395                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
396                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
397                         }
398                 }
399
400                 drop(receiver);
401                 persister.persist_gossip().await;
402         }
403
404         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
405         let client_graph_arc = Arc::new(client_graph);
406
407         { // sync after initial seed
408                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
409                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
410
411                 let channel_count = network_graph_arc.read_only().channels().len();
412
413                 assert_eq!(channel_count, 1);
414                 assert_eq!(serialization.message_count, 3);
415                 assert_eq!(serialization.announcement_count, 1);
416                 assert_eq!(serialization.update_count, 2);
417
418                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
419                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
420                 // the update result must be a multiple of our snapshot granularity
421                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
422                 assert!(update_result < timestamp);
423
424                 let readonly_graph = client_graph_arc.read_only();
425                 let channels = readonly_graph.channels();
426                 let client_channel_count = channels.len();
427                 assert_eq!(client_channel_count, 1);
428
429                 let first_channel = channels.get(&short_channel_id).unwrap();
430                 assert!(&first_channel.announcement_message.is_none());
431                 // ensure the update in one direction shows the latest fee
432                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
433                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
434         }
435
436         clean_test_db().await;
437 }
438
439 #[tokio::test]
440 async fn test_full_snapshot_recency_with_wrong_seen_order() {
441         let _sanitizer = SchemaSanitizer::new();
442         let logger = Arc::new(TestLogger::new());
443         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
444         let network_graph_arc = Arc::new(network_graph);
445
446         let short_channel_id = 1;
447         let timestamp = current_time();
448         println!("timestamp: {}", timestamp);
449
450         { // seed the db
451                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
452                 let announcement = generate_announcement(short_channel_id);
453                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
454                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
455
456                 { // direction false
457                         { // first update, seen latest
458                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
459                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
460                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
461                         }
462                         { // second update, seen first
463                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
464                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
465                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
466                         }
467                 }
468                 { // direction true
469                         { // first and only update
470                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
471                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
472                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
473                         }
474                 }
475
476                 drop(receiver);
477                 persister.persist_gossip().await;
478         }
479
480         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
481         let client_graph_arc = Arc::new(client_graph);
482
483         { // sync after initial seed
484                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
485                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
486
487                 let channel_count = network_graph_arc.read_only().channels().len();
488
489                 assert_eq!(channel_count, 1);
490                 assert_eq!(serialization.message_count, 3);
491                 assert_eq!(serialization.announcement_count, 1);
492                 assert_eq!(serialization.update_count, 2);
493
494                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
495                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
496                 // the update result must be a multiple of our snapshot granularity
497                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
498                 assert!(update_result < timestamp);
499
500                 let readonly_graph = client_graph_arc.read_only();
501                 let channels = readonly_graph.channels();
502                 let client_channel_count = channels.len();
503                 assert_eq!(client_channel_count, 1);
504
505                 let first_channel = channels.get(&short_channel_id).unwrap();
506                 assert!(&first_channel.announcement_message.is_none());
507                 // ensure the update in one direction shows the latest fee
508                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
509                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
510         }
511
512         clean_test_db().await;
513 }
514
515 #[tokio::test]
516 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
517         let _sanitizer = SchemaSanitizer::new();
518         let logger = Arc::new(TestLogger::new());
519         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
520         let network_graph_arc = Arc::new(network_graph);
521
522         let short_channel_id = 1;
523         let timestamp = current_time();
524         println!("timestamp: {}", timestamp);
525
526         { // seed the db
527                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
528                 let announcement = generate_announcement(short_channel_id);
529                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
530                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
531
532                 { // direction false
533                         // apply updates in their timestamp order
534                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
535                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
536                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
537                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
538
539                         // propagate updates in their seen order
540                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
541                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
542                 }
543                 { // direction true
544                         { // first and only update
545                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
546                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
547                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
548                         }
549                 }
550
551                 drop(receiver);
552                 persister.persist_gossip().await;
553         }
554
555         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
556         let client_graph_arc = Arc::new(client_graph);
557
558         { // sync after initial seed
559                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
560                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
561
562                 let channel_count = network_graph_arc.read_only().channels().len();
563
564                 assert_eq!(channel_count, 1);
565                 assert_eq!(serialization.message_count, 3);
566                 assert_eq!(serialization.announcement_count, 1);
567                 assert_eq!(serialization.update_count, 2);
568
569                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
570                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
571                 // the update result must be a multiple of our snapshot granularity
572                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
573                 assert!(update_result < timestamp);
574
575                 let readonly_graph = client_graph_arc.read_only();
576                 let channels = readonly_graph.channels();
577                 let client_channel_count = channels.len();
578                 assert_eq!(client_channel_count, 1);
579
580                 let first_channel = channels.get(&short_channel_id).unwrap();
581                 assert!(&first_channel.announcement_message.is_none());
582                 // ensure the update in one direction shows the latest fee
583                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
584                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
585         }
586
587         clean_test_db().await;
588 }
589
590 #[tokio::test]
591 async fn test_full_snapshot_mutiny_scenario() {
592         let _sanitizer = SchemaSanitizer::new();
593         let logger = Arc::new(TestLogger::new());
594         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
595         let network_graph_arc = Arc::new(network_graph);
596
597         let short_channel_id = 873706024403271681;
598         let timestamp = current_time();
599         // let oldest_simulation_timestamp = 1693300588;
600         let latest_simulation_timestamp = 1695909301;
601         let timestamp_offset = timestamp - latest_simulation_timestamp;
602         println!("timestamp: {}", timestamp);
603
604         { // seed the db
605                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
606                 let announcement = generate_announcement(short_channel_id);
607                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
608                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
609
610                 { // direction false
611                         {
612                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
613                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
614                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
615                         }
616                         {
617                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
618                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
619                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
620                         }
621                         {
622                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
623                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
624                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
625                         }
626                         {
627                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
628                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
629                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
630                         }
631                         {
632                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
633                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
634                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
635                         }
636                         {
637                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
638                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
639                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
640                         }
641                         {
642                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
643                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
644                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
645                         }
646                         {
647                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
648                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
649                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
650                         }
651                         {
652                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
653                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
654                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
655                         }
656                         {
657                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
658                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
659                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
660                         }
661                         {
662                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
663                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
664                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
665                         }
666                 }
667                 { // direction true
668                         {
669                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
670                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
671                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
672                         }
673                         {
674                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
675                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
676                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
677                         }
678                 }
679
680                 drop(receiver);
681                 persister.persist_gossip().await;
682         }
683
684         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
685         let client_graph_arc = Arc::new(client_graph);
686
687         { // sync after initial seed
688                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
689                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
690
691                 let channel_count = network_graph_arc.read_only().channels().len();
692
693                 assert_eq!(channel_count, 1);
694                 assert_eq!(serialization.message_count, 3);
695                 assert_eq!(serialization.announcement_count, 1);
696                 assert_eq!(serialization.update_count, 2);
697
698                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
699                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
700                 println!("update result: {}", update_result);
701                 // the update result must be a multiple of our snapshot granularity
702                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
703                 assert!(update_result < timestamp);
704
705                 let timestamp_delta = timestamp - update_result;
706                 println!("timestamp delta: {}", timestamp_delta);
707                 assert!(timestamp_delta < config::snapshot_generation_interval());
708
709                 let readonly_graph = client_graph_arc.read_only();
710                 let channels = readonly_graph.channels();
711                 let client_channel_count = channels.len();
712                 assert_eq!(client_channel_count, 1);
713
714                 let first_channel = channels.get(&short_channel_id).unwrap();
715                 assert!(&first_channel.announcement_message.is_none());
716                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
717                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
718         }
719
720         clean_test_db().await;
721 }
722
723 #[tokio::test]
724 async fn test_full_snapshot_interlaced_channel_timestamps() {
725         let _sanitizer = SchemaSanitizer::new();
726         let logger = Arc::new(TestLogger::new());
727         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
728         let network_graph_arc = Arc::new(network_graph);
729
730         let main_channel_id = 1;
731         let timestamp = current_time();
732         println!("timestamp: {}", timestamp);
733
734         { // seed the db
735                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
736                 let secondary_channel_id = main_channel_id + 1;
737
738                 { // main channel
739                         let announcement = generate_announcement(main_channel_id);
740                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
741                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
742                 }
743
744                 { // secondary channel
745                         let announcement = generate_announcement(secondary_channel_id);
746                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
747                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
748                 }
749
750                 { // main channel
751                         { // direction false
752                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
753                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
754                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
755                         }
756                         { // direction true
757                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
758                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
759                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
760                         }
761                 }
762
763                 { // in-between channel
764                         { // direction false
765                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
766                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
767                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
768                         }
769                         { // direction true
770                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
771                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
772                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
773                         }
774                 }
775
776                 { // main channel
777                         { // direction false
778                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
779                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
780                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
781                         }
782                         { // direction true
783                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
784                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
785                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
786                         }
787                 }
788
789                 drop(receiver);
790                 persister.persist_gossip().await;
791         }
792
793         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
794         let client_graph_arc = Arc::new(client_graph);
795
796         { // sync after initial seed
797                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
798                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
799
800                 let channel_count = network_graph_arc.read_only().channels().len();
801
802                 assert_eq!(channel_count, 2);
803                 assert_eq!(serialization.message_count, 6);
804                 assert_eq!(serialization.announcement_count, 2);
805                 assert_eq!(serialization.update_count, 4);
806
807                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
808                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
809                 // the update result must be a multiple of our snapshot granularity
810                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
811                 assert!(update_result < timestamp);
812
813                 let readonly_graph = client_graph_arc.read_only();
814                 let channels = readonly_graph.channels();
815                 let client_channel_count = channels.len();
816                 assert_eq!(client_channel_count, 2);
817
818                 let first_channel = channels.get(&main_channel_id).unwrap();
819                 assert!(&first_channel.announcement_message.is_none());
820                 // ensure the update in one direction shows the latest fee
821                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
822                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
823         }
824
825         clean_test_db().await;
826 }
827
828 #[tokio::test]
829 async fn test_full_snapshot_persistence() {
830         let schema_sanitizer = SchemaSanitizer::new();
831         let logger = Arc::new(TestLogger::new());
832         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
833         let network_graph_arc = Arc::new(network_graph);
834         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
835         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
836
837         let short_channel_id = 1;
838         let timestamp = current_time();
839         println!("timestamp: {}", timestamp);
840
841         { // seed the db
842                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
843                 let announcement = generate_announcement(short_channel_id);
844                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
845                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
846
847                 { // direction true
848                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
849                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
850                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
851                 }
852
853                 { // direction false
854                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
855                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
856                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
857                 }
858
859
860                 drop(receiver);
861                 persister.persist_gossip().await;
862         }
863
864         let cache_path = cache_sanitizer.cache_path();
865         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
866
867         // generate snapshots
868         {
869                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
870
871                 let symlinked_data = fs::read(&symlink_path).unwrap();
872                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
873                 let client_graph_arc = Arc::new(client_graph);
874
875                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
876                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
877                 // the update result must be a multiple of our snapshot granularity
878                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
879
880                 let readonly_graph = client_graph_arc.read_only();
881                 let channels = readonly_graph.channels();
882                 let client_channel_count = channels.len();
883                 assert_eq!(client_channel_count, 1);
884
885                 let first_channel = channels.get(&short_channel_id).unwrap();
886                 assert!(&first_channel.announcement_message.is_none());
887                 // ensure the update in one direction shows the latest fee
888                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
889                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
890         }
891
892         { // update the db
893                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
894
895                 { // second update
896                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
897                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
898                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
899                 }
900
901                 drop(receiver);
902                 persister.persist_gossip().await;
903         }
904
905         // regenerate snapshots
906         {
907                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
908
909                 let symlinked_data = fs::read(&symlink_path).unwrap();
910                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
911                 let client_graph_arc = Arc::new(client_graph);
912
913                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
914                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
915                 // the update result must be a multiple of our snapshot granularity
916                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
917
918                 let readonly_graph = client_graph_arc.read_only();
919                 let channels = readonly_graph.channels();
920                 let client_channel_count = channels.len();
921                 assert_eq!(client_channel_count, 1);
922
923                 let first_channel = channels.get(&short_channel_id).unwrap();
924                 assert!(&first_channel.announcement_message.is_none());
925                 // ensure the update in one direction shows the latest fee
926                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
927                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
928         }
929
930         // clean up afterwards
931         clean_test_db().await;
932 }