Test NodeAnnouncement persistence.
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
8 use bitcoin::Network;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement};
16 use lightning::routing::gossip::{NetworkGraph, NodeAlias, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
23
/// Number of seconds (one week) by which the RGS client backdates the updates it applies.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 3600;

thread_local! {
	/// Name of the database schema used by the test running on this thread.
	/// Starts out unset.
	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
}

thread_local! {
	/// Whether this thread's test schema has been cleaned up.
	/// Starts out unset.
	static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
}
30
31 fn blank_signature() -> Signature {
32         Signature::from_compact(&[0u8; 64]).unwrap()
33 }
34
35 fn genesis_hash() -> ChainHash {
36         ChainHash::using_genesis_block(Network::Bitcoin)
37 }
38
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before 1970, or if the number of seconds no
/// longer fits in a `u32` (instead of silently wrapping, as a bare `as u32` cast would).
fn current_time() -> u32 {
	let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970");
	let seconds = since_epoch.as_secs();
	// Gossip timestamps are 32-bit; fail loudly rather than truncate.
	assert!(seconds <= u64::from(u32::MAX), "Unix time in seconds must fit in a u32");
	seconds as u32
}
42
43 pub(crate) fn db_test_schema() -> String {
44         DB_TEST_SCHEMA.with(|suffix_reference| {
45                 let suffix_option = suffix_reference.borrow();
46                 suffix_option.as_ref().unwrap().clone()
47         })
48 }
49
50 fn generate_node_announcement() -> NodeAnnouncement {
51         let secp_context = Secp256k1::new();
52
53         let random_private_key = SecretKey::from_slice(&[1; 32]).unwrap();
54         let random_public_key = random_private_key.public_key(&secp_context);
55         let node_id = NodeId::from_pubkey(&random_public_key);
56
57         let announcement = UnsignedNodeAnnouncement {
58                 features: NodeFeatures::empty(),
59                 timestamp: 0,
60                 node_id,
61                 rgb: [0, 128, 255],
62                 alias: NodeAlias([0; 32]),
63                 addresses: vec![],
64                 excess_data: vec![],
65                 excess_address_data: vec![],
66         };
67
68         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
69         let signature = secp_context.sign_ecdsa(&msg_hash, &random_private_key);
70
71         NodeAnnouncement {
72                 signature,
73                 contents: announcement,
74         }
75 }
76
77
78 fn generate_channel_announcement(short_channel_id: u64) -> ChannelAnnouncement {
79         let secp_context = Secp256k1::new();
80
81         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
82         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
83         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
84
85         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
86         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
87         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
88
89         let announcement = UnsignedChannelAnnouncement {
90                 features: ChannelFeatures::empty(),
91                 chain_hash: genesis_hash(),
92                 short_channel_id,
93                 node_id_1,
94                 node_id_2,
95                 bitcoin_key_1: node_id_1,
96                 bitcoin_key_2: node_id_2,
97                 excess_data: vec![],
98         };
99
100         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
101         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
102         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
103
104         ChannelAnnouncement {
105                 node_signature_1,
106                 node_signature_2,
107                 bitcoin_signature_1: node_signature_1,
108                 bitcoin_signature_2: node_signature_2,
109                 contents: announcement,
110         }
111 }
112
113 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
114         let flag_mask = if direction { 1 } else { 0 };
115         ChannelUpdate {
116                 signature: blank_signature(),
117                 contents: UnsignedChannelUpdate {
118                         chain_hash: genesis_hash(),
119                         short_channel_id: scid,
120                         timestamp,
121                         flags: 0 | flag_mask,
122                         cltv_expiry_delta: expiry_delta,
123                         htlc_minimum_msat: min_msat,
124                         htlc_maximum_msat: max_msat,
125                         fee_base_msat: base_msat,
126                         fee_proportional_millionths: fee_rate,
127                         excess_data: vec![],
128                 },
129         }
130 }
131
132 struct SchemaSanitizer {}
133
134 impl SchemaSanitizer {
135         fn new() -> Self {
136                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137                         let mut is_clean_option = cleanliness_reference.borrow_mut();
138                         assert!(is_clean_option.is_none());
139                         *is_clean_option = Some(false);
140                 });
141
142                 DB_TEST_SCHEMA.with(|suffix_reference| {
143                         let mut suffix_option = suffix_reference.borrow_mut();
144                         let current_time = SystemTime::now();
145                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
146                         let timestamp_seconds = unix_time.as_secs();
147                         let timestamp_nanos = unix_time.as_nanos();
148                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
149                         let thread_id = thread::current().id();
150                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
151                         println!("test schema preimage: {}", preimage);
152                         let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
153                         // the schema must start with a letter
154                         let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
155                         *suffix_option = Some(schema);
156                 });
157
158                 return Self {};
159         }
160 }
161
162 impl Drop for SchemaSanitizer {
163         fn drop(&mut self) {
164                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
165                         let is_clean_option = cleanliness_reference.borrow();
166                         if let Some(is_clean) = *is_clean_option {
167                                 if std::thread::panicking() {
168                                         return;
169                                 }
170                                 assert_eq!(is_clean, true);
171                         }
172                 });
173         }
174 }
175
176 struct CacheSanitizer {}
177
178 impl CacheSanitizer {
179         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
180         fn new(_: &SchemaSanitizer) -> Self {
181                 Self {}
182         }
183
184         fn cache_path(&self) -> String {
185                 format!("./res/{}/", db_test_schema())
186         }
187 }
188
189 impl Drop for CacheSanitizer {
190         fn drop(&mut self) {
191                 let cache_path = self.cache_path();
192                 fs::remove_dir_all(cache_path).unwrap();
193         }
194 }
195
196
197 async fn clean_test_db() {
198         let client = crate::connect_to_db().await;
199         let schema = db_test_schema();
200         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
201         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
202                 let mut is_clean_option = cleanliness_reference.borrow_mut();
203                 *is_clean_option = Some(true);
204         });
205 }
206
207 #[tokio::test]
208 async fn test_persistence_runtime() {
209         let _sanitizer = SchemaSanitizer::new();
210         let logger = Arc::new(TestLogger::new());
211         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
212         let network_graph_arc = Arc::new(network_graph);
213         let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
214
215         tokio::task::spawn_blocking(move || {
216                 drop(_persister);
217         }).await.unwrap();
218
219         clean_test_db().await;
220 }
221
222
223 #[tokio::test]
224 async fn test_trivial_setup() {
225         let _sanitizer = SchemaSanitizer::new();
226         let logger = Arc::new(TestLogger::new());
227         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
228         let network_graph_arc = Arc::new(network_graph);
229         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
230
231         let short_channel_id = 1;
232         let timestamp = current_time() - 10;
233         println!("timestamp: {}", timestamp);
234
235         { // seed the db
236                 let announcement = generate_channel_announcement(short_channel_id);
237                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
238                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
239
240                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
241                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
242                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
243
244                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
245                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
246                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
247                 drop(receiver);
248                 persister.persist_gossip().await;
249         }
250
251         let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
252         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
253         clean_test_db().await;
254
255         let channel_count = network_graph_arc.read_only().channels().len();
256
257         assert_eq!(channel_count, 1);
258         assert_eq!(serialization.message_count, 3);
259         assert_eq!(serialization.announcement_count, 1);
260         assert_eq!(serialization.update_count, 2);
261
262         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
263         let client_graph_arc = Arc::new(client_graph);
264         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
265         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
266         println!("update result: {}", update_result);
267         // the update result must be a multiple of our snapshot granularity
268         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
269         assert!(update_result < timestamp);
270
271         let timestamp_delta = timestamp - update_result;
272         println!("timestamp delta: {}", timestamp_delta);
273         assert!(timestamp_delta < config::snapshot_generation_interval());
274
275         let readonly_graph = client_graph_arc.read_only();
276         let channels = readonly_graph.channels();
277         let client_channel_count = channels.len();
278         assert_eq!(client_channel_count, 1);
279
280         let first_channel = channels.get(&short_channel_id).unwrap();
281         assert!(&first_channel.announcement_message.is_none());
282         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
283         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
284         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
285         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
286         println!("last update a: {}", last_update_seen_a);
287         println!("last update b: {}", last_update_seen_b);
288         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
289         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
290
291         tokio::task::spawn_blocking(move || {
292                 drop(persister);
293         }).await.unwrap();
294 }
295
296 #[tokio::test]
297 async fn test_node_announcement_persistence() {
298         let _sanitizer = SchemaSanitizer::new();
299         let logger = Arc::new(TestLogger::new());
300         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
301         let network_graph_arc = Arc::new(network_graph);
302         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
303
304         { // seed the db
305                 let announcement = generate_node_announcement();
306                 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), None)).await.unwrap();
307                 receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(12345))).await.unwrap();
308                 drop(receiver);
309                 persister.persist_gossip().await;
310
311                 tokio::task::spawn_blocking(move || {
312                         drop(persister);
313                 }).await.unwrap();
314         }
315         clean_test_db().await;
316 }
317
318
319 /// If a channel has only seen updates in one direction, it should not be announced
320 #[tokio::test]
321 async fn test_unidirectional_intermediate_update_consideration() {
322         let _sanitizer = SchemaSanitizer::new();
323
324         let logger = Arc::new(TestLogger::new());
325         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
326         let network_graph_arc = Arc::new(network_graph);
327         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
328
329         let short_channel_id = 1;
330         let timestamp = current_time() - 10;
331         println!("timestamp: {}", timestamp);
332
333         { // seed the db
334                 let announcement = generate_channel_announcement(short_channel_id);
335                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
336                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
337                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
338
339                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
340                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
341                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
342                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
343
344                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
345                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
346                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
347                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
348                 drop(receiver);
349                 persister.persist_gossip().await;
350         }
351
352         let channel_count = network_graph_arc.read_only().channels().len();
353         assert_eq!(channel_count, 1);
354
355         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
356         let client_graph_arc = Arc::new(client_graph);
357         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
358
359         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
360
361         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
362         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
363         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
364
365         assert_eq!(serialization.message_count, 3);
366         assert_eq!(serialization.announcement_count, 1);
367         assert_eq!(serialization.update_count, 2);
368         assert_eq!(serialization.update_count_full, 2);
369         assert_eq!(serialization.update_count_incremental, 0);
370
371         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
372         println!("update result: {}", update_result);
373         // the update result must be a multiple of our snapshot granularity
374
375         let readonly_graph = client_graph_arc.read_only();
376         let channels = readonly_graph.channels();
377         let client_channel_count = channels.len();
378         assert_eq!(client_channel_count, 1);
379
380         tokio::task::spawn_blocking(move || {
381                 drop(persister);
382         }).await.unwrap();
383
384         clean_test_db().await;
385 }
386
387 /// If a channel has only seen updates in one direction, it should not be announced
388 #[tokio::test]
389 async fn test_bidirectional_intermediate_update_consideration() {
390         let _sanitizer = SchemaSanitizer::new();
391
392         let logger = Arc::new(TestLogger::new());
393         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
394         let network_graph_arc = Arc::new(network_graph);
395         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
396
397         let short_channel_id = 1;
398         let timestamp = current_time() - 10;
399         println!("timestamp: {}", timestamp);
400
401         { // seed the db
402                 let announcement = generate_channel_announcement(short_channel_id);
403                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
404                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
405                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
406                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
407
408                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
409                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
410                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
411                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
412                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
413
414                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
415                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
416                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
417                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
418                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
419                 drop(receiver);
420                 persister.persist_gossip().await;
421         }
422
423         let channel_count = network_graph_arc.read_only().channels().len();
424         assert_eq!(channel_count, 1);
425
426         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
427
428         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
429         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
430         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
431
432         assert_eq!(serialization.message_count, 1);
433         assert_eq!(serialization.announcement_count, 0);
434         assert_eq!(serialization.update_count, 1);
435         assert_eq!(serialization.update_count_full, 0);
436         assert_eq!(serialization.update_count_incremental, 1);
437
438         tokio::task::spawn_blocking(move || {
439                 drop(persister);
440         }).await.unwrap();
441
442         clean_test_db().await;
443 }
444
445 #[tokio::test]
446 async fn test_channel_reminders() {
447         let _sanitizer = SchemaSanitizer::new();
448
449         let logger = Arc::new(TestLogger::new());
450         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
451         let network_graph_arc = Arc::new(network_graph);
452         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
453
454         let timestamp = current_time();
455         println!("timestamp: {}", timestamp);
456         let channel_reminder_delta = config::CHANNEL_REMINDER_AGE.as_secs() as u32;
457
458         { // seed the db
459                 { // unupdated channel
460                         let short_channel_id = 1;
461                         let announcement = generate_channel_announcement(short_channel_id);
462                         let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
463                         let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
464
465                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
466                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
467                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
468
469                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
470                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
471                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
472                 }
473                 { // unmodified but updated channel
474                         let short_channel_id = 2;
475                         let announcement = generate_channel_announcement(short_channel_id);
476                         let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 10, 0, 0, 0, 5, 0);
477                         // in the false direction, we have one update that's different prior
478                         let update_2 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 5, 0, 1, 0, 5, 0);
479                         let update_3 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
480                         let update_4 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
481                         let update_5 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 10, 0, 0, 0, 5, 0);
482                         let update_6 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 10, 0, 0, 0, 3, 0);
483                         let update_7 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 20, 0, 0, 0, 5, 0);
484                         let update_8 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 20, 0, 0, 0, 3, 0);
485
486                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
487                         network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
488                         network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();
489
490                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
491                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
492                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
493                         receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
494                         receiver.send(GossipMessage::ChannelUpdate(update_4, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
495
496                         receiver.send(GossipMessage::ChannelUpdate(update_5, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
497                         receiver.send(GossipMessage::ChannelUpdate(update_6, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
498
499                         receiver.send(GossipMessage::ChannelUpdate(update_7, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
500                         receiver.send(GossipMessage::ChannelUpdate(update_8, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
501                 }
502                 drop(receiver);
503                 persister.persist_gossip().await;
504         }
505
506         let channel_count = network_graph_arc.read_only().channels().len();
507         assert_eq!(channel_count, 2);
508
509         let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await;
510
511         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
512         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
513         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
514         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
515
516         assert_eq!(serialization.message_count, 4);
517         assert_eq!(serialization.announcement_count, 0);
518         assert_eq!(serialization.update_count, 4);
519         assert_eq!(serialization.update_count_full, 0);
520         assert_eq!(serialization.update_count_incremental, 4);
521
522         tokio::task::spawn_blocking(move || {
523                 drop(persister);
524         }).await.unwrap();
525
526         clean_test_db().await;
527 }
528
529 #[tokio::test]
530 async fn test_full_snapshot_recency() {
531         let _sanitizer = SchemaSanitizer::new();
532         let logger = Arc::new(TestLogger::new());
533         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
534         let network_graph_arc = Arc::new(network_graph);
535
536         let short_channel_id = 1;
537         let timestamp = current_time();
538         println!("timestamp: {}", timestamp);
539
540         { // seed the db
541                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
542                 let announcement = generate_channel_announcement(short_channel_id);
543                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
544                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
545
546                 { // direction false
547                         { // first update
548                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
549                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
550                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
551                         }
552                         { // second update
553                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
554                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
555                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
556                         }
557                 }
558                 { // direction true
559                         { // first and only update
560                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
561                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
562                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
563                         }
564                 }
565
566                 drop(receiver);
567                 persister.persist_gossip().await;
568
569                 tokio::task::spawn_blocking(move || {
570                         drop(persister);
571                 }).await.unwrap();
572         }
573
574         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
575         let client_graph_arc = Arc::new(client_graph);
576
577         { // sync after initial seed
578                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
579                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
580
581                 let channel_count = network_graph_arc.read_only().channels().len();
582
583                 assert_eq!(channel_count, 1);
584                 assert_eq!(serialization.message_count, 3);
585                 assert_eq!(serialization.announcement_count, 1);
586                 assert_eq!(serialization.update_count, 2);
587
588                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
589                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
590                 // the update result must be a multiple of our snapshot granularity
591                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
592                 assert!(update_result < timestamp);
593
594                 let readonly_graph = client_graph_arc.read_only();
595                 let channels = readonly_graph.channels();
596                 let client_channel_count = channels.len();
597                 assert_eq!(client_channel_count, 1);
598
599                 let first_channel = channels.get(&short_channel_id).unwrap();
600                 assert!(&first_channel.announcement_message.is_none());
601                 // ensure the update in one direction shows the latest fee
602                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
603                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
604         }
605
606         clean_test_db().await;
607 }
608
609 #[tokio::test]
610 async fn test_full_snapshot_recency_with_wrong_seen_order() {
611         let _sanitizer = SchemaSanitizer::new();
612         let logger = Arc::new(TestLogger::new());
613         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
614         let network_graph_arc = Arc::new(network_graph);
615
616         let short_channel_id = 1;
617         let timestamp = current_time();
618         println!("timestamp: {}", timestamp);
619
620         { // seed the db
621                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
622                 let announcement = generate_channel_announcement(short_channel_id);
623                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
624                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
625
626                 { // direction false
627                         { // first update, seen latest
628                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
629                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
630                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
631                         }
632                         { // second update, seen first
633                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
634                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
635                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
636                         }
637                 }
638                 { // direction true
639                         { // first and only update
640                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
641                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
642                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
643                         }
644                 }
645
646                 drop(receiver);
647                 persister.persist_gossip().await;
648
649                 tokio::task::spawn_blocking(move || {
650                         drop(persister);
651                 }).await.unwrap();
652         }
653
654         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
655         let client_graph_arc = Arc::new(client_graph);
656
657         { // sync after initial seed
658                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
659                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
660
661                 let channel_count = network_graph_arc.read_only().channels().len();
662
663                 assert_eq!(channel_count, 1);
664                 assert_eq!(serialization.message_count, 3);
665                 assert_eq!(serialization.announcement_count, 1);
666                 assert_eq!(serialization.update_count, 2);
667
668                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
669                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
670                 // the update result must be a multiple of our snapshot granularity
671                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
672                 assert!(update_result < timestamp);
673
674                 let readonly_graph = client_graph_arc.read_only();
675                 let channels = readonly_graph.channels();
676                 let client_channel_count = channels.len();
677                 assert_eq!(client_channel_count, 1);
678
679                 let first_channel = channels.get(&short_channel_id).unwrap();
680                 assert!(&first_channel.announcement_message.is_none());
681                 // ensure the update in one direction shows the latest fee
682                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
683                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
684         }
685
686         clean_test_db().await;
687 }
688
689 #[tokio::test]
690 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
691         let _sanitizer = SchemaSanitizer::new();
692         let logger = Arc::new(TestLogger::new());
693         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
694         let network_graph_arc = Arc::new(network_graph);
695
696         let short_channel_id = 1;
697         let timestamp = current_time();
698         println!("timestamp: {}", timestamp);
699
700         { // seed the db
701                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
702                 let announcement = generate_channel_announcement(short_channel_id);
703                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
704                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
705
706                 { // direction false
707                         // apply updates in their timestamp order
708                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
709                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
710                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
711                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
712
713                         // propagate updates in their seen order
714                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
715                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
716                 }
717                 { // direction true
718                         { // first and only update
719                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
720                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
721                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
722                         }
723                 }
724
725                 drop(receiver);
726                 persister.persist_gossip().await;
727
728                 tokio::task::spawn_blocking(move || {
729                         drop(persister);
730                 }).await.unwrap();
731         }
732
733         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
734         let client_graph_arc = Arc::new(client_graph);
735
736         { // sync after initial seed
737                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
738                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
739
740                 let channel_count = network_graph_arc.read_only().channels().len();
741
742                 assert_eq!(channel_count, 1);
743                 assert_eq!(serialization.message_count, 3);
744                 assert_eq!(serialization.announcement_count, 1);
745                 assert_eq!(serialization.update_count, 2);
746
747                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
748                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
749                 // the update result must be a multiple of our snapshot granularity
750                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
751                 assert!(update_result < timestamp);
752
753                 let readonly_graph = client_graph_arc.read_only();
754                 let channels = readonly_graph.channels();
755                 let client_channel_count = channels.len();
756                 assert_eq!(client_channel_count, 1);
757
758                 let first_channel = channels.get(&short_channel_id).unwrap();
759                 assert!(&first_channel.announcement_message.is_none());
760                 // ensure the update in one direction shows the latest fee
761                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
762                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
763         }
764
765         clean_test_db().await;
766 }
767
768 #[tokio::test]
769 async fn test_full_snapshot_mutiny_scenario() {
770         let _sanitizer = SchemaSanitizer::new();
771         let logger = Arc::new(TestLogger::new());
772         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
773         let network_graph_arc = Arc::new(network_graph);
774
775         let short_channel_id = 873706024403271681;
776         let timestamp = current_time();
777         // let oldest_simulation_timestamp = 1693300588;
778         let latest_simulation_timestamp = 1695909301;
779         let timestamp_offset = timestamp - latest_simulation_timestamp;
780         println!("timestamp: {}", timestamp);
781
782         { // seed the db
783                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
784                 let announcement = generate_channel_announcement(short_channel_id);
785                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
786                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
787
788                 { // direction false
789                         {
790                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
791                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
792                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
793                         }
794                         {
795                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
796                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
797                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
798                         }
799                         {
800                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
801                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
802                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
803                         }
804                         {
805                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
806                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
807                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
808                         }
809                         {
810                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
811                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
812                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
813                         }
814                         {
815                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
816                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
817                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
818                         }
819                         {
820                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
821                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
822                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
823                         }
824                         {
825                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
826                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
827                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
828                         }
829                         {
830                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
831                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
832                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
833                         }
834                         {
835                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
836                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
837                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
838                         }
839                         {
840                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
841                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
842                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
843                         }
844                 }
845                 { // direction true
846                         {
847                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
848                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
849                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
850                         }
851                         {
852                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
853                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
854                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
855                         }
856                 }
857
858                 drop(receiver);
859                 persister.persist_gossip().await;
860
861                 tokio::task::spawn_blocking(move || {
862                         drop(persister);
863                 }).await.unwrap();
864         }
865
866         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
867         let client_graph_arc = Arc::new(client_graph);
868
869         { // sync after initial seed
870                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
871                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
872
873                 let channel_count = network_graph_arc.read_only().channels().len();
874
875                 assert_eq!(channel_count, 1);
876                 assert_eq!(serialization.message_count, 3);
877                 assert_eq!(serialization.announcement_count, 1);
878                 assert_eq!(serialization.update_count, 2);
879
880                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
881                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
882                 println!("update result: {}", update_result);
883                 // the update result must be a multiple of our snapshot granularity
884                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
885                 assert!(update_result < timestamp);
886
887                 let timestamp_delta = timestamp - update_result;
888                 println!("timestamp delta: {}", timestamp_delta);
889                 assert!(timestamp_delta < config::snapshot_generation_interval());
890
891                 let readonly_graph = client_graph_arc.read_only();
892                 let channels = readonly_graph.channels();
893                 let client_channel_count = channels.len();
894                 assert_eq!(client_channel_count, 1);
895
896                 let first_channel = channels.get(&short_channel_id).unwrap();
897                 assert!(&first_channel.announcement_message.is_none());
898                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
899                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
900         }
901
902         clean_test_db().await;
903 }
904
905 #[tokio::test]
906 async fn test_full_snapshot_interlaced_channel_timestamps() {
907         let _sanitizer = SchemaSanitizer::new();
908         let logger = Arc::new(TestLogger::new());
909         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
910         let network_graph_arc = Arc::new(network_graph);
911
912         let main_channel_id = 1;
913         let timestamp = current_time();
914         println!("timestamp: {}", timestamp);
915
916         { // seed the db
917                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
918                 let secondary_channel_id = main_channel_id + 1;
919
920                 { // main channel
921                         let announcement = generate_channel_announcement(main_channel_id);
922                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
923                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
924                 }
925
926                 { // secondary channel
927                         let announcement = generate_channel_announcement(secondary_channel_id);
928                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
929                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
930                 }
931
932                 { // main channel
933                         { // direction false
934                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
935                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
936                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
937                         }
938                         { // direction true
939                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
940                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
941                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
942                         }
943                 }
944
945                 { // in-between channel
946                         { // direction false
947                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
948                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
949                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
950                         }
951                         { // direction true
952                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
953                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
954                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
955                         }
956                 }
957
958                 { // main channel
959                         { // direction false
960                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
961                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
962                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
963                         }
964                         { // direction true
965                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
966                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
967                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
968                         }
969                 }
970
971                 drop(receiver);
972                 persister.persist_gossip().await;
973
974                 tokio::task::spawn_blocking(move || {
975                         drop(persister);
976                 }).await.unwrap();
977         }
978
979         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
980         let client_graph_arc = Arc::new(client_graph);
981
982         { // sync after initial seed
983                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
984                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
985
986                 let channel_count = network_graph_arc.read_only().channels().len();
987
988                 assert_eq!(channel_count, 2);
989                 assert_eq!(serialization.message_count, 6);
990                 assert_eq!(serialization.announcement_count, 2);
991                 assert_eq!(serialization.update_count, 4);
992
993                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
994                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
995                 // the update result must be a multiple of our snapshot granularity
996                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
997                 assert!(update_result < timestamp);
998
999                 let readonly_graph = client_graph_arc.read_only();
1000                 let channels = readonly_graph.channels();
1001                 let client_channel_count = channels.len();
1002                 assert_eq!(client_channel_count, 2);
1003
1004                 let first_channel = channels.get(&main_channel_id).unwrap();
1005                 assert!(&first_channel.announcement_message.is_none());
1006                 // ensure the update in one direction shows the latest fee
1007                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
1008                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
1009         }
1010
1011         clean_test_db().await;
1012 }
1013
1014 #[tokio::test]
1015 async fn test_full_snapshot_persistence() {
1016         let schema_sanitizer = SchemaSanitizer::new();
1017         let logger = Arc::new(TestLogger::new());
1018         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1019         let network_graph_arc = Arc::new(network_graph);
1020         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
1021         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
1022
1023         let short_channel_id = 1;
1024         let timestamp = current_time();
1025         println!("timestamp: {}", timestamp);
1026
1027         { // seed the db
1028                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1029                 let announcement = generate_channel_announcement(short_channel_id);
1030                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1031                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1032
1033                 { // direction true
1034                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
1035                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1036                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1037                 }
1038
1039                 { // direction false
1040                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
1041                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1042                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1043                 }
1044
1045
1046                 drop(receiver);
1047                 persister.persist_gossip().await;
1048
1049                 tokio::task::spawn_blocking(move || {
1050                         drop(persister);
1051                 }).await.unwrap();
1052         }
1053
1054         let cache_path = cache_sanitizer.cache_path();
1055         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
1056
1057         // generate snapshots
1058         {
1059                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1060
1061                 let symlinked_data = fs::read(&symlink_path).unwrap();
1062                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1063                 let client_graph_arc = Arc::new(client_graph);
1064
1065                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1066                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1067                 // the update result must be a multiple of our snapshot granularity
1068                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1069
1070                 let readonly_graph = client_graph_arc.read_only();
1071                 let channels = readonly_graph.channels();
1072                 let client_channel_count = channels.len();
1073                 assert_eq!(client_channel_count, 1);
1074
1075                 let first_channel = channels.get(&short_channel_id).unwrap();
1076                 assert!(&first_channel.announcement_message.is_none());
1077                 // ensure the update in one direction shows the latest fee
1078                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
1079                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1080         }
1081
1082         { // update the db
1083                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1084
1085                 { // second update
1086                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
1087                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1088                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1089                 }
1090
1091                 drop(receiver);
1092                 persister.persist_gossip().await;
1093
1094                 tokio::task::spawn_blocking(move || {
1095                         drop(persister);
1096                 }).await.unwrap();
1097         }
1098
1099         // regenerate snapshots
1100         {
1101                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1102
1103                 let symlinked_data = fs::read(&symlink_path).unwrap();
1104                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1105                 let client_graph_arc = Arc::new(client_graph);
1106
1107                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1108                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1109                 // the update result must be a multiple of our snapshot granularity
1110                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1111
1112                 let readonly_graph = client_graph_arc.read_only();
1113                 let channels = readonly_graph.channels();
1114                 let client_channel_count = channels.len();
1115                 assert_eq!(client_channel_count, 1);
1116
1117                 let first_channel = channels.get(&short_channel_id).unwrap();
1118                 assert!(&first_channel.announcement_message.is_none());
1119                 // ensure the update in one direction shows the latest fee
1120                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
1121                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1122         }
1123
1124         // clean up afterwards
1125         clean_test_db().await;
1126 }