Use separate tokio runtime for gossip persistence.
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
8 use bitcoin::Network;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::hex::ToHex;
13 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
23
/// Number of seconds (one week) by which the client backdates RGS data.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 3600;

thread_local! {
	/// Database schema name assigned to the test running on this thread.
	/// It stays `None` until `SchemaSanitizer::new` stores a generated name.
	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
	/// Cleanup state of this thread's test schema: `None` before any
	/// `SchemaSanitizer` exists, `Some(false)` once one has been created and
	/// `Some(true)` after `clean_test_db` has run.
	static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
}
30
31 fn blank_signature() -> Signature {
32         Signature::from_compact(&[0u8; 64]).unwrap()
33 }
34
35 fn genesis_hash() -> ChainHash {
36         ChainHash::using_genesis_block(Network::Bitcoin)
37 }
38
/// Returns the current Unix time in whole seconds as a `u32`.
///
/// # Panics
///
/// Panics if the system clock is set before the Unix epoch, or if the number
/// of seconds no longer fits in a `u32`.
fn current_time() -> u32 {
	let since_epoch = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("Time must be > 1970");
	// A plain `as u32` cast would silently wrap; fail loudly instead. The
	// fully qualified call works regardless of whether `TryFrom` is in the
	// prelude of the crate's edition.
	<u32 as std::convert::TryFrom<u64>>::try_from(since_epoch.as_secs())
		.expect("Unix time in seconds must fit in a u32")
}
42
43 pub(crate) fn db_test_schema() -> String {
44         DB_TEST_SCHEMA.with(|suffix_reference| {
45                 let suffix_option = suffix_reference.borrow();
46                 suffix_option.as_ref().unwrap().clone()
47         })
48 }
49
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51         let secp_context = Secp256k1::new();
52
53         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
56
57         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
60
61         let announcement = UnsignedChannelAnnouncement {
62                 features: ChannelFeatures::empty(),
63                 chain_hash: genesis_hash(),
64                 short_channel_id,
65                 node_id_1,
66                 node_id_2,
67                 bitcoin_key_1: node_id_1,
68                 bitcoin_key_2: node_id_2,
69                 excess_data: vec![],
70         };
71
72         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
75
76         ChannelAnnouncement {
77                 node_signature_1,
78                 node_signature_2,
79                 bitcoin_signature_1: node_signature_1,
80                 bitcoin_signature_2: node_signature_2,
81                 contents: announcement,
82         }
83 }
84
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
86         let flag_mask = if direction { 1 } else { 0 };
87         ChannelUpdate {
88                 signature: blank_signature(),
89                 contents: UnsignedChannelUpdate {
90                         chain_hash: genesis_hash(),
91                         short_channel_id: scid,
92                         timestamp,
93                         flags: 0 | flag_mask,
94                         cltv_expiry_delta: expiry_delta,
95                         htlc_minimum_msat: min_msat,
96                         htlc_maximum_msat: max_msat,
97                         fee_base_msat: base_msat,
98                         fee_proportional_millionths: fee_rate,
99                         excess_data: vec![],
100                 },
101         }
102 }
103
104 struct SchemaSanitizer {}
105
106 impl SchemaSanitizer {
107         fn new() -> Self {
108                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109                         let mut is_clean_option = cleanliness_reference.borrow_mut();
110                         assert!(is_clean_option.is_none());
111                         *is_clean_option = Some(false);
112                 });
113
114                 DB_TEST_SCHEMA.with(|suffix_reference| {
115                         let mut suffix_option = suffix_reference.borrow_mut();
116                         let current_time = SystemTime::now();
117                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118                         let timestamp_seconds = unix_time.as_secs();
119                         let timestamp_nanos = unix_time.as_nanos();
120                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121                         let thread_id = thread::current().id();
122                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123                         println!("test schema preimage: {}", preimage);
124                         let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
125                         // the schema must start with a letter
126                         let schema = format!("test_{}_{}", timestamp_seconds, suffix);
127                         *suffix_option = Some(schema);
128                 });
129
130                 return Self {};
131         }
132 }
133
134 impl Drop for SchemaSanitizer {
135         fn drop(&mut self) {
136                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137                         let is_clean_option = cleanliness_reference.borrow();
138                         if let Some(is_clean) = *is_clean_option {
139                                 assert_eq!(is_clean, true);
140                         }
141                 });
142         }
143 }
144
145 struct CacheSanitizer {}
146
147 impl CacheSanitizer {
148         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
149         fn new(_: &SchemaSanitizer) -> Self {
150                 Self {}
151         }
152
153         fn cache_path(&self) -> String {
154                 format!("./res/{}/", db_test_schema())
155         }
156 }
157
158 impl Drop for CacheSanitizer {
159         fn drop(&mut self) {
160                 let cache_path = self.cache_path();
161                 fs::remove_dir_all(cache_path).unwrap();
162         }
163 }
164
165
166 async fn clean_test_db() {
167         let client = crate::connect_to_db().await;
168         let schema = db_test_schema();
169         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
170         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
171                 let mut is_clean_option = cleanliness_reference.borrow_mut();
172                 *is_clean_option = Some(true);
173         });
174 }
175
176 #[tokio::test]
177 async fn test_persistence_runtime() {
178         let _sanitizer = SchemaSanitizer::new();
179         let logger = Arc::new(TestLogger::new());
180         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
181         let network_graph_arc = Arc::new(network_graph);
182         let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
183
184         tokio::task::spawn_blocking(move || {
185                 drop(_persister);
186         }).await.unwrap();
187
188         clean_test_db().await;
189 }
190
191
192 #[tokio::test]
193 async fn test_trivial_setup() {
194         let _sanitizer = SchemaSanitizer::new();
195         let logger = Arc::new(TestLogger::new());
196         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
197         let network_graph_arc = Arc::new(network_graph);
198         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
199
200         let short_channel_id = 1;
201         let timestamp = current_time() - 10;
202         println!("timestamp: {}", timestamp);
203
204         { // seed the db
205                 let announcement = generate_announcement(short_channel_id);
206                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
207                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
208
209                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
210                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
211                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
212
213                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
214                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
215                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
216                 drop(receiver);
217                 persister.persist_gossip().await;
218         }
219
220         let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
221         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
222         clean_test_db().await;
223
224         let channel_count = network_graph_arc.read_only().channels().len();
225
226         assert_eq!(channel_count, 1);
227         assert_eq!(serialization.message_count, 3);
228         assert_eq!(serialization.announcement_count, 1);
229         assert_eq!(serialization.update_count, 2);
230
231         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
232         let client_graph_arc = Arc::new(client_graph);
233         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
234         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
235         println!("update result: {}", update_result);
236         // the update result must be a multiple of our snapshot granularity
237         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
238         assert!(update_result < timestamp);
239
240         let timestamp_delta = timestamp - update_result;
241         println!("timestamp delta: {}", timestamp_delta);
242         assert!(timestamp_delta < config::snapshot_generation_interval());
243
244         let readonly_graph = client_graph_arc.read_only();
245         let channels = readonly_graph.channels();
246         let client_channel_count = channels.len();
247         assert_eq!(client_channel_count, 1);
248
249         let first_channel = channels.get(&short_channel_id).unwrap();
250         assert!(&first_channel.announcement_message.is_none());
251         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
252         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
253         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
254         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
255         println!("last update a: {}", last_update_seen_a);
256         println!("last update b: {}", last_update_seen_b);
257         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
258         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
259
260         tokio::task::spawn_blocking(move || {
261                 drop(persister);
262         }).await.unwrap();
263 }
264
265 /// If a channel has only seen updates in one direction, it should not be announced
266 #[tokio::test]
267 async fn test_unidirectional_intermediate_update_consideration() {
268         let _sanitizer = SchemaSanitizer::new();
269
270         let logger = Arc::new(TestLogger::new());
271         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
272         let network_graph_arc = Arc::new(network_graph);
273         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
274
275         let short_channel_id = 1;
276         let timestamp = current_time() - 10;
277         println!("timestamp: {}", timestamp);
278
279         { // seed the db
280                 let announcement = generate_announcement(short_channel_id);
281                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
282                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
283                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
284
285                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
286                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
287                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
288                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
289
290                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
291                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
292                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
293                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
294                 drop(receiver);
295                 persister.persist_gossip().await;
296         }
297
298         let channel_count = network_graph_arc.read_only().channels().len();
299         assert_eq!(channel_count, 1);
300
301         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
302         let client_graph_arc = Arc::new(client_graph);
303         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
304
305         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
306
307         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
308         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
309         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
310
311         assert_eq!(serialization.message_count, 3);
312         assert_eq!(serialization.announcement_count, 1);
313         assert_eq!(serialization.update_count, 2);
314         assert_eq!(serialization.update_count_full, 2);
315         assert_eq!(serialization.update_count_incremental, 0);
316
317         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
318         println!("update result: {}", update_result);
319         // the update result must be a multiple of our snapshot granularity
320
321         let readonly_graph = client_graph_arc.read_only();
322         let channels = readonly_graph.channels();
323         let client_channel_count = channels.len();
324         assert_eq!(client_channel_count, 1);
325
326         tokio::task::spawn_blocking(move || {
327                 drop(persister);
328         }).await.unwrap();
329
330         clean_test_db().await;
331 }
332
333 /// If a channel has only seen updates in one direction, it should not be announced
334 #[tokio::test]
335 async fn test_bidirectional_intermediate_update_consideration() {
336         let _sanitizer = SchemaSanitizer::new();
337
338         let logger = Arc::new(TestLogger::new());
339         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
340         let network_graph_arc = Arc::new(network_graph);
341         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
342
343         let short_channel_id = 1;
344         let timestamp = current_time() - 10;
345         println!("timestamp: {}", timestamp);
346
347         { // seed the db
348                 let announcement = generate_announcement(short_channel_id);
349                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
350                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
351                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
352                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
353
354                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
355                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
356                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
357                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
358                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
359
360                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
361                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
362                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
363                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
364                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
365                 drop(receiver);
366                 persister.persist_gossip().await;
367         }
368
369         let channel_count = network_graph_arc.read_only().channels().len();
370         assert_eq!(channel_count, 1);
371
372         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
373
374         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
375         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
376         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
377
378         assert_eq!(serialization.message_count, 1);
379         assert_eq!(serialization.announcement_count, 0);
380         assert_eq!(serialization.update_count, 1);
381         assert_eq!(serialization.update_count_full, 0);
382         assert_eq!(serialization.update_count_incremental, 1);
383
384         tokio::task::spawn_blocking(move || {
385                 drop(persister);
386         }).await.unwrap();
387
388         clean_test_db().await;
389 }
390
391 #[tokio::test]
392 async fn test_full_snapshot_recency() {
393         let _sanitizer = SchemaSanitizer::new();
394         let logger = Arc::new(TestLogger::new());
395         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
396         let network_graph_arc = Arc::new(network_graph);
397
398         let short_channel_id = 1;
399         let timestamp = current_time();
400         println!("timestamp: {}", timestamp);
401
402         { // seed the db
403                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
404                 let announcement = generate_announcement(short_channel_id);
405                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
406                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
407
408                 { // direction false
409                         { // first update
410                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
411                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
412                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
413                         }
414                         { // second update
415                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
416                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
417                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
418                         }
419                 }
420                 { // direction true
421                         { // first and only update
422                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
423                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
424                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
425                         }
426                 }
427
428                 drop(receiver);
429                 persister.persist_gossip().await;
430
431                 tokio::task::spawn_blocking(move || {
432                         drop(persister);
433                 }).await.unwrap();
434         }
435
436         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
437         let client_graph_arc = Arc::new(client_graph);
438
439         { // sync after initial seed
440                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
441                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
442
443                 let channel_count = network_graph_arc.read_only().channels().len();
444
445                 assert_eq!(channel_count, 1);
446                 assert_eq!(serialization.message_count, 3);
447                 assert_eq!(serialization.announcement_count, 1);
448                 assert_eq!(serialization.update_count, 2);
449
450                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
451                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
452                 // the update result must be a multiple of our snapshot granularity
453                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
454                 assert!(update_result < timestamp);
455
456                 let readonly_graph = client_graph_arc.read_only();
457                 let channels = readonly_graph.channels();
458                 let client_channel_count = channels.len();
459                 assert_eq!(client_channel_count, 1);
460
461                 let first_channel = channels.get(&short_channel_id).unwrap();
462                 assert!(&first_channel.announcement_message.is_none());
463                 // ensure the update in one direction shows the latest fee
464                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
465                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
466         }
467
468         clean_test_db().await;
469 }
470
471 #[tokio::test]
472 async fn test_full_snapshot_recency_with_wrong_seen_order() {
473         let _sanitizer = SchemaSanitizer::new();
474         let logger = Arc::new(TestLogger::new());
475         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
476         let network_graph_arc = Arc::new(network_graph);
477
478         let short_channel_id = 1;
479         let timestamp = current_time();
480         println!("timestamp: {}", timestamp);
481
482         { // seed the db
483                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
484                 let announcement = generate_announcement(short_channel_id);
485                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
486                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
487
488                 { // direction false
489                         { // first update, seen latest
490                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
491                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
492                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
493                         }
494                         { // second update, seen first
495                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
496                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
497                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
498                         }
499                 }
500                 { // direction true
501                         { // first and only update
502                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
503                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
504                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
505                         }
506                 }
507
508                 drop(receiver);
509                 persister.persist_gossip().await;
510
511                 tokio::task::spawn_blocking(move || {
512                         drop(persister);
513                 }).await.unwrap();
514         }
515
516         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
517         let client_graph_arc = Arc::new(client_graph);
518
519         { // sync after initial seed
520                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
521                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
522
523                 let channel_count = network_graph_arc.read_only().channels().len();
524
525                 assert_eq!(channel_count, 1);
526                 assert_eq!(serialization.message_count, 3);
527                 assert_eq!(serialization.announcement_count, 1);
528                 assert_eq!(serialization.update_count, 2);
529
530                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
531                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
532                 // the update result must be a multiple of our snapshot granularity
533                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
534                 assert!(update_result < timestamp);
535
536                 let readonly_graph = client_graph_arc.read_only();
537                 let channels = readonly_graph.channels();
538                 let client_channel_count = channels.len();
539                 assert_eq!(client_channel_count, 1);
540
541                 let first_channel = channels.get(&short_channel_id).unwrap();
542                 assert!(&first_channel.announcement_message.is_none());
543                 // ensure the update in one direction shows the latest fee
544                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
545                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
546         }
547
548         clean_test_db().await;
549 }
550
551 #[tokio::test]
552 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
553         let _sanitizer = SchemaSanitizer::new();
554         let logger = Arc::new(TestLogger::new());
555         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
556         let network_graph_arc = Arc::new(network_graph);
557
558         let short_channel_id = 1;
559         let timestamp = current_time();
560         println!("timestamp: {}", timestamp);
561
562         { // seed the db
563                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
564                 let announcement = generate_announcement(short_channel_id);
565                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
566                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
567
568                 { // direction false
569                         // apply updates in their timestamp order
570                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
571                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
572                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
573                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
574
575                         // propagate updates in their seen order
576                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
577                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
578                 }
579                 { // direction true
580                         { // first and only update
581                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
582                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
583                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
584                         }
585                 }
586
587                 drop(receiver);
588                 persister.persist_gossip().await;
589
590                 tokio::task::spawn_blocking(move || {
591                         drop(persister);
592                 }).await.unwrap();
593         }
594
595         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
596         let client_graph_arc = Arc::new(client_graph);
597
598         { // sync after initial seed
599                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
600                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
601
602                 let channel_count = network_graph_arc.read_only().channels().len();
603
604                 assert_eq!(channel_count, 1);
605                 assert_eq!(serialization.message_count, 3);
606                 assert_eq!(serialization.announcement_count, 1);
607                 assert_eq!(serialization.update_count, 2);
608
609                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
610                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
611                 // the update result must be a multiple of our snapshot granularity
612                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
613                 assert!(update_result < timestamp);
614
615                 let readonly_graph = client_graph_arc.read_only();
616                 let channels = readonly_graph.channels();
617                 let client_channel_count = channels.len();
618                 assert_eq!(client_channel_count, 1);
619
620                 let first_channel = channels.get(&short_channel_id).unwrap();
621                 assert!(&first_channel.announcement_message.is_none());
622                 // ensure the update in one direction shows the latest fee
623                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
624                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
625         }
626
627         clean_test_db().await;
628 }
629
630 #[tokio::test]
631 async fn test_full_snapshot_mutiny_scenario() {
632         let _sanitizer = SchemaSanitizer::new();
633         let logger = Arc::new(TestLogger::new());
634         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
635         let network_graph_arc = Arc::new(network_graph);
636
637         let short_channel_id = 873706024403271681;
638         let timestamp = current_time();
639         // let oldest_simulation_timestamp = 1693300588;
640         let latest_simulation_timestamp = 1695909301;
641         let timestamp_offset = timestamp - latest_simulation_timestamp;
642         println!("timestamp: {}", timestamp);
643
644         { // seed the db
645                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
646                 let announcement = generate_announcement(short_channel_id);
647                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
648                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
649
650                 { // direction false
651                         {
652                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
653                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
654                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
655                         }
656                         {
657                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
658                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
659                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
660                         }
661                         {
662                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
663                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
664                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
665                         }
666                         {
667                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
668                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
669                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
670                         }
671                         {
672                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
673                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
674                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
675                         }
676                         {
677                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
678                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
679                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
680                         }
681                         {
682                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
683                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
684                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
685                         }
686                         {
687                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
688                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
689                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
690                         }
691                         {
692                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
693                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
694                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
695                         }
696                         {
697                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
698                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
699                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
700                         }
701                         {
702                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
703                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
704                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
705                         }
706                 }
707                 { // direction true
708                         {
709                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
710                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
711                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
712                         }
713                         {
714                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
715                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
716                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
717                         }
718                 }
719
720                 drop(receiver);
721                 persister.persist_gossip().await;
722
723                 tokio::task::spawn_blocking(move || {
724                         drop(persister);
725                 }).await.unwrap();
726         }
727
728         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
729         let client_graph_arc = Arc::new(client_graph);
730
731         { // sync after initial seed
732                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
733                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
734
735                 let channel_count = network_graph_arc.read_only().channels().len();
736
737                 assert_eq!(channel_count, 1);
738                 assert_eq!(serialization.message_count, 3);
739                 assert_eq!(serialization.announcement_count, 1);
740                 assert_eq!(serialization.update_count, 2);
741
742                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
743                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
744                 println!("update result: {}", update_result);
745                 // the update result must be a multiple of our snapshot granularity
746                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
747                 assert!(update_result < timestamp);
748
749                 let timestamp_delta = timestamp - update_result;
750                 println!("timestamp delta: {}", timestamp_delta);
751                 assert!(timestamp_delta < config::snapshot_generation_interval());
752
753                 let readonly_graph = client_graph_arc.read_only();
754                 let channels = readonly_graph.channels();
755                 let client_channel_count = channels.len();
756                 assert_eq!(client_channel_count, 1);
757
758                 let first_channel = channels.get(&short_channel_id).unwrap();
759                 assert!(&first_channel.announcement_message.is_none());
760                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
761                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
762         }
763
764         clean_test_db().await;
765 }
766
767 #[tokio::test]
768 async fn test_full_snapshot_interlaced_channel_timestamps() {
769         let _sanitizer = SchemaSanitizer::new();
770         let logger = Arc::new(TestLogger::new());
771         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
772         let network_graph_arc = Arc::new(network_graph);
773
774         let main_channel_id = 1;
775         let timestamp = current_time();
776         println!("timestamp: {}", timestamp);
777
778         { // seed the db
779                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
780                 let secondary_channel_id = main_channel_id + 1;
781
782                 { // main channel
783                         let announcement = generate_announcement(main_channel_id);
784                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
785                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
786                 }
787
788                 { // secondary channel
789                         let announcement = generate_announcement(secondary_channel_id);
790                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
791                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
792                 }
793
794                 { // main channel
795                         { // direction false
796                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
797                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
798                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
799                         }
800                         { // direction true
801                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
802                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
803                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
804                         }
805                 }
806
807                 { // in-between channel
808                         { // direction false
809                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
810                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
811                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
812                         }
813                         { // direction true
814                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
815                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
816                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
817                         }
818                 }
819
820                 { // main channel
821                         { // direction false
822                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
823                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
824                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
825                         }
826                         { // direction true
827                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
828                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
829                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
830                         }
831                 }
832
833                 drop(receiver);
834                 persister.persist_gossip().await;
835
836                 tokio::task::spawn_blocking(move || {
837                         drop(persister);
838                 }).await.unwrap();
839         }
840
841         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
842         let client_graph_arc = Arc::new(client_graph);
843
844         { // sync after initial seed
845                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
846                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
847
848                 let channel_count = network_graph_arc.read_only().channels().len();
849
850                 assert_eq!(channel_count, 2);
851                 assert_eq!(serialization.message_count, 6);
852                 assert_eq!(serialization.announcement_count, 2);
853                 assert_eq!(serialization.update_count, 4);
854
855                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
856                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
857                 // the update result must be a multiple of our snapshot granularity
858                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
859                 assert!(update_result < timestamp);
860
861                 let readonly_graph = client_graph_arc.read_only();
862                 let channels = readonly_graph.channels();
863                 let client_channel_count = channels.len();
864                 assert_eq!(client_channel_count, 2);
865
866                 let first_channel = channels.get(&main_channel_id).unwrap();
867                 assert!(&first_channel.announcement_message.is_none());
868                 // ensure the update in one direction shows the latest fee
869                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
870                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
871         }
872
873         clean_test_db().await;
874 }
875
876 #[tokio::test]
877 async fn test_full_snapshot_persistence() {
878         let schema_sanitizer = SchemaSanitizer::new();
879         let logger = Arc::new(TestLogger::new());
880         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
881         let network_graph_arc = Arc::new(network_graph);
882         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
883         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
884
885         let short_channel_id = 1;
886         let timestamp = current_time();
887         println!("timestamp: {}", timestamp);
888
889         { // seed the db
890                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
891                 let announcement = generate_announcement(short_channel_id);
892                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
893                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
894
895                 { // direction true
896                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
897                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
898                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
899                 }
900
901                 { // direction false
902                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
903                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
904                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
905                 }
906
907
908                 drop(receiver);
909                 persister.persist_gossip().await;
910
911                 tokio::task::spawn_blocking(move || {
912                         drop(persister);
913                 }).await.unwrap();
914         }
915
916         let cache_path = cache_sanitizer.cache_path();
917         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
918
919         // generate snapshots
920         {
921                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
922
923                 let symlinked_data = fs::read(&symlink_path).unwrap();
924                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
925                 let client_graph_arc = Arc::new(client_graph);
926
927                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
928                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
929                 // the update result must be a multiple of our snapshot granularity
930                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
931
932                 let readonly_graph = client_graph_arc.read_only();
933                 let channels = readonly_graph.channels();
934                 let client_channel_count = channels.len();
935                 assert_eq!(client_channel_count, 1);
936
937                 let first_channel = channels.get(&short_channel_id).unwrap();
938                 assert!(&first_channel.announcement_message.is_none());
939                 // ensure the update in one direction shows the latest fee
940                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
941                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
942         }
943
944         { // update the db
945                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
946
947                 { // second update
948                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
949                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
950                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
951                 }
952
953                 drop(receiver);
954                 persister.persist_gossip().await;
955
956                 tokio::task::spawn_blocking(move || {
957                         drop(persister);
958                 }).await.unwrap();
959         }
960
961         // regenerate snapshots
962         {
963                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
964
965                 let symlinked_data = fs::read(&symlink_path).unwrap();
966                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
967                 let client_graph_arc = Arc::new(client_graph);
968
969                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
970                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
971                 // the update result must be a multiple of our snapshot granularity
972                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
973
974                 let readonly_graph = client_graph_arc.read_only();
975                 let channels = readonly_graph.channels();
976                 let client_channel_count = channels.len();
977                 assert_eq!(client_channel_count, 1);
978
979                 let first_channel = channels.get(&short_channel_id).unwrap();
980                 assert!(&first_channel.announcement_message.is_none());
981                 // ensure the update in one direction shows the latest fee
982                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
983                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
984         }
985
986         // clean up afterwards
987         clean_test_db().await;
988 }