Test storing node addresses.
[rapid-gossip-sync-server] / src / tests / mod.rs
1 //! Multi-module tests that use database fixtures
2
3 use std::cell::RefCell;
4 use std::sync::Arc;
5 use std::{fs, thread};
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
8 use bitcoin::Network;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement};
16 use lightning::routing::gossip::{NetworkGraph, NodeAlias, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
23
/// Seconds (one week) by which the RGS client backdates the updates it applies.
const CLIENT_BACKDATE_INTERVAL: u32 = 7 * 24 * 3600;

thread_local! {
    /// Schema name used by the test running on this thread; `None` until
    /// `SchemaSanitizer::new` stores one.
    static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::default();
    /// `None` before a sanitizer exists, `Some(false)` once a schema is in use and
    /// `Some(true)` after `clean_test_db` has dropped it.
    static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::default();
}
30
31 fn blank_signature() -> Signature {
32         Signature::from_compact(&[0u8; 64]).unwrap()
33 }
34
35 fn genesis_hash() -> ChainHash {
36         ChainHash::using_genesis_block(Network::Bitcoin)
37 }
38
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// The 64-bit second count is narrowed to `u32` with an `as` cast.
/// Panics if the system clock reports a time before the epoch.
fn current_time() -> u32 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time must be > 1970");
    since_epoch.as_secs() as u32
}
42
43 pub(crate) fn db_test_schema() -> String {
44         DB_TEST_SCHEMA.with(|suffix_reference| {
45                 let suffix_option = suffix_reference.borrow();
46                 suffix_option.as_ref().unwrap().clone()
47         })
48 }
49
50 fn generate_node_announcement() -> NodeAnnouncement {
51         let secp_context = Secp256k1::new();
52
53         let random_private_key = SecretKey::from_slice(&[1; 32]).unwrap();
54         let random_public_key = random_private_key.public_key(&secp_context);
55         let node_id = NodeId::from_pubkey(&random_public_key);
56
57         let announcement = UnsignedNodeAnnouncement {
58                 features: NodeFeatures::empty(),
59                 timestamp: 0,
60                 node_id,
61                 rgb: [0, 128, 255],
62                 alias: NodeAlias([0; 32]),
63                 addresses: vec![],
64                 excess_data: vec![],
65                 excess_address_data: vec![],
66         };
67
68         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
69         let signature = secp_context.sign_ecdsa(&msg_hash, &random_private_key);
70
71         NodeAnnouncement {
72                 signature,
73                 contents: announcement,
74         }
75 }
76
77
78 fn generate_channel_announcement(short_channel_id: u64) -> ChannelAnnouncement {
79         let secp_context = Secp256k1::new();
80
81         let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
82         let random_public_key_1 = random_private_key_1.public_key(&secp_context);
83         let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
84
85         let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
86         let random_public_key_2 = random_private_key_2.public_key(&secp_context);
87         let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
88
89         let announcement = UnsignedChannelAnnouncement {
90                 features: ChannelFeatures::empty(),
91                 chain_hash: genesis_hash(),
92                 short_channel_id,
93                 node_id_1,
94                 node_id_2,
95                 bitcoin_key_1: node_id_1,
96                 bitcoin_key_2: node_id_2,
97                 excess_data: vec![],
98         };
99
100         let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
101         let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
102         let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
103
104         ChannelAnnouncement {
105                 node_signature_1,
106                 node_signature_2,
107                 bitcoin_signature_1: node_signature_1,
108                 bitcoin_signature_2: node_signature_2,
109                 contents: announcement,
110         }
111 }
112
113 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
114         let flag_mask = if direction { 1 } else { 0 };
115         ChannelUpdate {
116                 signature: blank_signature(),
117                 contents: UnsignedChannelUpdate {
118                         chain_hash: genesis_hash(),
119                         short_channel_id: scid,
120                         timestamp,
121                         flags: 0 | flag_mask,
122                         cltv_expiry_delta: expiry_delta,
123                         htlc_minimum_msat: min_msat,
124                         htlc_maximum_msat: max_msat,
125                         fee_base_msat: base_msat,
126                         fee_proportional_millionths: fee_rate,
127                         excess_data: vec![],
128                 },
129         }
130 }
131
132 struct SchemaSanitizer {}
133
134 impl SchemaSanitizer {
135         fn new() -> Self {
136                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137                         let mut is_clean_option = cleanliness_reference.borrow_mut();
138                         assert!(is_clean_option.is_none());
139                         *is_clean_option = Some(false);
140                 });
141
142                 DB_TEST_SCHEMA.with(|suffix_reference| {
143                         let mut suffix_option = suffix_reference.borrow_mut();
144                         let current_time = SystemTime::now();
145                         let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
146                         let timestamp_seconds = unix_time.as_secs();
147                         let timestamp_nanos = unix_time.as_nanos();
148                         // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
149                         let thread_id = thread::current().id();
150                         let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
151                         println!("test schema preimage: {}", preimage);
152                         let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
153                         // the schema must start with a letter
154                         let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
155                         *suffix_option = Some(schema);
156                 });
157
158                 return Self {};
159         }
160 }
161
162 impl Drop for SchemaSanitizer {
163         fn drop(&mut self) {
164                 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
165                         let is_clean_option = cleanliness_reference.borrow();
166                         if let Some(is_clean) = *is_clean_option {
167                                 if std::thread::panicking() {
168                                         return;
169                                 }
170                                 assert_eq!(is_clean, true);
171                         }
172                 });
173         }
174 }
175
176 struct CacheSanitizer {}
177
178 impl CacheSanitizer {
179         /// The CacheSanitizer instantiation requires that there be a schema sanitizer
180         fn new(_: &SchemaSanitizer) -> Self {
181                 Self {}
182         }
183
184         fn cache_path(&self) -> String {
185                 format!("./res/{}/", db_test_schema())
186         }
187 }
188
189 impl Drop for CacheSanitizer {
190         fn drop(&mut self) {
191                 let cache_path = self.cache_path();
192                 fs::remove_dir_all(cache_path).unwrap();
193         }
194 }
195
196
197 async fn clean_test_db() {
198         let client = crate::connect_to_db().await;
199         let schema = db_test_schema();
200         client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
201         IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
202                 let mut is_clean_option = cleanliness_reference.borrow_mut();
203                 *is_clean_option = Some(true);
204         });
205 }
206
207 #[tokio::test]
208 async fn test_persistence_runtime() {
209         let _sanitizer = SchemaSanitizer::new();
210         let logger = Arc::new(TestLogger::new());
211         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
212         let network_graph_arc = Arc::new(network_graph);
213         let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
214
215         tokio::task::spawn_blocking(move || {
216                 drop(_persister);
217         }).await.unwrap();
218
219         clean_test_db().await;
220 }
221
222
223 #[tokio::test]
224 async fn test_trivial_setup() {
225         let _sanitizer = SchemaSanitizer::new();
226         let logger = Arc::new(TestLogger::new());
227         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
228         let network_graph_arc = Arc::new(network_graph);
229         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
230
231         let short_channel_id = 1;
232         let timestamp = current_time() - 10;
233         println!("timestamp: {}", timestamp);
234
235         { // seed the db
236                 let announcement = generate_channel_announcement(short_channel_id);
237                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
238                 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
239
240                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
241                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
242                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
243
244                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
245                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
246                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
247                 drop(receiver);
248                 persister.persist_gossip().await;
249         }
250
251         let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
252         logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
253         clean_test_db().await;
254
255         let channel_count = network_graph_arc.read_only().channels().len();
256
257         assert_eq!(channel_count, 1);
258         assert_eq!(serialization.message_count, 3);
259         assert_eq!(serialization.announcement_count, 1);
260         assert_eq!(serialization.update_count, 2);
261
262         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
263         let client_graph_arc = Arc::new(client_graph);
264         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
265         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
266         println!("update result: {}", update_result);
267         // the update result must be a multiple of our snapshot granularity
268         assert_eq!(update_result % config::snapshot_generation_interval(), 0);
269         assert!(update_result < timestamp);
270
271         let timestamp_delta = timestamp - update_result;
272         println!("timestamp delta: {}", timestamp_delta);
273         assert!(timestamp_delta < config::snapshot_generation_interval());
274
275         let readonly_graph = client_graph_arc.read_only();
276         let channels = readonly_graph.channels();
277         let client_channel_count = channels.len();
278         assert_eq!(client_channel_count, 1);
279
280         let first_channel = channels.get(&short_channel_id).unwrap();
281         assert!(&first_channel.announcement_message.is_none());
282         assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
283         assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
284         let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
285         let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
286         println!("last update a: {}", last_update_seen_a);
287         println!("last update b: {}", last_update_seen_b);
288         assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
289         assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
290
291         tokio::task::spawn_blocking(move || {
292                 drop(persister);
293         }).await.unwrap();
294 }
295
296 #[tokio::test]
297 async fn test_node_announcement_persistence() {
298         let _sanitizer = SchemaSanitizer::new();
299         let logger = Arc::new(TestLogger::new());
300         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
301         let network_graph_arc = Arc::new(network_graph);
302         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
303
304         { // seed the db
305                 let mut announcement = generate_node_announcement();
306                 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), None)).await.unwrap();
307                 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(12345))).await.unwrap();
308
309                 {
310                         // modify announcement to contain a bunch of addresses
311                         announcement.contents.addresses.push(SocketAddress::Hostname {
312                                 hostname: "google.com".to_string().try_into().unwrap(),
313                                 port: 443,
314                         });
315                         announcement.contents.addresses.push(SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 9635 });
316                         announcement.contents.addresses.push(SocketAddress::TcpIpV6 { addr: [1; 16], port: 1337 });
317                         announcement.contents.addresses.push(SocketAddress::OnionV2([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]));
318                         announcement.contents.addresses.push(SocketAddress::OnionV3 {
319                                 ed25519_pubkey: [1; 32],
320                                 checksum: 2,
321                                 version: 3,
322                                 port: 4,
323                         });
324                 }
325                 receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(12345))).await.unwrap();
326
327                 drop(receiver);
328                 persister.persist_gossip().await;
329
330                 tokio::task::spawn_blocking(move || {
331                         drop(persister);
332                 }).await.unwrap();
333         }
334         clean_test_db().await;
335 }
336
337
338 /// If a channel has only seen updates in one direction, it should not be announced
339 #[tokio::test]
340 async fn test_unidirectional_intermediate_update_consideration() {
341         let _sanitizer = SchemaSanitizer::new();
342
343         let logger = Arc::new(TestLogger::new());
344         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
345         let network_graph_arc = Arc::new(network_graph);
346         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
347
348         let short_channel_id = 1;
349         let timestamp = current_time() - 10;
350         println!("timestamp: {}", timestamp);
351
352         { // seed the db
353                 let announcement = generate_channel_announcement(short_channel_id);
354                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
355                 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
356                 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
357
358                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
359                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
360                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
361                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
362
363                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
364                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
365                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
366                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
367                 drop(receiver);
368                 persister.persist_gossip().await;
369         }
370
371         let channel_count = network_graph_arc.read_only().channels().len();
372         assert_eq!(channel_count, 1);
373
374         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
375         let client_graph_arc = Arc::new(client_graph);
376         let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
377
378         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
379
380         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
381         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
382         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
383
384         assert_eq!(serialization.message_count, 3);
385         assert_eq!(serialization.announcement_count, 1);
386         assert_eq!(serialization.update_count, 2);
387         assert_eq!(serialization.update_count_full, 2);
388         assert_eq!(serialization.update_count_incremental, 0);
389
390         let update_result = rgs.update_network_graph(&serialization.data).unwrap();
391         println!("update result: {}", update_result);
392         // the update result must be a multiple of our snapshot granularity
393
394         let readonly_graph = client_graph_arc.read_only();
395         let channels = readonly_graph.channels();
396         let client_channel_count = channels.len();
397         assert_eq!(client_channel_count, 1);
398
399         tokio::task::spawn_blocking(move || {
400                 drop(persister);
401         }).await.unwrap();
402
403         clean_test_db().await;
404 }
405
406 /// If a channel has only seen updates in one direction, it should not be announced
407 #[tokio::test]
408 async fn test_bidirectional_intermediate_update_consideration() {
409         let _sanitizer = SchemaSanitizer::new();
410
411         let logger = Arc::new(TestLogger::new());
412         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
413         let network_graph_arc = Arc::new(network_graph);
414         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
415
416         let short_channel_id = 1;
417         let timestamp = current_time() - 10;
418         println!("timestamp: {}", timestamp);
419
420         { // seed the db
421                 let announcement = generate_channel_announcement(short_channel_id);
422                 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
423                 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
424                 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
425                 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
426
427                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
428                 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
429                 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
430                 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
431                 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
432
433                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
434                 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
435                 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
436                 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
437                 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
438                 drop(receiver);
439                 persister.persist_gossip().await;
440         }
441
442         let channel_count = network_graph_arc.read_only().channels().len();
443         assert_eq!(channel_count, 1);
444
445         let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
446
447         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
448         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
449         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
450
451         assert_eq!(serialization.message_count, 1);
452         assert_eq!(serialization.announcement_count, 0);
453         assert_eq!(serialization.update_count, 1);
454         assert_eq!(serialization.update_count_full, 0);
455         assert_eq!(serialization.update_count_incremental, 1);
456
457         tokio::task::spawn_blocking(move || {
458                 drop(persister);
459         }).await.unwrap();
460
461         clean_test_db().await;
462 }
463
464 #[tokio::test]
465 async fn test_channel_reminders() {
466         let _sanitizer = SchemaSanitizer::new();
467
468         let logger = Arc::new(TestLogger::new());
469         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
470         let network_graph_arc = Arc::new(network_graph);
471         let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
472
473         let timestamp = current_time();
474         println!("timestamp: {}", timestamp);
475         let channel_reminder_delta = config::CHANNEL_REMINDER_AGE.as_secs() as u32;
476
477         { // seed the db
478                 { // unupdated channel
479                         let short_channel_id = 1;
480                         let announcement = generate_channel_announcement(short_channel_id);
481                         let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
482                         let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
483
484                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
485                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
486                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
487
488                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
489                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
490                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
491                 }
492                 { // unmodified but updated channel
493                         let short_channel_id = 2;
494                         let announcement = generate_channel_announcement(short_channel_id);
495                         let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 10, 0, 0, 0, 5, 0);
496                         // in the false direction, we have one update that's different prior
497                         let update_2 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 5, 0, 1, 0, 5, 0);
498                         let update_3 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
499                         let update_4 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
500                         let update_5 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 10, 0, 0, 0, 5, 0);
501                         let update_6 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 10, 0, 0, 0, 3, 0);
502                         let update_7 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 20, 0, 0, 0, 5, 0);
503                         let update_8 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 20, 0, 0, 0, 3, 0);
504
505                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
506                         network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
507                         network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();
508
509                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
510                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
511                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
512                         receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
513                         receiver.send(GossipMessage::ChannelUpdate(update_4, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
514
515                         receiver.send(GossipMessage::ChannelUpdate(update_5, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
516                         receiver.send(GossipMessage::ChannelUpdate(update_6, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
517
518                         receiver.send(GossipMessage::ChannelUpdate(update_7, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
519                         receiver.send(GossipMessage::ChannelUpdate(update_8, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
520                 }
521                 drop(receiver);
522                 persister.persist_gossip().await;
523         }
524
525         let channel_count = network_graph_arc.read_only().channels().len();
526         assert_eq!(channel_count, 2);
527
528         let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await;
529
530         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
531         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
532         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
533         logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
534
535         assert_eq!(serialization.message_count, 4);
536         assert_eq!(serialization.announcement_count, 0);
537         assert_eq!(serialization.update_count, 4);
538         assert_eq!(serialization.update_count_full, 0);
539         assert_eq!(serialization.update_count_incremental, 4);
540
541         tokio::task::spawn_blocking(move || {
542                 drop(persister);
543         }).await.unwrap();
544
545         clean_test_db().await;
546 }
547
548 #[tokio::test]
549 async fn test_full_snapshot_recency() {
550         let _sanitizer = SchemaSanitizer::new();
551         let logger = Arc::new(TestLogger::new());
552         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
553         let network_graph_arc = Arc::new(network_graph);
554
555         let short_channel_id = 1;
556         let timestamp = current_time();
557         println!("timestamp: {}", timestamp);
558
559         { // seed the db
560                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
561                 let announcement = generate_channel_announcement(short_channel_id);
562                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
563                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
564
565                 { // direction false
566                         { // first update
567                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
568                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
569                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
570                         }
571                         { // second update
572                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
573                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
574                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
575                         }
576                 }
577                 { // direction true
578                         { // first and only update
579                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
580                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
581                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
582                         }
583                 }
584
585                 drop(receiver);
586                 persister.persist_gossip().await;
587
588                 tokio::task::spawn_blocking(move || {
589                         drop(persister);
590                 }).await.unwrap();
591         }
592
593         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
594         let client_graph_arc = Arc::new(client_graph);
595
596         { // sync after initial seed
597                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
598                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
599
600                 let channel_count = network_graph_arc.read_only().channels().len();
601
602                 assert_eq!(channel_count, 1);
603                 assert_eq!(serialization.message_count, 3);
604                 assert_eq!(serialization.announcement_count, 1);
605                 assert_eq!(serialization.update_count, 2);
606
607                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
608                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
609                 // the update result must be a multiple of our snapshot granularity
610                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
611                 assert!(update_result < timestamp);
612
613                 let readonly_graph = client_graph_arc.read_only();
614                 let channels = readonly_graph.channels();
615                 let client_channel_count = channels.len();
616                 assert_eq!(client_channel_count, 1);
617
618                 let first_channel = channels.get(&short_channel_id).unwrap();
619                 assert!(&first_channel.announcement_message.is_none());
620                 // ensure the update in one direction shows the latest fee
621                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
622                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
623         }
624
625         clean_test_db().await;
626 }
627
628 #[tokio::test]
629 async fn test_full_snapshot_recency_with_wrong_seen_order() {
630         let _sanitizer = SchemaSanitizer::new();
631         let logger = Arc::new(TestLogger::new());
632         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
633         let network_graph_arc = Arc::new(network_graph);
634
635         let short_channel_id = 1;
636         let timestamp = current_time();
637         println!("timestamp: {}", timestamp);
638
639         { // seed the db
640                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
641                 let announcement = generate_channel_announcement(short_channel_id);
642                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
643                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
644
645                 { // direction false
646                         { // first update, seen latest
647                                 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
648                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
649                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
650                         }
651                         { // second update, seen first
652                                 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
653                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
654                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
655                         }
656                 }
657                 { // direction true
658                         { // first and only update
659                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
660                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
661                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
662                         }
663                 }
664
665                 drop(receiver);
666                 persister.persist_gossip().await;
667
668                 tokio::task::spawn_blocking(move || {
669                         drop(persister);
670                 }).await.unwrap();
671         }
672
673         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
674         let client_graph_arc = Arc::new(client_graph);
675
676         { // sync after initial seed
677                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
678                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
679
680                 let channel_count = network_graph_arc.read_only().channels().len();
681
682                 assert_eq!(channel_count, 1);
683                 assert_eq!(serialization.message_count, 3);
684                 assert_eq!(serialization.announcement_count, 1);
685                 assert_eq!(serialization.update_count, 2);
686
687                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
688                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
689                 // the update result must be a multiple of our snapshot granularity
690                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
691                 assert!(update_result < timestamp);
692
693                 let readonly_graph = client_graph_arc.read_only();
694                 let channels = readonly_graph.channels();
695                 let client_channel_count = channels.len();
696                 assert_eq!(client_channel_count, 1);
697
698                 let first_channel = channels.get(&short_channel_id).unwrap();
699                 assert!(&first_channel.announcement_message.is_none());
700                 // ensure the update in one direction shows the latest fee
701                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
702                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
703         }
704
705         clean_test_db().await;
706 }
707
708 #[tokio::test]
709 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
710         let _sanitizer = SchemaSanitizer::new();
711         let logger = Arc::new(TestLogger::new());
712         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
713         let network_graph_arc = Arc::new(network_graph);
714
715         let short_channel_id = 1;
716         let timestamp = current_time();
717         println!("timestamp: {}", timestamp);
718
719         { // seed the db
720                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
721                 let announcement = generate_channel_announcement(short_channel_id);
722                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
723                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
724
725                 { // direction false
726                         // apply updates in their timestamp order
727                         let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
728                         let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
729                         network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
730                         network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
731
732                         // propagate updates in their seen order
733                         receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
734                         receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
735                 }
736                 { // direction true
737                         { // first and only update
738                                 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
739                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
740                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
741                         }
742                 }
743
744                 drop(receiver);
745                 persister.persist_gossip().await;
746
747                 tokio::task::spawn_blocking(move || {
748                         drop(persister);
749                 }).await.unwrap();
750         }
751
752         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
753         let client_graph_arc = Arc::new(client_graph);
754
755         { // sync after initial seed
756                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
757                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
758
759                 let channel_count = network_graph_arc.read_only().channels().len();
760
761                 assert_eq!(channel_count, 1);
762                 assert_eq!(serialization.message_count, 3);
763                 assert_eq!(serialization.announcement_count, 1);
764                 assert_eq!(serialization.update_count, 2);
765
766                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
767                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
768                 // the update result must be a multiple of our snapshot granularity
769                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
770                 assert!(update_result < timestamp);
771
772                 let readonly_graph = client_graph_arc.read_only();
773                 let channels = readonly_graph.channels();
774                 let client_channel_count = channels.len();
775                 assert_eq!(client_channel_count, 1);
776
777                 let first_channel = channels.get(&short_channel_id).unwrap();
778                 assert!(&first_channel.announcement_message.is_none());
779                 // ensure the update in one direction shows the latest fee
780                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
781                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
782         }
783
784         clean_test_db().await;
785 }
786
787 #[tokio::test]
788 async fn test_full_snapshot_mutiny_scenario() {
789         let _sanitizer = SchemaSanitizer::new();
790         let logger = Arc::new(TestLogger::new());
791         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
792         let network_graph_arc = Arc::new(network_graph);
793
794         let short_channel_id = 873706024403271681;
795         let timestamp = current_time();
796         // let oldest_simulation_timestamp = 1693300588;
797         let latest_simulation_timestamp = 1695909301;
798         let timestamp_offset = timestamp - latest_simulation_timestamp;
799         println!("timestamp: {}", timestamp);
800
801         { // seed the db
802                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
803                 let announcement = generate_channel_announcement(short_channel_id);
804                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
805                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
806
807                 { // direction false
808                         {
809                                 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
810                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
811                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
812                         }
813                         {
814                                 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
815                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
816                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
817                         }
818                         {
819                                 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
820                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
821                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
822                         }
823                         {
824                                 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
825                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
826                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
827                         }
828                         {
829                                 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
830                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
831                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
832                         }
833                         {
834                                 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
835                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
836                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
837                         }
838                         {
839                                 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
840                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
841                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
842                         }
843                         {
844                                 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
845                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
846                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
847                         }
848                         {
849                                 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
850                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
851                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
852                         }
853                         {
854                                 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
855                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
856                                 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
857                         }
858                         {
859                                 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
860                                 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
861                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
862                         }
863                 }
864                 { // direction true
865                         {
866                                 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
867                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
868                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
869                         }
870                         {
871                                 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
872                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
873                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
874                         }
875                 }
876
877                 drop(receiver);
878                 persister.persist_gossip().await;
879
880                 tokio::task::spawn_blocking(move || {
881                         drop(persister);
882                 }).await.unwrap();
883         }
884
885         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
886         let client_graph_arc = Arc::new(client_graph);
887
888         { // sync after initial seed
889                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
890                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
891
892                 let channel_count = network_graph_arc.read_only().channels().len();
893
894                 assert_eq!(channel_count, 1);
895                 assert_eq!(serialization.message_count, 3);
896                 assert_eq!(serialization.announcement_count, 1);
897                 assert_eq!(serialization.update_count, 2);
898
899                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
900                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
901                 println!("update result: {}", update_result);
902                 // the update result must be a multiple of our snapshot granularity
903                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
904                 assert!(update_result < timestamp);
905
906                 let timestamp_delta = timestamp - update_result;
907                 println!("timestamp delta: {}", timestamp_delta);
908                 assert!(timestamp_delta < config::snapshot_generation_interval());
909
910                 let readonly_graph = client_graph_arc.read_only();
911                 let channels = readonly_graph.channels();
912                 let client_channel_count = channels.len();
913                 assert_eq!(client_channel_count, 1);
914
915                 let first_channel = channels.get(&short_channel_id).unwrap();
916                 assert!(&first_channel.announcement_message.is_none());
917                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
918                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
919         }
920
921         clean_test_db().await;
922 }
923
924 #[tokio::test]
925 async fn test_full_snapshot_interlaced_channel_timestamps() {
926         let _sanitizer = SchemaSanitizer::new();
927         let logger = Arc::new(TestLogger::new());
928         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
929         let network_graph_arc = Arc::new(network_graph);
930
931         let main_channel_id = 1;
932         let timestamp = current_time();
933         println!("timestamp: {}", timestamp);
934
935         { // seed the db
936                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
937                 let secondary_channel_id = main_channel_id + 1;
938
939                 { // main channel
940                         let announcement = generate_channel_announcement(main_channel_id);
941                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
942                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
943                 }
944
945                 { // secondary channel
946                         let announcement = generate_channel_announcement(secondary_channel_id);
947                         network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
948                         receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
949                 }
950
951                 { // main channel
952                         { // direction false
953                                 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
954                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
955                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
956                         }
957                         { // direction true
958                                 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
959                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
960                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
961                         }
962                 }
963
964                 { // in-between channel
965                         { // direction false
966                                 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
967                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
968                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
969                         }
970                         { // direction true
971                                 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
972                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
973                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
974                         }
975                 }
976
977                 { // main channel
978                         { // direction false
979                                 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
980                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
981                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
982                         }
983                         { // direction true
984                                 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
985                                 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
986                                 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
987                         }
988                 }
989
990                 drop(receiver);
991                 persister.persist_gossip().await;
992
993                 tokio::task::spawn_blocking(move || {
994                         drop(persister);
995                 }).await.unwrap();
996         }
997
998         let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
999         let client_graph_arc = Arc::new(client_graph);
1000
1001         { // sync after initial seed
1002                 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
1003                 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
1004
1005                 let channel_count = network_graph_arc.read_only().channels().len();
1006
1007                 assert_eq!(channel_count, 2);
1008                 assert_eq!(serialization.message_count, 6);
1009                 assert_eq!(serialization.announcement_count, 2);
1010                 assert_eq!(serialization.update_count, 4);
1011
1012                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1013                 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
1014                 // the update result must be a multiple of our snapshot granularity
1015                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1016                 assert!(update_result < timestamp);
1017
1018                 let readonly_graph = client_graph_arc.read_only();
1019                 let channels = readonly_graph.channels();
1020                 let client_channel_count = channels.len();
1021                 assert_eq!(client_channel_count, 2);
1022
1023                 let first_channel = channels.get(&main_channel_id).unwrap();
1024                 assert!(&first_channel.announcement_message.is_none());
1025                 // ensure the update in one direction shows the latest fee
1026                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
1027                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
1028         }
1029
1030         clean_test_db().await;
1031 }
1032
1033 #[tokio::test]
1034 async fn test_full_snapshot_persistence() {
1035         let schema_sanitizer = SchemaSanitizer::new();
1036         let logger = Arc::new(TestLogger::new());
1037         let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1038         let network_graph_arc = Arc::new(network_graph);
1039         let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
1040         let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
1041
1042         let short_channel_id = 1;
1043         let timestamp = current_time();
1044         println!("timestamp: {}", timestamp);
1045
1046         { // seed the db
1047                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1048                 let announcement = generate_channel_announcement(short_channel_id);
1049                 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1050                 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1051
1052                 { // direction true
1053                         let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
1054                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1055                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1056                 }
1057
1058                 { // direction false
1059                         let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
1060                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1061                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1062                 }
1063
1064
1065                 drop(receiver);
1066                 persister.persist_gossip().await;
1067
1068                 tokio::task::spawn_blocking(move || {
1069                         drop(persister);
1070                 }).await.unwrap();
1071         }
1072
1073         let cache_path = cache_sanitizer.cache_path();
1074         let symlink_path = format!("{}/symlinks/0.bin", cache_path);
1075
1076         // generate snapshots
1077         {
1078                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1079
1080                 let symlinked_data = fs::read(&symlink_path).unwrap();
1081                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1082                 let client_graph_arc = Arc::new(client_graph);
1083
1084                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1085                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1086                 // the update result must be a multiple of our snapshot granularity
1087                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1088
1089                 let readonly_graph = client_graph_arc.read_only();
1090                 let channels = readonly_graph.channels();
1091                 let client_channel_count = channels.len();
1092                 assert_eq!(client_channel_count, 1);
1093
1094                 let first_channel = channels.get(&short_channel_id).unwrap();
1095                 assert!(&first_channel.announcement_message.is_none());
1096                 // ensure the update in one direction shows the latest fee
1097                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
1098                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1099         }
1100
1101         { // update the db
1102                 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1103
1104                 { // second update
1105                         let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
1106                         network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1107                         receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1108                 }
1109
1110                 drop(receiver);
1111                 persister.persist_gossip().await;
1112
1113                 tokio::task::spawn_blocking(move || {
1114                         drop(persister);
1115                 }).await.unwrap();
1116         }
1117
1118         // regenerate snapshots
1119         {
1120                 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1121
1122                 let symlinked_data = fs::read(&symlink_path).unwrap();
1123                 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1124                 let client_graph_arc = Arc::new(client_graph);
1125
1126                 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1127                 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1128                 // the update result must be a multiple of our snapshot granularity
1129                 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1130
1131                 let readonly_graph = client_graph_arc.read_only();
1132                 let channels = readonly_graph.channels();
1133                 let client_channel_count = channels.len();
1134                 assert_eq!(client_channel_count, 1);
1135
1136                 let first_channel = channels.get(&short_channel_id).unwrap();
1137                 assert!(&first_channel.announcement_message.is_none());
1138                 // ensure the update in one direction shows the latest fee
1139                 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
1140                 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1141         }
1142
1143         // clean up afterwards
1144         clean_test_db().await;
1145 }