1 //! Multi-module tests that use database fixtures
3 use std::cell::RefCell;
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::{ChannelFeatures, NodeFeatures};
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, SocketAddress, UnsignedChannelAnnouncement, UnsignedChannelUpdate, UnsignedNodeAnnouncement};
16 use lightning::routing::gossip::{NetworkGraph, NodeAlias, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{calculate_delta, config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
/// One week in seconds. The tests subtract this from the sync result when checking the
/// `last_update` values found in the client graph.
24 const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
// NOTE(review): both statics below are only ever accessed through `.with(..)`, so they are
// presumably declared inside a `thread_local!` block whose delimiters are not visible here — confirm.
/// Name of the database schema used by the current test. It is written by the setup code in
/// `impl SchemaSanitizer` and read back through `db_test_schema()`.
27 static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
/// Cleanliness flag for the test schema: `None` until a sanitizer is set up, `Some(false)`
/// afterwards, and `Some(true)` once `clean_test_db()` has run.
28 static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
/// Builds a placeholder signature by parsing 64 zero bytes as a compact ECDSA signature.
/// Panics (via `unwrap`) if that parse fails. `generate_update` uses it as the update signature.
31 fn blank_signature() -> Signature {
32 Signature::from_compact(&[0u8; 64]).unwrap()
/// Returns the `ChainHash` built from the genesis block of `Network::Bitcoin`; used as the
/// `chain_hash` of every generated channel announcement and channel update.
35 fn genesis_hash() -> ChainHash {
36 ChainHash::using_genesis_block(Network::Bitcoin)
/// Returns the current system time as whole seconds since the Unix epoch.
/// Panics with "Time must be > 1970" if the system clock is before the epoch.
39 fn current_time() -> u32 {
// The `as u32` cast truncates the 64-bit second count silently.
40 SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32
/// Returns a clone of the schema name currently stored in `DB_TEST_SCHEMA`.
/// Panics (via `unwrap`) if no schema name has been stored yet.
43 pub(crate) fn db_test_schema() -> String {
44 DB_TEST_SCHEMA.with(|suffix_reference| {
45 let suffix_option = suffix_reference.borrow();
46 suffix_option.as_ref().unwrap().clone()
/// Builds a `NodeAnnouncement` signed with `private_key`, falling back to a fixed key made of
/// 32 bytes of `0x01` when `private_key` is `None`.
// NOTE(review): several struct fields and the final struct expression are not visible in this
// view; only `features`, `alias`, `excess_address_data` and `contents` can be seen.
50 fn generate_node_announcement(private_key: Option<SecretKey>) -> NodeAnnouncement {
51 let secp_context = Secp256k1::new();
// Despite the `random_` prefix, the fallback key is a constant.
// NOTE(review): `unwrap_or` builds the fallback key even when a key is supplied;
// `unwrap_or_else` would defer that work.
53 let random_private_key = private_key.unwrap_or(SecretKey::from_slice(&[1; 32]).unwrap());
54 let random_public_key = random_private_key.public_key(&secp_context);
55 let node_id = NodeId::from_pubkey(&random_public_key);
57 let announcement = UnsignedNodeAnnouncement {
58 features: NodeFeatures::empty(),
62 alias: NodeAlias([0; 32]),
65 excess_address_data: vec![],
// Sign the sha256d hash of the encoded unsigned announcement with the node's key.
68 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
69 let signature = secp_context.sign_ecdsa(&msg_hash, &random_private_key);
73 contents: announcement,
/// Builds a `ChannelAnnouncement` between two nodes whose keys are fixed: 32 bytes of `0x01`
/// and 32 bytes of `0x02`.
// NOTE(review): the lines that use `short_channel_id` and set the node id / node signature
// fields are not visible in this view — presumably they sit in the gaps of the two struct
// expressions below; confirm.
78 fn generate_channel_announcement(short_channel_id: u64) -> ChannelAnnouncement {
79 let secp_context = Secp256k1::new();
// Despite the `random_` prefix, both keys are constants.
81 let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
82 let random_public_key_1 = random_private_key_1.public_key(&secp_context);
83 let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
85 let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
86 let random_public_key_2 = random_private_key_2.public_key(&secp_context);
87 let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
89 let announcement = UnsignedChannelAnnouncement {
90 features: ChannelFeatures::empty(),
91 chain_hash: genesis_hash(),
// The node ids double as the bitcoin keys.
95 bitcoin_key_1: node_id_1,
96 bitcoin_key_2: node_id_2,
// Both nodes sign the sha256d hash of the encoded unsigned announcement.
100 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
101 let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
102 let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
104 ChannelAnnouncement {
// The node signatures are reused as the bitcoin signatures.
107 bitcoin_signature_1: node_signature_1,
108 bitcoin_signature_2: node_signature_2,
109 contents: announcement,
/// Builds a `ChannelUpdate` for channel `scid` carrying a blank (all-zero) signature.
///
/// `direction` selects the lowest bit of `flags` (`true` -> 1, `false` -> 0). `expiry_delta`,
/// `min_msat`, `max_msat`, `base_msat` and `fee_rate` are copied into `cltv_expiry_delta`,
/// `htlc_minimum_msat`, `htlc_maximum_msat`, `fee_base_msat` and `fee_proportional_millionths`.
// NOTE(review): the line consuming `timestamp` is not visible in this view — presumably a
// shorthand `timestamp,` field inside `UnsignedChannelUpdate`; confirm.
113 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
114 let flag_mask = if direction { 1 } else { 0 };
116 signature: blank_signature(),
117 contents: UnsignedChannelUpdate {
118 chain_hash: genesis_hash(),
119 short_channel_id: scid,
// `0 | flag_mask` is just `flag_mask`: only the direction bit is ever set.
121 flags: 0 | flag_mask,
122 cltv_expiry_delta: expiry_delta,
123 htlc_minimum_msat: min_msat,
124 htlc_maximum_msat: max_msat,
125 fee_base_msat: base_msat,
126 fee_proportional_millionths: fee_rate,
/// Guard for the per-test database schema. The setup code in its `impl` block marks the schema
/// as not yet clean and stores a freshly generated schema name in `DB_TEST_SCHEMA`.
132 struct SchemaSanitizer {}
// NOTE(review): the signature line of the constructor (the tests call `SchemaSanitizer::new()`)
// is not visible in this view.
134 impl SchemaSanitizer {
136 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137 let mut is_clean_option = cleanliness_reference.borrow_mut();
// The flag must still be unset; setting up a sanitizer while it is already set trips this assertion.
138 assert!(is_clean_option.is_none());
// Mark the schema as dirty until `clean_test_db()` flips the flag to `Some(true)`.
139 *is_clean_option = Some(false);
142 DB_TEST_SCHEMA.with(|suffix_reference| {
143 let mut suffix_option = suffix_reference.borrow_mut();
144 let current_time = SystemTime::now();
145 let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
146 let timestamp_seconds = unix_time.as_secs();
147 let timestamp_nanos = unix_time.as_nanos();
148 // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
// Hence the thread id is mixed into the preimage alongside the nanosecond timestamp.
149 let thread_id = thread::current().id();
150 let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
151 println!("test schema preimage: {}", preimage);
// sha256d-hash the preimage and take the encoded hash bytes as the unique suffix.
152 let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
153 // the schema must start with a letter
// Resulting name: `test_<unix seconds>_<hex of the encoded hash>`.
154 let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
155 *suffix_option = Some(schema);
/// On drop, checks that the test schema was cleaned: if the cleanliness flag is set, it is
/// asserted to be `true` (which `clean_test_db()` establishes).
// NOTE(review): the `fn drop` signature and the body of the `panicking()` branch are not
// visible in this view; the branch presumably skips the assertion while the thread is already
// unwinding, to avoid a double panic — confirm.
162 impl Drop for SchemaSanitizer {
164 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
165 let is_clean_option = cleanliness_reference.borrow();
166 if let Some(is_clean) = *is_clean_option {
167 if std::thread::panicking() {
170 assert_eq!(is_clean, true);
/// Guard for the on-disk cache directory of a test; its `Drop` impl removes the directory
/// returned by `cache_path()`.
176 struct CacheSanitizer {}
178 impl CacheSanitizer {
179 /// The CacheSanitizer instantiation requires that there be a schema sanitizer
// The `SchemaSanitizer` reference is unused; it only forces callers to have created one first.
180 fn new(_: &SchemaSanitizer) -> Self {
/// Returns the cache directory for the current test schema: `./res/<schema name>/`.
/// Panics if no schema name has been stored yet (see `db_test_schema()`).
184 fn cache_path(&self) -> String {
185 format!("./res/{}/", db_test_schema())
/// On drop, recursively removes the cache directory of the current test schema.
/// Panics (via `unwrap`) if the removal returns an error.
// NOTE(review): the `fn drop` signature line is not visible in this view.
189 impl Drop for CacheSanitizer {
191 let cache_path = self.cache_path();
192 fs::remove_dir_all(cache_path).unwrap();
/// Drops the current test schema (`DROP SCHEMA IF EXISTS ... CASCADE`) and then records the
/// schema as clean so the `SchemaSanitizer` drop check passes.
/// Panics (via `unwrap`) if the SQL statement fails.
197 async fn clean_test_db() {
198 let client = crate::connect_to_db().await;
199 let schema = db_test_schema();
// The schema name is generated by the sanitizer setup code, not taken from external input.
200 client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
201 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
202 let mut is_clean_option = cleanliness_reference.borrow_mut();
203 *is_clean_option = Some(true);
/// Smoke test: sets up a `SchemaSanitizer`, constructs a `GossipPersister` over a freshly
/// created `NetworkGraph`, and cleans the test schema again. No gossip is sent and no explicit
/// assertion appears in the visible lines.
208 async fn test_persistence_runtime() {
209 let _sanitizer = SchemaSanitizer::new();
210 let logger = Arc::new(TestLogger::new());
211 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
212 let network_graph_arc = Arc::new(network_graph);
213 let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// NOTE(review): the closure body is not visible in this view; the `move` closure presumably
// takes ownership of `_persister` so that it is dropped on a blocking thread — confirm.
215 tokio::task::spawn_blocking(move || {
// Drop the schema and mark it clean before `_sanitizer` goes out of scope.
219 clean_test_db().await;
/// End-to-end check with a single channel: one announcement plus one update per direction
/// (base fees 5 and 10) are applied to the server graph and persisted, a delta is calculated and
/// serialized, and a client `RapidGossipSync` instance must reconstruct the channel from it.
224 async fn test_trivial_setup() {
225 let _sanitizer = SchemaSanitizer::new();
226 let logger = Arc::new(TestLogger::new());
227 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
228 let network_graph_arc = Arc::new(network_graph);
// NOTE(review): the second tuple element is named `receiver` but is used to `send` messages.
229 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
231 let short_channel_id = 1;
232 let timestamp = current_time() - 10;
233 println!("timestamp: {}", timestamp);
236 let announcement = generate_channel_announcement(short_channel_id);
237 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
238 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
// Apply the gossip to the in-memory server graph...
240 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
241 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
242 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
// ...and hand the same messages to the persister.
244 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
245 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
246 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
248 persister.persist_gossip().await;
// NOTE(review): the `0` is presumably the last-sync timestamp (i.e. a full sync) — confirm.
251 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
252 let serialization = serialize_delta(&delta, 1, logger.clone());
253 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
// The schema is cleaned before the remaining assertions run, so a failing assertion below
// does not leave the schema behind.
254 clean_test_db().await;
256 let channel_count = network_graph_arc.read_only().channels().len();
// Server side: one channel, serialized as one announcement plus two updates.
258 assert_eq!(channel_count, 1);
259 assert_eq!(serialization.message_count, 3);
260 assert_eq!(serialization.channel_announcement_count, 1);
261 assert_eq!(serialization.update_count, 2);
// Client side: feed the serialized snapshot into a fresh graph.
263 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
264 let client_graph_arc = Arc::new(client_graph);
265 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
266 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
267 println!("update result: {}", update_result);
268 // the update result must be a multiple of our snapshot granularity
269 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
270 assert!(update_result < timestamp);
// The returned value must lie less than one snapshot interval before the update timestamp.
272 let timestamp_delta = timestamp - update_result;
273 println!("timestamp delta: {}", timestamp_delta);
274 assert!(timestamp_delta < config::snapshot_generation_interval());
276 let readonly_graph = client_graph_arc.read_only();
277 let channels = readonly_graph.channels();
278 let client_channel_count = channels.len();
279 assert_eq!(client_channel_count, 1);
281 let first_channel = channels.get(&short_channel_id).unwrap();
// The client graph is expected to hold no announcement message for the channel.
282 assert!(&first_channel.announcement_message.is_none());
283 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
284 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
285 let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
286 let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
287 println!("last update a: {}", last_update_seen_a);
288 println!("last update b: {}", last_update_seen_b);
// Both directions must report the sync result backdated by one week.
289 assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
290 assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
292 tokio::task::spawn_blocking(move || {
/// Persists the same node announcement three times: once without and once with an explicit
/// second message field (`Some(12345)`), and once more after several socket addresses were
/// added. No explicit assertion appears in the visible lines; the test fails only if a `send`,
/// an `unwrap` or `persist_gossip()` panics.
298 async fn test_node_announcement_persistence() {
299 let _sanitizer = SchemaSanitizer::new();
300 let logger = Arc::new(TestLogger::new());
301 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
302 let network_graph_arc = Arc::new(network_graph);
303 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// Announcement signed with the default key of `generate_node_announcement`.
306 let mut announcement = generate_node_announcement(None);
307 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), None)).await.unwrap();
308 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(12345))).await.unwrap();
311 // modify announcement to contain a bunch of addresses
// NOTE(review): the remaining fields of the `Hostname` and `OnionV3` variants are not visible
// in this view. The announcement is not re-signed after being modified.
312 announcement.contents.addresses.push(SocketAddress::Hostname {
313 hostname: "google.com".to_string().try_into().unwrap(),
316 announcement.contents.addresses.push(SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 9635 });
317 announcement.contents.addresses.push(SocketAddress::TcpIpV6 { addr: [1; 16], port: 1337 });
318 announcement.contents.addresses.push(SocketAddress::OnionV2([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]));
319 announcement.contents.addresses.push(SocketAddress::OnionV3 {
320 ed25519_pubkey: [1; 32],
326 receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(12345))).await.unwrap();
329 persister.persist_gossip().await;
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
331 tokio::task::spawn_blocking(move || {
335 clean_test_db().await;
/// Persists node announcements for three different keys together with one channel, calculates a
/// delta relative to `timestamp - 5`, and checks the node-related counters of the serialization.
339 async fn test_node_announcement_delta_detection() {
340 let _sanitizer = SchemaSanitizer::new();
341 let logger = Arc::new(TestLogger::new());
342 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
343 let network_graph_arc = Arc::new(network_graph);
344 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
346 let timestamp = current_time() - 10;
// Default-key node: sent twice with values (`timestamp - 10`, `timestamp - 8`) that both lie
// before the `timestamp - 5` reference passed to `calculate_delta` below.
349 let mut announcement = generate_node_announcement(None);
350 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 10))).await.unwrap();
351 receiver.send(GossipMessage::NodeAnnouncement(announcement.clone(), Some(timestamp - 8))).await.unwrap();
// Two further nodes (keys of 32 bytes of 0x02 / 0x03) with non-empty features, sent once each
// with `Some(timestamp)`.
354 let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[2; 32]).unwrap()));
355 current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]);
356 receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
360 let mut current_announcement = generate_node_announcement(Some(SecretKey::from_slice(&[3; 32]).unwrap()));
361 current_announcement.contents.features = NodeFeatures::from_be_bytes(vec![22, 49]);
362 receiver.send(GossipMessage::NodeAnnouncement(current_announcement, Some(timestamp))).await.unwrap();
366 // modify announcement to contain a bunch of addresses
// The default-key announcement also gets new features here, so both its features and its
// addresses differ from the two earlier sends.
367 announcement.contents.addresses.push(SocketAddress::Hostname {
368 hostname: "google.com".to_string().try_into().unwrap(),
371 announcement.contents.features = NodeFeatures::from_be_bytes(vec![23, 48]);
372 announcement.contents.addresses.push(SocketAddress::TcpIpV4 { addr: [127, 0, 0, 1], port: 9635 });
373 announcement.contents.addresses.push(SocketAddress::TcpIpV6 { addr: [1; 16], port: 1337 });
374 announcement.contents.addresses.push(SocketAddress::OnionV2([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]));
375 announcement.contents.addresses.push(SocketAddress::OnionV3 {
376 ed25519_pubkey: [1; 32],
382 receiver.send(GossipMessage::NodeAnnouncement(announcement, Some(timestamp))).await.unwrap();
384 { // necessary for the node announcements to be considered relevant
385 let announcement = generate_channel_announcement(1);
386 let update_1 = generate_update(1, false, timestamp, 0, 0, 0, 6, 0);
387 let update_2 = generate_update(1, true, timestamp, 0, 0, 0, 6, 0);
389 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
390 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
391 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
393 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
394 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
395 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp))).await.unwrap();
399 persister.persist_gossip().await;
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
401 tokio::task::spawn_blocking(move || {
// This test serializes with `2` as the second argument, whereas the other tests pass `1`.
406 let delta = calculate_delta(network_graph_arc.clone(), timestamp - 5, None, logger.clone()).await;
407 let serialization = serialize_delta(&delta, 2, logger.clone());
408 clean_test_db().await;
// Expected: three messages and three node announcements, of which one is a node update that
// changes both features and addresses.
410 assert_eq!(serialization.message_count, 3);
411 assert_eq!(serialization.node_announcement_count, 3);
412 assert_eq!(serialization.node_update_count, 1);
413 assert_eq!(serialization.node_feature_update_count, 1);
414 assert_eq!(serialization.node_address_update_count, 1);
417 /// If a channel has only seen updates in one direction, it should not be announced
///
/// Setup: one update in the `false` direction at `timestamp`, and two in the `true` direction
/// at `timestamp + 1` and `timestamp + 2`. The delta is calculated relative to `timestamp + 1`
/// and is expected to contain one channel announcement and two full updates.
419 async fn test_unidirectional_intermediate_update_consideration() {
420 let _sanitizer = SchemaSanitizer::new();
422 let logger = Arc::new(TestLogger::new());
423 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
424 let network_graph_arc = Arc::new(network_graph);
425 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
427 let short_channel_id = 1;
428 let timestamp = current_time() - 10;
429 println!("timestamp: {}", timestamp);
432 let announcement = generate_channel_announcement(short_channel_id);
433 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
434 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
435 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
// Apply everything to the in-memory server graph, then hand the same messages to the persister.
437 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
438 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
439 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
440 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
442 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
443 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
444 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
445 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
447 persister.persist_gossip().await;
450 let channel_count = network_graph_arc.read_only().channels().len();
451 assert_eq!(channel_count, 1);
453 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
454 let client_graph_arc = Arc::new(client_graph);
455 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
457 let delta = calculate_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
458 let serialization = serialize_delta(&delta, 1, logger.clone());
// Each of these lookup log lines must appear exactly once.
460 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
461 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
462 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
464 assert_eq!(serialization.message_count, 3);
465 assert_eq!(serialization.channel_announcement_count, 1);
466 assert_eq!(serialization.update_count, 2);
467 assert_eq!(serialization.update_count_full, 2);
468 assert_eq!(serialization.update_count_incremental, 0);
470 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
471 println!("update result: {}", update_result);
472 // the update result must be a multiple of our snapshot granularity
// NOTE(review): no assertion follows the comment above in the visible lines; the other tests
// pair it with `assert_eq!(update_result % config::snapshot_generation_interval(), 0)`.
474 let readonly_graph = client_graph_arc.read_only();
475 let channels = readonly_graph.channels();
476 let client_channel_count = channels.len();
477 assert_eq!(client_channel_count, 1);
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
479 tokio::task::spawn_blocking(move || {
// Unlike `test_trivial_setup`, cleanup runs after the assertions here.
483 clean_test_db().await;
// NOTE(review): the doc line below is word-for-word the one on the unidirectional test and
// looks copy-pasted. This test gives the `false` direction three updates and the `true`
// direction one, and expects no channel announcement and a single incremental update.
486 /// If a channel has only seen updates in one direction, it should not be announced
488 async fn test_bidirectional_intermediate_update_consideration() {
489 let _sanitizer = SchemaSanitizer::new();
491 let logger = Arc::new(TestLogger::new());
492 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
493 let network_graph_arc = Arc::new(network_graph);
494 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
496 let short_channel_id = 1;
497 let timestamp = current_time() - 10;
498 println!("timestamp: {}", timestamp);
// `false` direction: updates at `timestamp`, `+1`, `+2` with base fees 5, 4, 3.
// `true` direction: a single update at `timestamp` with base fee 3.
501 let announcement = generate_channel_announcement(short_channel_id);
502 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
503 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
504 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
505 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
507 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
508 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
509 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
510 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
511 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
513 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
514 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
515 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
516 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
517 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
519 persister.persist_gossip().await;
522 let channel_count = network_graph_arc.read_only().channels().len();
523 assert_eq!(channel_count, 1);
525 let delta = calculate_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
526 let serialization = serialize_delta(&delta, 1, logger.clone());
// Each of these lookup log lines must appear exactly once.
528 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
529 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
530 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
532 assert_eq!(serialization.message_count, 1);
533 assert_eq!(serialization.channel_announcement_count, 0);
534 assert_eq!(serialization.update_count, 1);
535 assert_eq!(serialization.update_count_full, 0);
536 assert_eq!(serialization.update_count_incremental, 1);
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
538 tokio::task::spawn_blocking(move || {
542 clean_test_db().await;
/// Exercises delta calculation around `config::CHANNEL_REMINDER_AGE` with two channels: one
/// whose only updates are one second older than the reminder age, and one with a longer update
/// history on both sides of that threshold. The delta relative to
/// `timestamp - channel_reminder_delta + 15` must contain four incremental updates and no
/// channel announcement.
546 async fn test_channel_reminders() {
547 let _sanitizer = SchemaSanitizer::new();
549 let logger = Arc::new(TestLogger::new());
550 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
551 let network_graph_arc = Arc::new(network_graph);
552 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
554 let timestamp = current_time();
555 println!("timestamp: {}", timestamp);
// The reminder age in whole seconds, narrowed to `u32` to match the update timestamps.
556 let channel_reminder_delta = config::CHANNEL_REMINDER_AGE.as_secs() as u32;
559 { // unupdated channel
560 let short_channel_id = 1;
561 let announcement = generate_channel_announcement(short_channel_id);
562 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
563 let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
565 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
566 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
567 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
569 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
570 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
571 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
573 { // unmodified but updated channel
574 let short_channel_id = 2;
575 let announcement = generate_channel_announcement(short_channel_id);
576 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 10, 0, 0, 0, 5, 0);
577 // in the false direction, we have one update that's different prior
// (update_2 differs from its neighbours only in `min_msat`: 1 instead of 0)
578 let update_2 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 5, 0, 1, 0, 5, 0);
579 let update_3 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
580 let update_4 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
581 let update_5 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 10, 0, 0, 0, 5, 0);
582 let update_6 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 10, 0, 0, 0, 3, 0);
583 let update_7 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 20, 0, 0, 0, 5, 0);
584 let update_8 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 20, 0, 0, 0, 3, 0);
// Only the newest pair (update_7 / update_8) is applied to the in-memory graph, while all
// eight updates are handed to the persister below.
586 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
587 network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
588 network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();
590 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
591 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
592 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
593 receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
594 receiver.send(GossipMessage::ChannelUpdate(update_4, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
596 receiver.send(GossipMessage::ChannelUpdate(update_5, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
597 receiver.send(GossipMessage::ChannelUpdate(update_6, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
599 receiver.send(GossipMessage::ChannelUpdate(update_7, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
600 receiver.send(GossipMessage::ChannelUpdate(update_8, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
603 persister.persist_gossip().await;
606 let channel_count = network_graph_arc.read_only().channels().len();
607 assert_eq!(channel_count, 2);
// The reference point lies between the `+ 10` and `+ 20` update pairs of channel 2.
609 let delta = calculate_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await;
610 let serialization = serialize_delta(&delta, 1, logger.clone());
// Each of these lookup log lines must appear exactly once.
612 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
613 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
614 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
615 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
617 assert_eq!(serialization.message_count, 4);
618 assert_eq!(serialization.channel_announcement_count, 0);
619 assert_eq!(serialization.update_count, 4);
620 assert_eq!(serialization.update_count_full, 0);
621 assert_eq!(serialization.update_count_incremental, 4);
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
623 tokio::task::spawn_blocking(move || {
627 clean_test_db().await;
/// Checks that a snapshot calculated relative to `0` carries the most recent update per
/// direction. The `false` direction gets two updates (fee rates 38 and then 39) and the `true`
/// direction gets one (fee rate 10); the client graph must end up with 39 and 10.
631 async fn test_full_snapshot_recency() {
632 let _sanitizer = SchemaSanitizer::new();
633 let logger = Arc::new(TestLogger::new());
634 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
635 let network_graph_arc = Arc::new(network_graph);
637 let short_channel_id = 1;
638 let timestamp = current_time();
639 println!("timestamp: {}", timestamp);
// NOTE(review): `update` is bound three times below; the first two bindings presumably sit in
// their own `{ ... }` scopes whose delimiter lines are not visible in this view.
642 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
643 let announcement = generate_channel_announcement(short_channel_id);
644 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
645 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// `false` direction, older update: `timestamp - 1`, fee rate 38.
649 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
650 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
651 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// `false` direction, newer update: `timestamp`, fee rate 39.
654 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
655 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
656 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
660 { // first and only update
661 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
662 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
663 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
668 persister.persist_gossip().await;
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
670 tokio::task::spawn_blocking(move || {
675 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
676 let client_graph_arc = Arc::new(client_graph);
678 { // sync after initial seed
679 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
680 let serialization = serialize_delta(&delta, 1, logger.clone());
681 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
683 let channel_count = network_graph_arc.read_only().channels().len();
// One announcement plus one update per direction; the older `false` update is not counted.
685 assert_eq!(channel_count, 1);
686 assert_eq!(serialization.message_count, 3);
687 assert_eq!(serialization.channel_announcement_count, 1);
688 assert_eq!(serialization.update_count, 2);
690 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
691 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
692 // the update result must be a multiple of our snapshot granularity
693 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
694 assert!(update_result < timestamp);
696 let readonly_graph = client_graph_arc.read_only();
697 let channels = readonly_graph.channels();
698 let client_channel_count = channels.len();
699 assert_eq!(client_channel_count, 1);
701 let first_channel = channels.get(&short_channel_id).unwrap();
702 assert!(&first_channel.announcement_message.is_none());
703 // ensure the update in one direction shows the latest fee
704 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
705 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
708 clean_test_db().await;
/// Variant of `test_full_snapshot_recency` in which the two `false`-direction updates carry
/// "seen" values in the opposite order to their own timestamps: the older update (fee rate 38)
/// is sent with `Some(timestamp)` and the newer one (fee rate 39) with `Some(timestamp - 1)`.
/// The client graph must still end up with fee rate 39.
712 async fn test_full_snapshot_recency_with_wrong_seen_order() {
713 let _sanitizer = SchemaSanitizer::new();
714 let logger = Arc::new(TestLogger::new());
715 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
716 let network_graph_arc = Arc::new(network_graph);
718 let short_channel_id = 1;
719 let timestamp = current_time();
720 println!("timestamp: {}", timestamp);
723 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
724 let announcement = generate_channel_announcement(short_channel_id);
725 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
726 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
729 { // first update, seen latest
730 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
731 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
732 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
734 { // second update, seen first
735 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
736 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
737 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
741 { // first and only update
742 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
743 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
744 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
749 persister.persist_gossip().await;
// NOTE(review): the closure body is not visible in this view; presumably it drops `persister`
// on a blocking thread — confirm.
751 tokio::task::spawn_blocking(move || {
756 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
757 let client_graph_arc = Arc::new(client_graph);
759 { // sync after initial seed
760 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
761 let serialization = serialize_delta(&delta, 1, logger.clone());
762 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
764 let channel_count = network_graph_arc.read_only().channels().len();
// One announcement plus one update per direction.
766 assert_eq!(channel_count, 1);
767 assert_eq!(serialization.message_count, 3);
768 assert_eq!(serialization.channel_announcement_count, 1);
769 assert_eq!(serialization.update_count, 2);
771 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
772 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
773 // the update result must be a multiple of our snapshot granularity
774 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
775 assert!(update_result < timestamp);
777 let readonly_graph = client_graph_arc.read_only();
778 let channels = readonly_graph.channels();
779 let client_channel_count = channels.len();
780 assert_eq!(client_channel_count, 1);
782 let first_channel = channels.get(&short_channel_id).unwrap();
783 assert!(&first_channel.announcement_message.is_none());
784 // ensure the update in one direction shows the latest fee
// The update with the newer own timestamp (39) must win even though its "seen" value is older.
785 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
786 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
789 clean_test_db().await;
/// Checks that a full snapshot carries the channel update with the newest gossip
/// timestamp even when the updates reach the persister in the opposite order.
///
/// Two updates for the same direction flag (`false`) are built: one at `timestamp - 1`
/// with a last argument of 38 and one at `timestamp` with 39. They are applied to the
/// server graph in timestamp order, but sent to the persister newest-first. The client
/// graph rebuilt from the serialized delta must report 39 for that direction and 10
/// for the other one.
793 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
794 let _sanitizer = SchemaSanitizer::new();
795 let logger = Arc::new(TestLogger::new());
796 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
797 let network_graph_arc = Arc::new(network_graph);
799 let short_channel_id = 1;
800 let timestamp = current_time();
801 println!("timestamp: {}", timestamp);
// The second tuple element is named `receiver`, but it is the handle this test uses
// to `.send` gossip messages to the persister.
804 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// Register the channel on the server graph and queue its announcement.
805 let announcement = generate_channel_announcement(short_channel_id);
806 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
807 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
810 // apply updates in their timestamp order
// NOTE(review): the last `generate_update` argument is presumably the proportional
// fee; the final assertions compare these values against `fees.proportional_millionths`.
811 let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
812 let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
813 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
814 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
816 // propagate updates in their seen order
// The `Some(..)` values are swapped relative to the updates' own timestamps:
// the newer update (39) is sent first and tagged `timestamp - 1`, the older
// one (38) is sent second and tagged `timestamp`.
817 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
818 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
// Direction flag `true`: a single update with value 10 and no `Some(..)` override.
821 { // first and only update
822 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
823 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
824 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// NOTE(review): presumably writes the queued messages to the test database —
// confirm against `GossipPersister::persist_gossip`.
829 persister.persist_gossip().await;
831 tokio::task::spawn_blocking(move || {
// Fresh, empty graph standing in for a client.
836 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
837 let client_graph_arc = Arc::new(client_graph);
839 { // sync after initial seed
// NOTE(review): the `0`, `None` and `1` arguments are opaque from this test;
// presumably "no previous sync", no extra filter and a serialization version —
// TODO confirm against `calculate_delta` / `serialize_delta`.
840 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
841 let serialization = serialize_delta(&delta, 1, logger.clone());
842 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
844 let channel_count = network_graph_arc.read_only().channels().len();
// Three updates were sent, but only one announcement plus one update per
// direction is expected in the serialized output.
846 assert_eq!(channel_count, 1);
847 assert_eq!(serialization.message_count, 3);
848 assert_eq!(serialization.channel_announcement_count, 1);
849 assert_eq!(serialization.update_count, 2);
// Apply the serialized data to the client graph through RapidGossipSync.
851 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
852 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
853 // the update result must be a multiple of our snapshot granularity
854 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
855 assert!(update_result < timestamp);
857 let readonly_graph = client_graph_arc.read_only();
858 let channels = readonly_graph.channels();
859 let client_channel_count = channels.len();
860 assert_eq!(client_channel_count, 1);
862 let first_channel = channels.get(&short_channel_id).unwrap();
// The client-side channel entry holds no full announcement message.
863 assert!(&first_channel.announcement_message.is_none());
864 // ensure the update in one direction shows the latest fee
// 39 belongs to the update with the newer gossip timestamp, even though it was
// sent first and tagged with the earlier `Some(..)` value.
865 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
866 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
// Clean up the test database.
869 clean_test_db().await;
/// Replays a fixed sequence of channel updates with hard-coded timestamps, shifted
/// forward by `timestamp_offset` so that the newest one (1695909301) lands on the
/// current time, and checks the resulting full snapshot.
///
/// The newest update for direction flag `false` (value 130) is only sent to the
/// persister; the call that would apply it to the server graph is commented out.
/// The client graph rebuilt from the serialized delta is still expected to show 130
/// for that direction and 10 for the other one.
873 async fn test_full_snapshot_mutiny_scenario() {
874 let _sanitizer = SchemaSanitizer::new();
875 let logger = Arc::new(TestLogger::new());
876 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
877 let network_graph_arc = Arc::new(network_graph);
879 let short_channel_id = 873706024403271681;
880 let timestamp = current_time();
881 // let oldest_simulation_timestamp = 1693300588;
// Newest hard-coded timestamp used below; it is mapped onto the current time.
882 let latest_simulation_timestamp = 1695909301;
// Shift added to every hard-coded timestamp. Assumes the current time is later
// than `latest_simulation_timestamp`, otherwise this subtraction underflows.
883 let timestamp_offset = timestamp - latest_simulation_timestamp;
884 println!("timestamp: {}", timestamp);
// The second tuple element is named `receiver`, but it is the handle this test uses
// to `.send` gossip messages to the persister.
887 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
888 let announcement = generate_channel_announcement(short_channel_id);
889 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
890 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// Updates with direction flag `false`, in ascending timestamp order. Only the
// timestamp and the last argument vary (38, 38, 200, 200, 209, 209, 210, 210, 200, 200, 130).
// NOTE(review): the last argument is presumably the proportional fee; the final
// assertions compare it against `fees.proportional_millionths`.
894 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
895 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
896 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
899 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
900 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
901 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
904 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
905 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
906 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
909 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
910 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
911 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
914 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
915 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
916 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
919 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
920 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
921 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
924 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
925 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
926 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
929 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
930 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
931 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
934 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
935 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
936 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Unlike the other updates, this one is sent with an explicit `Some(..)` value:
// two `SYMLINK_GRANULARITY_INTERVAL`s past the newest shifted timestamp.
// NOTE(review): presumably a "seen" time override placing it in the future — confirm.
939 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
940 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
941 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
// Newest update (value 130). It is sent to the persister only; the server graph
// is not updated because the call below is commented out.
// NOTE(review): the final assertion still expects 130, so this looks deliberate —
// confirm and record the reason.
944 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
945 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
946 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Updates with direction flag `true`: two updates, both with value 10.
951 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
952 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
953 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
956 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
957 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
958 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// NOTE(review): presumably writes the queued messages to the test database —
// confirm against `GossipPersister::persist_gossip`.
963 persister.persist_gossip().await;
965 tokio::task::spawn_blocking(move || {
// Fresh, empty graph standing in for a client.
970 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
971 let client_graph_arc = Arc::new(client_graph);
973 { // sync after initial seed
// NOTE(review): the `0`, `None` and `1` arguments are opaque from this test —
// TODO confirm their meaning against `calculate_delta` / `serialize_delta`.
974 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
975 let serialization = serialize_delta(&delta, 1, logger.clone());
976 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
978 let channel_count = network_graph_arc.read_only().channels().len();
// Thirteen updates were sent, but only one announcement plus one update per
// direction is expected in the serialized output.
980 assert_eq!(channel_count, 1);
981 assert_eq!(serialization.message_count, 3);
982 assert_eq!(serialization.channel_announcement_count, 1);
983 assert_eq!(serialization.update_count, 2);
// Apply the serialized data to the client graph through RapidGossipSync.
985 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
986 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
987 println!("update result: {}", update_result);
988 // the update result must be a multiple of our snapshot granularity
989 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
990 assert!(update_result < timestamp);
// The returned value must trail the current time by less than one snapshot
// generation interval.
992 let timestamp_delta = timestamp - update_result;
993 println!("timestamp delta: {}", timestamp_delta);
994 assert!(timestamp_delta < config::snapshot_generation_interval());
996 let readonly_graph = client_graph_arc.read_only();
997 let channels = readonly_graph.channels();
998 let client_channel_count = channels.len();
999 assert_eq!(client_channel_count, 1);
1001 let first_channel = channels.get(&short_channel_id).unwrap();
// The client-side channel entry holds no full announcement message.
1002 assert!(&first_channel.announcement_message.is_none());
// 130 comes from the newest update, the one that was never applied to the
// server graph above.
1003 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
1004 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
// Clean up the test database.
1007 clean_test_db().await;
/// Checks a full snapshot of two channels whose updates interleave in time.
///
/// The main channel is updated at `timestamp - 2` (values 10 and 5) and again at
/// `timestamp` (values 11 and 6); the secondary channel is updated in between, at
/// `timestamp - 1` (value 42 in both directions). The client graph rebuilt from the
/// serialized delta must contain both channels and show the main channel's newest
/// values (11 and 6).
1011 async fn test_full_snapshot_interlaced_channel_timestamps() {
1012 let _sanitizer = SchemaSanitizer::new();
1013 let logger = Arc::new(TestLogger::new());
1014 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1015 let network_graph_arc = Arc::new(network_graph);
1017 let main_channel_id = 1;
1018 let timestamp = current_time();
1019 println!("timestamp: {}", timestamp);
// The second tuple element is named `receiver`, but it is the handle this test uses
// to `.send` gossip messages to the persister.
1022 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1023 let secondary_channel_id = main_channel_id + 1;
// Announce the main channel on the server graph and queue it for persistence.
1026 let announcement = generate_channel_announcement(main_channel_id);
1027 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1028 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
1031 { // secondary channel
1032 let announcement = generate_channel_announcement(secondary_channel_id);
1033 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1034 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// Main channel, oldest pair of updates (`timestamp - 2`).
// NOTE(review): the last `generate_update` argument is presumably the proportional
// fee; the final assertions compare it against `fees.proportional_millionths`.
1038 { // direction false
1039 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
1040 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1041 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Same timestamp, direction flag `true`.
1044 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
1045 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1046 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Secondary channel, timestamped between the main channel's two update pairs.
1050 { // in-between channel
1051 { // direction false
1052 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
1053 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1054 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Same timestamp, direction flag `true`.
1057 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
1058 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1059 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Main channel, newest pair of updates (`timestamp`).
1064 { // direction false
1065 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
1066 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1067 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// Same timestamp, direction flag `true`.
1070 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
1071 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1072 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// NOTE(review): presumably writes the queued messages to the test database —
// confirm against `GossipPersister::persist_gossip`.
1077 persister.persist_gossip().await;
1079 tokio::task::spawn_blocking(move || {
// Fresh, empty graph standing in for a client.
1084 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1085 let client_graph_arc = Arc::new(client_graph);
1087 { // sync after initial seed
// NOTE(review): the `0`, `None` and `1` arguments are opaque from this test —
// TODO confirm their meaning against `calculate_delta` / `serialize_delta`.
1088 let delta = calculate_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
1089 let serialization = serialize_delta(&delta, 1, logger.clone());
1090 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
1092 let channel_count = network_graph_arc.read_only().channels().len();
// Six updates were sent; two announcements plus one update per channel
// direction (four in total) are expected in the serialized output.
1094 assert_eq!(channel_count, 2);
1095 assert_eq!(serialization.message_count, 6);
1096 assert_eq!(serialization.channel_announcement_count, 2);
1097 assert_eq!(serialization.update_count, 4);
// Apply the serialized data to the client graph through RapidGossipSync.
1099 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1100 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
1101 // the update result must be a multiple of our snapshot granularity
1102 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1103 assert!(update_result < timestamp);
1105 let readonly_graph = client_graph_arc.read_only();
1106 let channels = readonly_graph.channels();
1107 let client_channel_count = channels.len();
1108 assert_eq!(client_channel_count, 2);
1110 let first_channel = channels.get(&main_channel_id).unwrap();
// The client-side channel entry holds no full announcement message.
1111 assert!(&first_channel.announcement_message.is_none());
1112 // ensure the update in one direction shows the latest fee
// 11 and 6 are the values from the main channel's newest update pair.
1113 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
1114 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
// Clean up the test database.
1117 clean_test_db().await;
/// Exercises snapshot generation to disk and regeneration after new gossip arrives.
///
/// Phase one persists a channel announcement and one update per direction (values 38
/// and 10), generates snapshots, reads `symlinks/0.bin` under the cache path and
/// applies it to a fresh client graph. Phase two persists a newer update for
/// direction flag `false` (value 39 at `timestamp + 30`), regenerates the snapshots
/// and checks that the re-read file now yields 39 while the other direction stays at 10.
1121 async fn test_full_snapshot_persistence() {
1122 let schema_sanitizer = SchemaSanitizer::new();
1123 let logger = Arc::new(TestLogger::new());
1124 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1125 let network_graph_arc = Arc::new(network_graph);
1126 let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
// The cache sanitizer is built from the schema sanitizer and later supplies the
// cache directory path.
1127 let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
1129 let short_channel_id = 1;
1130 let timestamp = current_time();
1131 println!("timestamp: {}", timestamp);
// The second tuple element is named `receiver`, but it is the handle this test uses
// to `.send` gossip messages to the persister.
1134 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1135 let announcement = generate_channel_announcement(short_channel_id);
1136 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
1137 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// Direction flag `true`: value 10 at `timestamp`.
// NOTE(review): the last `generate_update` argument is presumably the proportional
// fee; the assertions below compare it against `fees.proportional_millionths`.
1140 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
1141 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1142 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1145 { // direction false
1146 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
1147 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1148 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// NOTE(review): presumably writes the queued messages to the test database —
// confirm against `GossipPersister::persist_gossip`.
1153 persister.persist_gossip().await;
1155 tokio::task::spawn_blocking(move || {
// File that is read back after each snapshot generation.
1160 let cache_path = cache_sanitizer.cache_path();
1161 let symlink_path = format!("{}/symlinks/0.bin", cache_path);
1163 // generate snapshots
// NOTE(review): the numeric arguments (20, 5, [5, u64::MAX], Some(10)) are opaque
// from this test — TODO confirm against `Snapshotter::generate_snapshots`.
1165 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
// Read the generated file and apply it to a fresh client graph.
1167 let symlinked_data = fs::read(&symlink_path).unwrap();
1168 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1169 let client_graph_arc = Arc::new(client_graph);
1171 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1172 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1173 // the update result must be a multiple of our snapshot granularity
1174 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1176 let readonly_graph = client_graph_arc.read_only();
1177 let channels = readonly_graph.channels();
1178 let client_channel_count = channels.len();
1179 assert_eq!(client_channel_count, 1);
1181 let first_channel = channels.get(&short_channel_id).unwrap();
// The client-side channel entry holds no full announcement message.
1182 assert!(&first_channel.announcement_message.is_none());
1183 // ensure the update in one direction shows the latest fee
// Phase one: the values seeded above (38 and 10).
1184 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
1185 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
// Phase two: a second persister/handle pair delivers a newer update.
1189 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// Direction flag `false`: value 39, timestamped 30 seconds after `timestamp`.
1192 let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
1193 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1194 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1198 persister.persist_gossip().await;
1200 tokio::task::spawn_blocking(move || {
1205 // regenerate snapshots
// Same arguments and cache path as the first generation.
1207 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
// Re-read the same file; it is expected to reflect the new update.
1209 let symlinked_data = fs::read(&symlink_path).unwrap();
1210 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1211 let client_graph_arc = Arc::new(client_graph);
1213 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1214 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1215 // the update result must be a multiple of our snapshot granularity
1216 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1218 let readonly_graph = client_graph_arc.read_only();
1219 let channels = readonly_graph.channels();
1220 let client_channel_count = channels.len();
1221 assert_eq!(client_channel_count, 1);
1223 let first_channel = channels.get(&short_channel_id).unwrap();
// The client-side channel entry holds no full announcement message.
1224 assert!(&first_channel.announcement_message.is_none());
1225 // ensure the update in one direction shows the latest fee
// Phase two: 39 has replaced 38; the other direction is unchanged.
1226 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
1227 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1230 // clean up afterwards
1231 clean_test_db().await;