Test announcement inclusion logic.
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::{BlockHash, Network};
8 use bitcoin::secp256k1::ecdsa::Signature;
9 use bitcoin::secp256k1::{Secp256k1, SecretKey};
10 use bitcoin::hashes::Hash;
11 use bitcoin::hashes::hex::ToHex;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use lightning::ln::features::ChannelFeatures;
14 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
15 use lightning::routing::gossip::{NetworkGraph, NodeId};
16 use lightning::util::ser::Writeable;
17 use lightning_rapid_gossip_sync::RapidGossipSync;
18 use crate::{config, serialize_delta};
19 use crate::persistence::GossipPersister;
20 use crate::snapshot::Snapshotter;
21 use crate::types::{GossipMessage, tests::TestLogger};
22
/// One week, in seconds: the interval by which the client backdates RGS data.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 60 * 60;
24
thread_local! {
	/// Name of the database schema used by the test on this thread; starts out as `None`.
	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::default();
	/// Cleanup tracker for the schema on this thread; starts out as `None`.
	static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::default();
}
29
30 fn blank_signature() -> Signature {
31         Signature::from_compact(&[0u8; 64]).unwrap()
32 }
33
34 fn genesis_hash() -> BlockHash {
35         bitcoin::blockdata::constants::genesis_block(Network::Bitcoin).block_hash()
36 }
37
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// The seconds count is narrowed to `u32` with an `as` cast.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn current_time() -> u32 {
	let since_epoch = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("Time must be > 1970");
	since_epoch.as_secs() as u32
}
41
42 pub(crate) fn db_test_schema() -> String {
43         DB_TEST_SCHEMA.with(|suffix_reference| {
44                 let suffix_option = suffix_reference.borrow();
45                 suffix_option.as_ref().unwrap().clone()
46         })
47 }
48
49 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
50         let secp_context = Secp256k1::new();
51
52         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
53         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
54         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
55
56         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
57         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
58         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
59
60         let announcement = UnsignedChannelAnnouncement {
61                 features: ChannelFeatures::empty(),
62                 chain_hash: genesis_hash(),
63                 short_channel_id,
64                 node_id_1,
65                 node_id_2,
66                 bitcoin_key_1: node_id_1,
67                 bitcoin_key_2: node_id_2,
68                 excess_data: vec![],
69         };
70
71         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
72         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
73         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
74
75         ChannelAnnouncement {
76                 node_signature_1,
77                 node_signature_2,
78                 bitcoin_signature_1: node_signature_1,
79                 bitcoin_signature_2: node_signature_2,
80                 contents: announcement,
81         }
82 }
83
84 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
85         let flag_mask = if direction { 1 } else { 0 };
86         ChannelUpdate {
87                 signature: blank_signature(),
88                 contents: UnsignedChannelUpdate {
89                         chain_hash: genesis_hash(),
90                         short_channel_id: scid,
91                         timestamp,
92                         flags: 0 | flag_mask,
93                         cltv_expiry_delta: expiry_delta,
94                         htlc_minimum_msat: min_msat,
95                         htlc_maximum_msat: max_msat,
96                         fee_base_msat: base_msat,
97                         fee_proportional_millionths: fee_rate,
98                         excess_data: vec![],
99                 },
100         }
101 }
102
103 struct SchemaSanitizer {}
104
105 impl SchemaSanitizer {
106         fn new() -> Self {
107                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
108                         let mut is_clean_option = cleanliness_reference.borrow_mut();
109                         assert!(is_clean_option.is_none());
110                         *is_clean_option = Some(false);
111                 });
112
113                 DB_TEST_SCHEMA.with(|suffix_reference| {
114                         let mut suffix_option = suffix_reference.borrow_mut();
115                         let current_time = SystemTime::now();
116                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
117                         let timestamp_seconds = unix_time.as_secs();
118                         let timestamp_nanos = unix_time.as_nanos();
119                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
120                         let thread_id = thread::current().id();
121                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
122                         println!("test schema preimage: {}", preimage);
123                         let suffix = Sha256dHash::hash(preimage.as_bytes()).into_inner().to_hex();
124                         // the schema must start with a letter
125                         let schema = format!("test_{}_{}", timestamp_seconds, suffix);
126                         *suffix_option = Some(schema);
127                 });
128
129                 return Self {};
130         }
131 }
132
133 impl Drop for SchemaSanitizer {
134         fn drop(&mut self) {
135                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
136                         let is_clean_option = cleanliness_reference.borrow();
137                         if let Some(is_clean) = *is_clean_option {
138                                 assert_eq!(is_clean, true);
139                         }
140                 });
141         }
142 }
143
144 struct CacheSanitizer {}
145
146 impl CacheSanitizer {
147         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
148         fn new(_: &SchemaSanitizer) -> Self {
149                 Self {}
150         }
151
152         fn cache_path(&self) -> String {
153                 format!("./res/{}/", db_test_schema())
154         }
155 }
156
157 impl Drop for CacheSanitizer {
158         fn drop(&mut self) {
159                 let cache_path = self.cache_path();
160                 fs::remove_dir_all(cache_path).unwrap();
161         }
162 }
163
164
165 async fn clean_test_db() {
166         let client = crate::connect_to_db().await;
167         let schema = db_test_schema();
168         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
169         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
170                 let mut is_clean_option = cleanliness_reference.borrow_mut();
171                 *is_clean_option = Some(true);
172         });
173 }
174
175 #[tokio::test]
176 async fn test_trivial_setup() {
177         let _sanitizer = SchemaSanitizer::new();
178         let logger = Arc::new(TestLogger::new());
179         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
180         let network_graph_arc = Arc::new(network_graph);
181         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
182
183         let short_channel_id = 1;
184         let timestamp = current_time() - 10;
185         println!("timestamp: {}", timestamp);
186
187         { // seed the db
188                 let announcement = generate_announcement(short_channel_id);
189                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
190                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
191
192                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
193                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
194                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
195
196                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
197                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
198                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
199                 drop(receiver);
200                 persister.persist_gossip().await;
201         }
202
203         let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
204         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
205         clean_test_db().await;
206
207         let channel_count = network_graph_arc.read_only().channels().len();
208
209         assert_eq!(channel_count, 1);
210         assert_eq!(serialization.message_count, 3);
211         assert_eq!(serialization.announcement_count, 1);
212         assert_eq!(serialization.update_count, 2);
213
214         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
215         let client_graph_arc = Arc::new(client_graph);
216         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
217         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
218         println!("update result: {}", update_result);
219         // the update result must be a multiple of our snapshot granularity
220         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
221         assert!(update_result < timestamp);
222
223         let timestamp_delta = timestamp - update_result;
224         println!("timestamp delta: {}", timestamp_delta);
225         assert!(timestamp_delta < config::snapshot_generation_interval());
226
227         let readonly_graph = client_graph_arc.read_only();
228         let channels = readonly_graph.channels();
229         let client_channel_count = channels.len();
230         assert_eq!(client_channel_count, 1);
231
232         let first_channel = channels.get(&short_channel_id).unwrap();
233         assert!(&first_channel.announcement_message.is_none());
234         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
235         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
236         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
237         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
238         println!("last update a: {}", last_update_seen_a);
239         println!("last update b: {}", last_update_seen_b);
240         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
241         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
242 }
243
244 /// If a channel has only seen updates in one direction, it should not be announced
245 #[tokio::test]
246 async fn test_unidirectional_intermediate_update_consideration() {
247         let _sanitizer = SchemaSanitizer::new();
248
249         let logger = Arc::new(TestLogger::new());
250         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
251         let network_graph_arc = Arc::new(network_graph);
252         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
253
254         let short_channel_id = 1;
255         let timestamp = current_time() - 10;
256         println!("timestamp: {}", timestamp);
257
258         { // seed the db
259                 let announcement = generate_announcement(short_channel_id);
260                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
261                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
262                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
263
264                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
265                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
266                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
267                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
268
269                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
270                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
271                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
272                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
273                 drop(receiver);
274                 persister.persist_gossip().await;
275         }
276
277         let channel_count = network_graph_arc.read_only().channels().len();
278         assert_eq!(channel_count, 1);
279
280         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
281         let client_graph_arc = Arc::new(client_graph);
282         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
283
284         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
285
286         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
287         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
288         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
289
290         assert_eq!(serialization.message_count, 3);
291         assert_eq!(serialization.announcement_count, 1);
292         assert_eq!(serialization.update_count, 2);
293         assert_eq!(serialization.update_count_full, 2);
294         assert_eq!(serialization.update_count_incremental, 0);
295
296         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
297         println!("update result: {}", update_result);
298         // the update result must be a multiple of our snapshot granularity
299
300         let readonly_graph = client_graph_arc.read_only();
301         let channels = readonly_graph.channels();
302         let client_channel_count = channels.len();
303         assert_eq!(client_channel_count, 1);
304
305         clean_test_db().await;
306 }
307
308 /// If a channel has only seen updates in one direction, it should not be announced
309 #[tokio::test]
310 async fn test_bidirectional_intermediate_update_consideration() {
311         let _sanitizer = SchemaSanitizer::new();
312
313         let logger = Arc::new(TestLogger::new());
314         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
315         let network_graph_arc = Arc::new(network_graph);
316         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
317
318         let short_channel_id = 1;
319         let timestamp = current_time() - 10;
320         println!("timestamp: {}", timestamp);
321
322         { // seed the db
323                 let announcement = generate_announcement(short_channel_id);
324                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
325                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
326                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
327                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
328
329                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
330                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
331                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
332                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
333                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
334
335                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
336                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
337                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
338                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
339                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
340                 drop(receiver);
341                 persister.persist_gossip().await;
342         }
343
344         let channel_count = network_graph_arc.read_only().channels().len();
345         assert_eq!(channel_count, 1);
346
347         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
348
349         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
350         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
351         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
352
353         assert_eq!(serialization.message_count, 1);
354         assert_eq!(serialization.announcement_count, 0);
355         assert_eq!(serialization.update_count, 1);
356         assert_eq!(serialization.update_count_full, 0);
357         assert_eq!(serialization.update_count_incremental, 1);
358
359         clean_test_db().await;
360 }
361
362 #[tokio::test]
363 async fn test_full_snapshot_recency() {
364         let _sanitizer = SchemaSanitizer::new();
365         let logger = Arc::new(TestLogger::new());
366         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
367         let network_graph_arc = Arc::new(network_graph);
368
369         let short_channel_id = 1;
370         let timestamp = current_time();
371         println!("timestamp: {}", timestamp);
372
373         { // seed the db
374                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
375                 let announcement = generate_announcement(short_channel_id);
376                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
377                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
378
379                 { // direction false
380                         { // first update
381                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
382                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
383                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
384                         }
385                         { // second update
386                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
387                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
388                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
389                         }
390                 }
391                 { // direction true
392                         { // first and only update
393                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
394                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
395                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
396                         }
397                 }
398
399                 drop(receiver);
400                 persister.persist_gossip().await;
401         }
402
403         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
404         let client_graph_arc = Arc::new(client_graph);
405
406         { // sync after initial seed
407                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
408                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
409
410                 let channel_count = network_graph_arc.read_only().channels().len();
411
412                 assert_eq!(channel_count, 1);
413                 assert_eq!(serialization.message_count, 3);
414                 assert_eq!(serialization.announcement_count, 1);
415                 assert_eq!(serialization.update_count, 2);
416
417                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
418                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
419                 // the update result must be a multiple of our snapshot granularity
420                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
421                 assert!(update_result < timestamp);
422
423                 let readonly_graph = client_graph_arc.read_only();
424                 let channels = readonly_graph.channels();
425                 let client_channel_count = channels.len();
426                 assert_eq!(client_channel_count, 1);
427
428                 let first_channel = channels.get(&short_channel_id).unwrap();
429                 assert!(&first_channel.announcement_message.is_none());
430                 // ensure the update in one direction shows the latest fee
431                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
432                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
433         }
434
435         clean_test_db().await;
436 }
437
438 #[tokio::test]
439 async fn test_full_snapshot_recency_with_wrong_seen_order() {
440         let _sanitizer = SchemaSanitizer::new();
441         let logger = Arc::new(TestLogger::new());
442         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
443         let network_graph_arc = Arc::new(network_graph);
444
445         let short_channel_id = 1;
446         let timestamp = current_time();
447         println!("timestamp: {}", timestamp);
448
449         { // seed the db
450                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
451                 let announcement = generate_announcement(short_channel_id);
452                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
453                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
454
455                 { // direction false
456                         { // first update, seen latest
457                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
458                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
459                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
460                         }
461                         { // second update, seen first
462                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
463                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
464                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
465                         }
466                 }
467                 { // direction true
468                         { // first and only update
469                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
470                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
471                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
472                         }
473                 }
474
475                 drop(receiver);
476                 persister.persist_gossip().await;
477         }
478
479         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
480         let client_graph_arc = Arc::new(client_graph);
481
482         { // sync after initial seed
483                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
484                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
485
486                 let channel_count = network_graph_arc.read_only().channels().len();
487
488                 assert_eq!(channel_count, 1);
489                 assert_eq!(serialization.message_count, 3);
490                 assert_eq!(serialization.announcement_count, 1);
491                 assert_eq!(serialization.update_count, 2);
492
493                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
494                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
495                 // the update result must be a multiple of our snapshot granularity
496                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
497                 assert!(update_result < timestamp);
498
499                 let readonly_graph = client_graph_arc.read_only();
500                 let channels = readonly_graph.channels();
501                 let client_channel_count = channels.len();
502                 assert_eq!(client_channel_count, 1);
503
504                 let first_channel = channels.get(&short_channel_id).unwrap();
505                 assert!(&first_channel.announcement_message.is_none());
506                 // ensure the update in one direction shows the latest fee
507                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
508                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
509         }
510
511         clean_test_db().await;
512 }
513
514 #[tokio::test]
515 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
516         let _sanitizer = SchemaSanitizer::new();
517         let logger = Arc::new(TestLogger::new());
518         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
519         let network_graph_arc = Arc::new(network_graph);
520
521         let short_channel_id = 1;
522         let timestamp = current_time();
523         println!("timestamp: {}", timestamp);
524
525         { // seed the db
526                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
527                 let announcement = generate_announcement(short_channel_id);
528                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
529                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
530
531                 { // direction false
532                         // apply updates in their timestamp order
533                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
534                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
535                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
536                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
537
538                         // propagate updates in their seen order
539                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
540                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
541                 }
542                 { // direction true
543                         { // first and only update
544                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
545                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
546                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
547                         }
548                 }
549
550                 drop(receiver);
551                 persister.persist_gossip().await;
552         }
553
554         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
555         let client_graph_arc = Arc::new(client_graph);
556
557         { // sync after initial seed
558                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
559                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
560
561                 let channel_count = network_graph_arc.read_only().channels().len();
562
563                 assert_eq!(channel_count, 1);
564                 assert_eq!(serialization.message_count, 3);
565                 assert_eq!(serialization.announcement_count, 1);
566                 assert_eq!(serialization.update_count, 2);
567
568                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
569                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
570                 // the update result must be a multiple of our snapshot granularity
571                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
572                 assert!(update_result < timestamp);
573
574                 let readonly_graph = client_graph_arc.read_only();
575                 let channels = readonly_graph.channels();
576                 let client_channel_count = channels.len();
577                 assert_eq!(client_channel_count, 1);
578
579                 let first_channel = channels.get(&short_channel_id).unwrap();
580                 assert!(&first_channel.announcement_message.is_none());
581                 // ensure the update in one direction shows the latest fee
582                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
583                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
584         }
585
586         clean_test_db().await;
587 }
588
589 #[tokio::test]
590 async fn test_full_snapshot_mutiny_scenario() {
591         let _sanitizer = SchemaSanitizer::new();
592         let logger = Arc::new(TestLogger::new());
593         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
594         let network_graph_arc = Arc::new(network_graph);
595
596         let short_channel_id = 873706024403271681;
597         let timestamp = current_time();
598         // let oldest_simulation_timestamp = 1693300588;
599         let latest_simulation_timestamp = 1695909301;
600         let timestamp_offset = timestamp - latest_simulation_timestamp;
601         println!("timestamp: {}", timestamp);
602
603         { // seed the db
604                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
605                 let announcement = generate_announcement(short_channel_id);
606                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
607                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
608
609                 { // direction false
610                         {
611                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
612                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
613                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
614                         }
615                         {
616                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
617                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
618                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
619                         }
620                         {
621                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
622                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
623                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
624                         }
625                         {
626                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
627                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
628                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
629                         }
630                         {
631                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
632                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
633                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
634                         }
635                         {
636                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
637                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
638                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
639                         }
640                         {
641                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
642                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
643                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
644                         }
645                         {
646                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
647                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
648                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
649                         }
650                         {
651                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
652                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
653                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
654                         }
655                         {
656                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
657                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
658                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
659                         }
660                         {
661                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
662                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
663                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
664                         }
665                 }
666                 { // direction true
667                         {
668                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
669                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
670                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
671                         }
672                         {
673                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
674                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
675                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
676                         }
677                 }
678
679                 drop(receiver);
680                 persister.persist_gossip().await;
681         }
682
683         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
684         let client_graph_arc = Arc::new(client_graph);
685
686         { // sync after initial seed
687                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
688                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
689
690                 let channel_count = network_graph_arc.read_only().channels().len();
691
692                 assert_eq!(channel_count, 1);
693                 assert_eq!(serialization.message_count, 3);
694                 assert_eq!(serialization.announcement_count, 1);
695                 assert_eq!(serialization.update_count, 2);
696
697                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
698                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
699                 println!("update result: {}", update_result);
700                 // the update result must be a multiple of our snapshot granularity
701                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
702                 assert!(update_result < timestamp);
703
704                 let timestamp_delta = timestamp - update_result;
705                 println!("timestamp delta: {}", timestamp_delta);
706                 assert!(timestamp_delta < config::snapshot_generation_interval());
707
708                 let readonly_graph = client_graph_arc.read_only();
709                 let channels = readonly_graph.channels();
710                 let client_channel_count = channels.len();
711                 assert_eq!(client_channel_count, 1);
712
713                 let first_channel = channels.get(&short_channel_id).unwrap();
714                 assert!(&first_channel.announcement_message.is_none());
715                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
716                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
717         }
718
719         clean_test_db().await;
720 }
721
722 #[tokio::test]
723 async fn test_full_snapshot_interlaced_channel_timestamps() {
724         let _sanitizer = SchemaSanitizer::new();
725         let logger = Arc::new(TestLogger::new());
726         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
727         let network_graph_arc = Arc::new(network_graph);
728
729         let main_channel_id = 1;
730         let timestamp = current_time();
731         println!("timestamp: {}", timestamp);
732
733         { // seed the db
734                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
735                 let secondary_channel_id = main_channel_id + 1;
736
737                 { // main channel
738                         let announcement = generate_announcement(main_channel_id);
739                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
740                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
741                 }
742
743                 { // secondary channel
744                         let announcement = generate_announcement(secondary_channel_id);
745                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
746                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
747                 }
748
749                 { // main channel
750                         { // direction false
751                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
752                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
753                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
754                         }
755                         { // direction true
756                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
757                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
758                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
759                         }
760                 }
761
762                 { // in-between channel
763                         { // direction false
764                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
765                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
766                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
767                         }
768                         { // direction true
769                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
770                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
771                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
772                         }
773                 }
774
775                 { // main channel
776                         { // direction false
777                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
778                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
779                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
780                         }
781                         { // direction true
782                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
783                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
784                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
785                         }
786                 }
787
788                 drop(receiver);
789                 persister.persist_gossip().await;
790         }
791
792         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
793         let client_graph_arc = Arc::new(client_graph);
794
795         { // sync after initial seed
796                 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
797                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
798
799                 let channel_count = network_graph_arc.read_only().channels().len();
800
801                 assert_eq!(channel_count, 2);
802                 assert_eq!(serialization.message_count, 6);
803                 assert_eq!(serialization.announcement_count, 2);
804                 assert_eq!(serialization.update_count, 4);
805
806                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
807                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
808                 // the update result must be a multiple of our snapshot granularity
809                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
810                 assert!(update_result < timestamp);
811
812                 let readonly_graph = client_graph_arc.read_only();
813                 let channels = readonly_graph.channels();
814                 let client_channel_count = channels.len();
815                 assert_eq!(client_channel_count, 2);
816
817                 let first_channel = channels.get(&main_channel_id).unwrap();
818                 assert!(&first_channel.announcement_message.is_none());
819                 // ensure the update in one direction shows the latest fee
820                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
821                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
822         }
823
824         clean_test_db().await;
825 }
826
827 #[tokio::test]
828 async fn test_full_snapshot_persistence() {
829         let schema_sanitizer = SchemaSanitizer::new();
830         let logger = Arc::new(TestLogger::new());
831         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
832         let network_graph_arc = Arc::new(network_graph);
833         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
834         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
835
836         let short_channel_id = 1;
837         let timestamp = current_time();
838         println!("timestamp: {}", timestamp);
839
840         { // seed the db
841                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
842                 let announcement = generate_announcement(short_channel_id);
843                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
844                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
845
846                 { // direction true
847                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
848                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
849                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
850                 }
851
852                 { // direction false
853                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
854                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
855                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
856                 }
857
858
859                 drop(receiver);
860                 persister.persist_gossip().await;
861         }
862
863         let cache_path = cache_sanitizer.cache_path();
864         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
865
866         // generate snapshots
867         {
868                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
869
870                 let symlinked_data = fs::read(&symlink_path).unwrap();
871                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
872                 let client_graph_arc = Arc::new(client_graph);
873
874                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
875                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
876                 // the update result must be a multiple of our snapshot granularity
877                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
878
879                 let readonly_graph = client_graph_arc.read_only();
880                 let channels = readonly_graph.channels();
881                 let client_channel_count = channels.len();
882                 assert_eq!(client_channel_count, 1);
883
884                 let first_channel = channels.get(&short_channel_id).unwrap();
885                 assert!(&first_channel.announcement_message.is_none());
886                 // ensure the update in one direction shows the latest fee
887                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
888                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
889         }
890
891         { // update the db
892                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
893
894                 { // second update
895                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
896                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
897                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
898                 }
899
900                 drop(receiver);
901                 persister.persist_gossip().await;
902         }
903
904         // regenerate snapshots
905         {
906                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
907
908                 let symlinked_data = fs::read(&symlink_path).unwrap();
909                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
910                 let client_graph_arc = Arc::new(client_graph);
911
912                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
913                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
914                 // the update result must be a multiple of our snapshot granularity
915                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
916
917                 let readonly_graph = client_graph_arc.read_only();
918                 let channels = readonly_graph.channels();
919                 let client_channel_count = channels.len();
920                 assert_eq!(client_channel_count, 1);
921
922                 let first_channel = channels.get(&short_channel_id).unwrap();
923                 assert!(&first_channel.announcement_message.is_none());
924                 // ensure the update in one direction shows the latest fee
925                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
926                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
927         }
928
929         // clean up afterwards
930         clean_test_db().await;
931 }