Don't panic while panicking.
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
8 use bitcoin::Network;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
23
/// Number of seconds in one week; the client backdates RGS data by this much.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 3600;
25
thread_local! {
        /// Per-thread name of the database schema used by the running test; `None` until one is assigned.
        static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::default();
        /// Per-thread cleanliness marker for the test schema; `None` until a test sets it.
        static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::default();
}
30
31 fn blank_signature() -> Signature {
32         Signature::from_compact(&[0u8; 64]).unwrap()
33 }
34
35 fn genesis_hash() -> ChainHash {
36         ChainHash::using_genesis_block(Network::Bitcoin)
37 }
38
/// Returns the current wall-clock time as whole seconds since the Unix epoch, cast to `u32`.
///
/// Panics if the system clock reports a time before the epoch.
fn current_time() -> u32 {
        let since_epoch = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Time must be > 1970");
        since_epoch.as_secs() as u32
}
42
43 pub(crate) fn db_test_schema() -> String {
44         DB_TEST_SCHEMA.with(|suffix_reference| {
45                 let suffix_option = suffix_reference.borrow();
46                 suffix_option.as_ref().unwrap().clone()
47         })
48 }
49
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51         let secp_context = Secp256k1::new();
52
53         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
56
57         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
60
61         let announcement = UnsignedChannelAnnouncement {
62                 features: ChannelFeatures::empty(),
63                 chain_hash: genesis_hash(),
64                 short_channel_id,
65                 node_id_1,
66                 node_id_2,
67                 bitcoin_key_1: node_id_1,
68                 bitcoin_key_2: node_id_2,
69                 excess_data: vec![],
70         };
71
72         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
75
76         ChannelAnnouncement {
77                 node_signature_1,
78                 node_signature_2,
79                 bitcoin_signature_1: node_signature_1,
80                 bitcoin_signature_2: node_signature_2,
81                 contents: announcement,
82         }
83 }
84
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
86         let flag_mask = if direction { 1 } else { 0 };
87         ChannelUpdate {
88                 signature: blank_signature(),
89                 contents: UnsignedChannelUpdate {
90                         chain_hash: genesis_hash(),
91                         short_channel_id: scid,
92                         timestamp,
93                         flags: 0 | flag_mask,
94                         cltv_expiry_delta: expiry_delta,
95                         htlc_minimum_msat: min_msat,
96                         htlc_maximum_msat: max_msat,
97                         fee_base_msat: base_msat,
98                         fee_proportional_millionths: fee_rate,
99                         excess_data: vec![],
100                 },
101         }
102 }
103
104 struct SchemaSanitizer {}
105
106 impl SchemaSanitizer {
107         fn new() -> Self {
108                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109                         let mut is_clean_option = cleanliness_reference.borrow_mut();
110                         assert!(is_clean_option.is_none());
111                         *is_clean_option = Some(false);
112                 });
113
114                 DB_TEST_SCHEMA.with(|suffix_reference| {
115                         let mut suffix_option = suffix_reference.borrow_mut();
116                         let current_time = SystemTime::now();
117                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118                         let timestamp_seconds = unix_time.as_secs();
119                         let timestamp_nanos = unix_time.as_nanos();
120                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121                         let thread_id = thread::current().id();
122                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123                         println!("test schema preimage: {}", preimage);
124                         let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
125                         // the schema must start with a letter
126                         let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
127                         *suffix_option = Some(schema);
128                 });
129
130                 return Self {};
131         }
132 }
133
134 impl Drop for SchemaSanitizer {
135         fn drop(&mut self) {
136                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137                         let is_clean_option = cleanliness_reference.borrow();
138                         if let Some(is_clean) = *is_clean_option {
139                                 if std::thread::panicking() {
140                                         return;
141                                 }
142                                 assert_eq!(is_clean, true);
143                         }
144                 });
145         }
146 }
147
148 struct CacheSanitizer {}
149
150 impl CacheSanitizer {
151         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
152         fn new(_: &SchemaSanitizer) -> Self {
153                 Self {}
154         }
155
156         fn cache_path(&self) -> String {
157                 format!("./res/{}/", db_test_schema())
158         }
159 }
160
161 impl Drop for CacheSanitizer {
162         fn drop(&mut self) {
163                 let cache_path = self.cache_path();
164                 fs::remove_dir_all(cache_path).unwrap();
165         }
166 }
167
168
169 async fn clean_test_db() {
170         let client = crate::connect_to_db().await;
171         let schema = db_test_schema();
172         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
173         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
174                 let mut is_clean_option = cleanliness_reference.borrow_mut();
175                 *is_clean_option = Some(true);
176         });
177 }
178
179 #[tokio::test]
180 async fn test_persistence_runtime() {
181         let _sanitizer = SchemaSanitizer::new();
182         let logger = Arc::new(TestLogger::new());
183         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
184         let network_graph_arc = Arc::new(network_graph);
185         let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
186
187         tokio::task::spawn_blocking(move || {
188                 drop(_persister);
189         }).await.unwrap();
190
191         clean_test_db().await;
192 }
193
194
195 #[tokio::test]
196 async fn test_trivial_setup() {
197         let _sanitizer = SchemaSanitizer::new();
198         let logger = Arc::new(TestLogger::new());
199         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
200         let network_graph_arc = Arc::new(network_graph);
201         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
202
203         let short_channel_id = 1;
204         let timestamp = current_time() - 10;
205         println!("timestamp: {}", timestamp);
206
207         { // seed the db
208                 let announcement = generate_announcement(short_channel_id);
209                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
210                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
211
212                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
213                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
214                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
215
216                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
217                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
218                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
219                 drop(receiver);
220                 persister.persist_gossip().await;
221         }
222
223         let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
224         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
225         clean_test_db().await;
226
227         let channel_count = network_graph_arc.read_only().channels().len();
228
229         assert_eq!(channel_count, 1);
230         assert_eq!(serialization.message_count, 3);
231         assert_eq!(serialization.announcement_count, 1);
232         assert_eq!(serialization.update_count, 2);
233
234         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
235         let client_graph_arc = Arc::new(client_graph);
236         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
237         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
238         println!("update result: {}", update_result);
239         // the update result must be a multiple of our snapshot granularity
240         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
241         assert!(update_result < timestamp);
242
243         let timestamp_delta = timestamp - update_result;
244         println!("timestamp delta: {}", timestamp_delta);
245         assert!(timestamp_delta < config::snapshot_generation_interval());
246
247         let readonly_graph = client_graph_arc.read_only();
248         let channels = readonly_graph.channels();
249         let client_channel_count = channels.len();
250         assert_eq!(client_channel_count, 1);
251
252         let first_channel = channels.get(&short_channel_id).unwrap();
253         assert!(&first_channel.announcement_message.is_none());
254         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
255         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
256         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
257         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
258         println!("last update a: {}", last_update_seen_a);
259         println!("last update b: {}", last_update_seen_b);
260         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
261         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
262
263         tokio::task::spawn_blocking(move || {
264                 drop(persister);
265         }).await.unwrap();
266 }
267
268 /// If a channel has only seen updates in one direction, it should not be announced
269 #[tokio::test]
270 async fn test_unidirectional_intermediate_update_consideration() {
271         let _sanitizer = SchemaSanitizer::new();
272
273         let logger = Arc::new(TestLogger::new());
274         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
275         let network_graph_arc = Arc::new(network_graph);
276         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
277
278         let short_channel_id = 1;
279         let timestamp = current_time() - 10;
280         println!("timestamp: {}", timestamp);
281
282         { // seed the db
283                 let announcement = generate_announcement(short_channel_id);
284                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
285                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
286                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
287
288                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
289                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
290                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
291                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
292
293                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
294                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
295                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
296                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
297                 drop(receiver);
298                 persister.persist_gossip().await;
299         }
300
301         let channel_count = network_graph_arc.read_only().channels().len();
302         assert_eq!(channel_count, 1);
303
304         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
305         let client_graph_arc = Arc::new(client_graph);
306         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
307
308         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
309
310         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
311         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
312         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
313
314         assert_eq!(serialization.message_count, 3);
315         assert_eq!(serialization.announcement_count, 1);
316         assert_eq!(serialization.update_count, 2);
317         assert_eq!(serialization.update_count_full, 2);
318         assert_eq!(serialization.update_count_incremental, 0);
319
320         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
321         println!("update result: {}", update_result);
322         // the update result must be a multiple of our snapshot granularity
323
324         let readonly_graph = client_graph_arc.read_only();
325         let channels = readonly_graph.channels();
326         let client_channel_count = channels.len();
327         assert_eq!(client_channel_count, 1);
328
329         tokio::task::spawn_blocking(move || {
330                 drop(persister);
331         }).await.unwrap();
332
333         clean_test_db().await;
334 }
335
336 /// If a channel has only seen updates in one direction, it should not be announced
337 #[tokio::test]
338 async fn test_bidirectional_intermediate_update_consideration() {
339         let _sanitizer = SchemaSanitizer::new();
340
341         let logger = Arc::new(TestLogger::new());
342         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
343         let network_graph_arc = Arc::new(network_graph);
344         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
345
346         let short_channel_id = 1;
347         let timestamp = current_time() - 10;
348         println!("timestamp: {}", timestamp);
349
350         { // seed the db
351                 let announcement = generate_announcement(short_channel_id);
352                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
353                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
354                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
355                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
356
357                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
358                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
359                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
360                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
361                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
362
363                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
364                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
365                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
366                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
367                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
368                 drop(receiver);
369                 persister.persist_gossip().await;
370         }
371
372         let channel_count = network_graph_arc.read_only().channels().len();
373         assert_eq!(channel_count, 1);
374
375         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
376
377         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
378         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
379         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
380
381         assert_eq!(serialization.message_count, 1);
382         assert_eq!(serialization.announcement_count, 0);
383         assert_eq!(serialization.update_count, 1);
384         assert_eq!(serialization.update_count_full, 0);
385         assert_eq!(serialization.update_count_incremental, 1);
386
387         tokio::task::spawn_blocking(move || {
388                 drop(persister);
389         }).await.unwrap();
390
391         clean_test_db().await;
392 }
393
394 #[tokio::test]
395 async fn test_full_snapshot_recency() {
396         let _sanitizer = SchemaSanitizer::new();
397         let logger = Arc::new(TestLogger::new());
398         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
399         let network_graph_arc = Arc::new(network_graph);
400
401         let short_channel_id = 1;
402         let timestamp = current_time();
403         println!("timestamp: {}", timestamp);
404
405         { // seed the db
406                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
407                 let announcement = generate_announcement(short_channel_id);
408                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
409                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
410
411                 { // direction false
412                         { // first update
413                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
414                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
415                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
416                         }
417                         { // second update
418                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
419                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
420                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
421                         }
422                 }
423                 { // direction true
424                         { // first and only update
425                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
426                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
427                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
428                         }
429                 }
430
431                 drop(receiver);
432                 persister.persist_gossip().await;
433
434                 tokio::task::spawn_blocking(move || {
435                         drop(persister);
436                 }).await.unwrap();
437         }
438
439         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
440         let client_graph_arc = Arc::new(client_graph);
441
442         { // sync after initial seed
443                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
444                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
445
446                 let channel_count = network_graph_arc.read_only().channels().len();
447
448                 assert_eq!(channel_count, 1);
449                 assert_eq!(serialization.message_count, 3);
450                 assert_eq!(serialization.announcement_count, 1);
451                 assert_eq!(serialization.update_count, 2);
452
453                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
454                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
455                 // the update result must be a multiple of our snapshot granularity
456                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
457                 assert!(update_result < timestamp);
458
459                 let readonly_graph = client_graph_arc.read_only();
460                 let channels = readonly_graph.channels();
461                 let client_channel_count = channels.len();
462                 assert_eq!(client_channel_count, 1);
463
464                 let first_channel = channels.get(&short_channel_id).unwrap();
465                 assert!(&first_channel.announcement_message.is_none());
466                 // ensure the update in one direction shows the latest fee
467                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
468                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
469         }
470
471         clean_test_db().await;
472 }
473
474 #[tokio::test]
475 async fn test_full_snapshot_recency_with_wrong_seen_order() {
476         let _sanitizer = SchemaSanitizer::new();
477         let logger = Arc::new(TestLogger::new());
478         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
479         let network_graph_arc = Arc::new(network_graph);
480
481         let short_channel_id = 1;
482         let timestamp = current_time();
483         println!("timestamp: {}", timestamp);
484
485         { // seed the db
486                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
487                 let announcement = generate_announcement(short_channel_id);
488                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
489                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
490
491                 { // direction false
492                         { // first update, seen latest
493                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
494                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
495                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
496                         }
497                         { // second update, seen first
498                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
499                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
500                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
501                         }
502                 }
503                 { // direction true
504                         { // first and only update
505                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
506                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
507                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
508                         }
509                 }
510
511                 drop(receiver);
512                 persister.persist_gossip().await;
513
514                 tokio::task::spawn_blocking(move || {
515                         drop(persister);
516                 }).await.unwrap();
517         }
518
519         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
520         let client_graph_arc = Arc::new(client_graph);
521
522         { // sync after initial seed
523                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
524                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
525
526                 let channel_count = network_graph_arc.read_only().channels().len();
527
528                 assert_eq!(channel_count, 1);
529                 assert_eq!(serialization.message_count, 3);
530                 assert_eq!(serialization.announcement_count, 1);
531                 assert_eq!(serialization.update_count, 2);
532
533                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
534                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
535                 // the update result must be a multiple of our snapshot granularity
536                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
537                 assert!(update_result < timestamp);
538
539                 let readonly_graph = client_graph_arc.read_only();
540                 let channels = readonly_graph.channels();
541                 let client_channel_count = channels.len();
542                 assert_eq!(client_channel_count, 1);
543
544                 let first_channel = channels.get(&short_channel_id).unwrap();
545                 assert!(&first_channel.announcement_message.is_none());
546                 // ensure the update in one direction shows the latest fee
547                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
548                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
549         }
550
551         clean_test_db().await;
552 }
553
554 #[tokio::test]
555 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
556         let _sanitizer = SchemaSanitizer::new();
557         let logger = Arc::new(TestLogger::new());
558         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
559         let network_graph_arc = Arc::new(network_graph);
560
561         let short_channel_id = 1;
562         let timestamp = current_time();
563         println!("timestamp: {}", timestamp);
564
565         { // seed the db
566                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
567                 let announcement = generate_announcement(short_channel_id);
568                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
569                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
570
571                 { // direction false
572                         // apply updates in their timestamp order
573                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
574                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
575                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
576                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
577
578                         // propagate updates in their seen order
579                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
580                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
581                 }
582                 { // direction true
583                         { // first and only update
584                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
585                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
586                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
587                         }
588                 }
589
590                 drop(receiver);
591                 persister.persist_gossip().await;
592
593                 tokio::task::spawn_blocking(move || {
594                         drop(persister);
595                 }).await.unwrap();
596         }
597
598         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
599         let client_graph_arc = Arc::new(client_graph);
600
601         { // sync after initial seed
602                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
603                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
604
605                 let channel_count = network_graph_arc.read_only().channels().len();
606
607                 assert_eq!(channel_count, 1);
608                 assert_eq!(serialization.message_count, 3);
609                 assert_eq!(serialization.announcement_count, 1);
610                 assert_eq!(serialization.update_count, 2);
611
612                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
613                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
614                 // the update result must be a multiple of our snapshot granularity
615                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
616                 assert!(update_result < timestamp);
617
618                 let readonly_graph = client_graph_arc.read_only();
619                 let channels = readonly_graph.channels();
620                 let client_channel_count = channels.len();
621                 assert_eq!(client_channel_count, 1);
622
623                 let first_channel = channels.get(&short_channel_id).unwrap();
624                 assert!(&first_channel.announcement_message.is_none());
625                 // ensure the update in one direction shows the latest fee
626                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
627                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
628         }
629
630         clean_test_db().await;
631 }
632
633 #[tokio::test]
634 async fn test_full_snapshot_mutiny_scenario() {
635         let _sanitizer = SchemaSanitizer::new();
636         let logger = Arc::new(TestLogger::new());
637         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
638         let network_graph_arc = Arc::new(network_graph);
639
640         let short_channel_id = 873706024403271681;
641         let timestamp = current_time();
642         // let oldest_simulation_timestamp = 1693300588;
643         let latest_simulation_timestamp = 1695909301;
644         let timestamp_offset = timestamp - latest_simulation_timestamp;
645         println!("timestamp: {}", timestamp);
646
647         { // seed the db
648                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
649                 let announcement = generate_announcement(short_channel_id);
650                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
651                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
652
653                 { // direction false
654                         {
655                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
656                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
657                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
658                         }
659                         {
660                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
661                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
662                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
663                         }
664                         {
665                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
666                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
667                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
668                         }
669                         {
670                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
671                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
672                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
673                         }
674                         {
675                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
676                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
677                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
678                         }
679                         {
680                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
681                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
682                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
683                         }
684                         {
685                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
686                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
687                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
688                         }
689                         {
690                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
691                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
692                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
693                         }
694                         {
695                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
696                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
697                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
698                         }
699                         {
700                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
701                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
702                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
703                         }
704                         {
705                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
706                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
707                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
708                         }
709                 }
710                 { // direction true
711                         {
712                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
713                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
714                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
715                         }
716                         {
717                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
718                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
719                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
720                         }
721                 }
722
723                 drop(receiver);
724                 persister.persist_gossip().await;
725
726                 tokio::task::spawn_blocking(move || {
727                         drop(persister);
728                 }).await.unwrap();
729         }
730
731         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
732         let client_graph_arc = Arc::new(client_graph);
733
734         { // sync after initial seed
735                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
736                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
737
738                 let channel_count = network_graph_arc.read_only().channels().len();
739
740                 assert_eq!(channel_count, 1);
741                 assert_eq!(serialization.message_count, 3);
742                 assert_eq!(serialization.announcement_count, 1);
743                 assert_eq!(serialization.update_count, 2);
744
745                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
746                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
747                 println!("update result: {}", update_result);
748                 // the update result must be a multiple of our snapshot granularity
749                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
750                 assert!(update_result < timestamp);
751
752                 let timestamp_delta = timestamp - update_result;
753                 println!("timestamp delta: {}", timestamp_delta);
754                 assert!(timestamp_delta < config::snapshot_generation_interval());
755
756                 let readonly_graph = client_graph_arc.read_only();
757                 let channels = readonly_graph.channels();
758                 let client_channel_count = channels.len();
759                 assert_eq!(client_channel_count, 1);
760
761                 let first_channel = channels.get(&short_channel_id).unwrap();
762                 assert!(&first_channel.announcement_message.is_none());
763                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
764                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
765         }
766
767         clean_test_db().await;
768 }
769
770 #[tokio::test]
771 async fn test_full_snapshot_interlaced_channel_timestamps() {
772         let _sanitizer = SchemaSanitizer::new();
773         let logger = Arc::new(TestLogger::new());
774         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
775         let network_graph_arc = Arc::new(network_graph);
776
777         let main_channel_id = 1;
778         let timestamp = current_time();
779         println!("timestamp: {}", timestamp);
780
781         { // seed the db
782                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
783                 let secondary_channel_id = main_channel_id + 1;
784
785                 { // main channel
786                         let announcement = generate_announcement(main_channel_id);
787                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
788                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
789                 }
790
791                 { // secondary channel
792                         let announcement = generate_announcement(secondary_channel_id);
793                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
794                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
795                 }
796
797                 { // main channel
798                         { // direction false
799                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
800                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
801                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
802                         }
803                         { // direction true
804                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
805                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
806                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
807                         }
808                 }
809
810                 { // in-between channel
811                         { // direction false
812                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
813                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
814                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
815                         }
816                         { // direction true
817                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
818                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
819                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
820                         }
821                 }
822
823                 { // main channel
824                         { // direction false
825                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
826                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
827                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
828                         }
829                         { // direction true
830                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
831                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
832                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
833                         }
834                 }
835
836                 drop(receiver);
837                 persister.persist_gossip().await;
838
839                 tokio::task::spawn_blocking(move || {
840                         drop(persister);
841                 }).await.unwrap();
842         }
843
844         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
845         let client_graph_arc = Arc::new(client_graph);
846
847         { // sync after initial seed
848                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
849                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
850
851                 let channel_count = network_graph_arc.read_only().channels().len();
852
853                 assert_eq!(channel_count, 2);
854                 assert_eq!(serialization.message_count, 6);
855                 assert_eq!(serialization.announcement_count, 2);
856                 assert_eq!(serialization.update_count, 4);
857
858                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
859                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
860                 // the update result must be a multiple of our snapshot granularity
861                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
862                 assert!(update_result < timestamp);
863
864                 let readonly_graph = client_graph_arc.read_only();
865                 let channels = readonly_graph.channels();
866                 let client_channel_count = channels.len();
867                 assert_eq!(client_channel_count, 2);
868
869                 let first_channel = channels.get(&main_channel_id).unwrap();
870                 assert!(&first_channel.announcement_message.is_none());
871                 // ensure the update in one direction shows the latest fee
872                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
873                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
874         }
875
876         clean_test_db().await;
877 }
878
879 #[tokio::test]
880 async fn test_full_snapshot_persistence() {
881         let schema_sanitizer = SchemaSanitizer::new();
882         let logger = Arc::new(TestLogger::new());
883         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
884         let network_graph_arc = Arc::new(network_graph);
885         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
886         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
887
888         let short_channel_id = 1;
889         let timestamp = current_time();
890         println!("timestamp: {}", timestamp);
891
892         { // seed the db
893                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
894                 let announcement = generate_announcement(short_channel_id);
895                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
896                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
897
898                 { // direction true
899                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
900                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
901                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
902                 }
903
904                 { // direction false
905                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
906                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
907                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
908                 }
909
910
911                 drop(receiver);
912                 persister.persist_gossip().await;
913
914                 tokio::task::spawn_blocking(move || {
915                         drop(persister);
916                 }).await.unwrap();
917         }
918
919         let cache_path = cache_sanitizer.cache_path();
920         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
921
922         // generate snapshots
923         {
924                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
925
926                 let symlinked_data = fs::read(&symlink_path).unwrap();
927                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
928                 let client_graph_arc = Arc::new(client_graph);
929
930                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
931                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
932                 // the update result must be a multiple of our snapshot granularity
933                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
934
935                 let readonly_graph = client_graph_arc.read_only();
936                 let channels = readonly_graph.channels();
937                 let client_channel_count = channels.len();
938                 assert_eq!(client_channel_count, 1);
939
940                 let first_channel = channels.get(&short_channel_id).unwrap();
941                 assert!(&first_channel.announcement_message.is_none());
942                 // ensure the update in one direction shows the latest fee
943                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
944                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
945         }
946
947         { // update the db
948                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
949
950                 { // second update
951                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
952                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
953                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
954                 }
955
956                 drop(receiver);
957                 persister.persist_gossip().await;
958
959                 tokio::task::spawn_blocking(move || {
960                         drop(persister);
961                 }).await.unwrap();
962         }
963
964         // regenerate snapshots
965         {
966                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
967
968                 let symlinked_data = fs::read(&symlink_path).unwrap();
969                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
970                 let client_graph_arc = Arc::new(client_graph);
971
972                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
973                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
974                 // the update result must be a multiple of our snapshot granularity
975                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
976
977                 let readonly_graph = client_graph_arc.read_only();
978                 let channels = readonly_graph.channels();
979                 let client_channel_count = channels.len();
980                 assert_eq!(client_channel_count, 1);
981
982                 let first_channel = channels.get(&short_channel_id).unwrap();
983                 assert!(&first_channel.announcement_message.is_none());
984                 // ensure the update in one direction shows the latest fee
985                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
986                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
987         }
988
989         // clean up afterwards
990         clean_test_db().await;
991 }