1 //! Multi-module tests that use database fixtures
3 use std::cell::RefCell;
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
24 const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
27 static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
28 static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
31 fn blank_signature() -> Signature {
32 Signature::from_compact(&[0u8; 64]).unwrap()
35 fn genesis_hash() -> ChainHash {
36 ChainHash::using_genesis_block(Network::Bitcoin)
39 fn current_time() -> u32 {
40 SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32
43 pub(crate) fn db_test_schema() -> String {
44 DB_TEST_SCHEMA.with(|suffix_reference| {
45 let suffix_option = suffix_reference.borrow();
46 suffix_option.as_ref().unwrap().clone()
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51 let secp_context = Secp256k1::new();
53 let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54 let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55 let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
57 let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58 let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59 let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
61 let announcement = UnsignedChannelAnnouncement {
62 features: ChannelFeatures::empty(),
63 chain_hash: genesis_hash(),
67 bitcoin_key_1: node_id_1,
68 bitcoin_key_2: node_id_2,
72 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73 let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74 let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
79 bitcoin_signature_1: node_signature_1,
80 bitcoin_signature_2: node_signature_2,
81 contents: announcement,
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
86 let flag_mask = if direction { 1 } else { 0 };
88 signature: blank_signature(),
89 contents: UnsignedChannelUpdate {
90 chain_hash: genesis_hash(),
91 short_channel_id: scid,
94 cltv_expiry_delta: expiry_delta,
95 htlc_minimum_msat: min_msat,
96 htlc_maximum_msat: max_msat,
97 fee_base_msat: base_msat,
98 fee_proportional_millionths: fee_rate,
104 struct SchemaSanitizer {}
106 impl SchemaSanitizer {
108 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109 let mut is_clean_option = cleanliness_reference.borrow_mut();
110 assert!(is_clean_option.is_none());
111 *is_clean_option = Some(false);
114 DB_TEST_SCHEMA.with(|suffix_reference| {
115 let mut suffix_option = suffix_reference.borrow_mut();
116 let current_time = SystemTime::now();
117 let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118 let timestamp_seconds = unix_time.as_secs();
119 let timestamp_nanos = unix_time.as_nanos();
120 // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121 let thread_id = thread::current().id();
122 let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123 println!("test schema preimage: {}", preimage);
124 let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
125 // the schema must start with a letter
126 let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
127 *suffix_option = Some(schema);
134 impl Drop for SchemaSanitizer {
136 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137 let is_clean_option = cleanliness_reference.borrow();
138 if let Some(is_clean) = *is_clean_option {
139 assert_eq!(is_clean, true);
145 struct CacheSanitizer {}
147 impl CacheSanitizer {
148 /// The CacheSanitizer instantiation requires that there be a schema sanitizer
149 fn new(_: &SchemaSanitizer) -> Self {
153 fn cache_path(&self) -> String {
154 format!("./res/{}/", db_test_schema())
158 impl Drop for CacheSanitizer {
160 let cache_path = self.cache_path();
161 fs::remove_dir_all(cache_path).unwrap();
166 async fn clean_test_db() {
167 let client = crate::connect_to_db().await;
168 let schema = db_test_schema();
169 client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
170 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
171 let mut is_clean_option = cleanliness_reference.borrow_mut();
172 *is_clean_option = Some(true);
177 async fn test_persistence_runtime() {
178 let _sanitizer = SchemaSanitizer::new();
179 let logger = Arc::new(TestLogger::new());
180 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
181 let network_graph_arc = Arc::new(network_graph);
182 let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
184 tokio::task::spawn_blocking(move || {
188 clean_test_db().await;
193 async fn test_trivial_setup() {
194 let _sanitizer = SchemaSanitizer::new();
195 let logger = Arc::new(TestLogger::new());
196 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
197 let network_graph_arc = Arc::new(network_graph);
198 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
200 let short_channel_id = 1;
201 let timestamp = current_time() - 10;
202 println!("timestamp: {}", timestamp);
205 let announcement = generate_announcement(short_channel_id);
206 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
207 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
209 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
210 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
211 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
213 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
214 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
215 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
217 persister.persist_gossip().await;
220 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
221 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
222 clean_test_db().await;
224 let channel_count = network_graph_arc.read_only().channels().len();
226 assert_eq!(channel_count, 1);
227 assert_eq!(serialization.message_count, 3);
228 assert_eq!(serialization.announcement_count, 1);
229 assert_eq!(serialization.update_count, 2);
231 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
232 let client_graph_arc = Arc::new(client_graph);
233 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
234 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
235 println!("update result: {}", update_result);
236 // the update result must be a multiple of our snapshot granularity
237 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
238 assert!(update_result < timestamp);
240 let timestamp_delta = timestamp - update_result;
241 println!("timestamp delta: {}", timestamp_delta);
242 assert!(timestamp_delta < config::snapshot_generation_interval());
244 let readonly_graph = client_graph_arc.read_only();
245 let channels = readonly_graph.channels();
246 let client_channel_count = channels.len();
247 assert_eq!(client_channel_count, 1);
249 let first_channel = channels.get(&short_channel_id).unwrap();
250 assert!(&first_channel.announcement_message.is_none());
251 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
252 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
253 let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
254 let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
255 println!("last update a: {}", last_update_seen_a);
256 println!("last update b: {}", last_update_seen_b);
257 assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
258 assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
260 tokio::task::spawn_blocking(move || {
265 /// If a channel has only seen updates in one direction, it should not be announced
267 async fn test_unidirectional_intermediate_update_consideration() {
268 let _sanitizer = SchemaSanitizer::new();
270 let logger = Arc::new(TestLogger::new());
271 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
272 let network_graph_arc = Arc::new(network_graph);
273 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
275 let short_channel_id = 1;
276 let timestamp = current_time() - 10;
277 println!("timestamp: {}", timestamp);
280 let announcement = generate_announcement(short_channel_id);
281 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
282 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
283 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
285 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
286 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
287 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
288 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
290 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
291 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
292 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
293 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
295 persister.persist_gossip().await;
298 let channel_count = network_graph_arc.read_only().channels().len();
299 assert_eq!(channel_count, 1);
301 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
302 let client_graph_arc = Arc::new(client_graph);
303 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
305 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
307 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
308 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
309 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
311 assert_eq!(serialization.message_count, 3);
312 assert_eq!(serialization.announcement_count, 1);
313 assert_eq!(serialization.update_count, 2);
314 assert_eq!(serialization.update_count_full, 2);
315 assert_eq!(serialization.update_count_incremental, 0);
317 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
318 println!("update result: {}", update_result);
319 // the update result must be a multiple of our snapshot granularity
321 let readonly_graph = client_graph_arc.read_only();
322 let channels = readonly_graph.channels();
323 let client_channel_count = channels.len();
324 assert_eq!(client_channel_count, 1);
326 tokio::task::spawn_blocking(move || {
330 clean_test_db().await;
333 /// If a channel has only seen updates in one direction, it should not be announced
335 async fn test_bidirectional_intermediate_update_consideration() {
336 let _sanitizer = SchemaSanitizer::new();
338 let logger = Arc::new(TestLogger::new());
339 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
340 let network_graph_arc = Arc::new(network_graph);
341 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
343 let short_channel_id = 1;
344 let timestamp = current_time() - 10;
345 println!("timestamp: {}", timestamp);
348 let announcement = generate_announcement(short_channel_id);
349 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
350 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
351 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
352 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
354 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
355 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
356 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
357 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
358 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
360 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
361 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
362 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
363 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
364 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
366 persister.persist_gossip().await;
369 let channel_count = network_graph_arc.read_only().channels().len();
370 assert_eq!(channel_count, 1);
372 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
374 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
375 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
376 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
378 assert_eq!(serialization.message_count, 1);
379 assert_eq!(serialization.announcement_count, 0);
380 assert_eq!(serialization.update_count, 1);
381 assert_eq!(serialization.update_count_full, 0);
382 assert_eq!(serialization.update_count_incremental, 1);
384 tokio::task::spawn_blocking(move || {
388 clean_test_db().await;
392 async fn test_full_snapshot_recency() {
393 let _sanitizer = SchemaSanitizer::new();
394 let logger = Arc::new(TestLogger::new());
395 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
396 let network_graph_arc = Arc::new(network_graph);
398 let short_channel_id = 1;
399 let timestamp = current_time();
400 println!("timestamp: {}", timestamp);
403 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
404 let announcement = generate_announcement(short_channel_id);
405 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
406 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
410 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
411 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
412 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
415 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
416 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
417 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
421 { // first and only update
422 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
423 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
424 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
429 persister.persist_gossip().await;
431 tokio::task::spawn_blocking(move || {
436 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
437 let client_graph_arc = Arc::new(client_graph);
439 { // sync after initial seed
440 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
441 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
443 let channel_count = network_graph_arc.read_only().channels().len();
445 assert_eq!(channel_count, 1);
446 assert_eq!(serialization.message_count, 3);
447 assert_eq!(serialization.announcement_count, 1);
448 assert_eq!(serialization.update_count, 2);
450 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
451 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
452 // the update result must be a multiple of our snapshot granularity
453 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
454 assert!(update_result < timestamp);
456 let readonly_graph = client_graph_arc.read_only();
457 let channels = readonly_graph.channels();
458 let client_channel_count = channels.len();
459 assert_eq!(client_channel_count, 1);
461 let first_channel = channels.get(&short_channel_id).unwrap();
462 assert!(&first_channel.announcement_message.is_none());
463 // ensure the update in one direction shows the latest fee
464 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
465 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
468 clean_test_db().await;
472 async fn test_full_snapshot_recency_with_wrong_seen_order() {
473 let _sanitizer = SchemaSanitizer::new();
474 let logger = Arc::new(TestLogger::new());
475 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
476 let network_graph_arc = Arc::new(network_graph);
478 let short_channel_id = 1;
479 let timestamp = current_time();
480 println!("timestamp: {}", timestamp);
483 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
484 let announcement = generate_announcement(short_channel_id);
485 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
486 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
489 { // first update, seen latest
490 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
491 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
492 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
494 { // second update, seen first
495 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
496 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
497 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
501 { // first and only update
502 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
503 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
504 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
509 persister.persist_gossip().await;
511 tokio::task::spawn_blocking(move || {
516 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
517 let client_graph_arc = Arc::new(client_graph);
519 { // sync after initial seed
520 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
521 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
523 let channel_count = network_graph_arc.read_only().channels().len();
525 assert_eq!(channel_count, 1);
526 assert_eq!(serialization.message_count, 3);
527 assert_eq!(serialization.announcement_count, 1);
528 assert_eq!(serialization.update_count, 2);
530 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
531 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
532 // the update result must be a multiple of our snapshot granularity
533 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
534 assert!(update_result < timestamp);
536 let readonly_graph = client_graph_arc.read_only();
537 let channels = readonly_graph.channels();
538 let client_channel_count = channels.len();
539 assert_eq!(client_channel_count, 1);
541 let first_channel = channels.get(&short_channel_id).unwrap();
542 assert!(&first_channel.announcement_message.is_none());
543 // ensure the update in one direction shows the latest fee
544 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
545 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
548 clean_test_db().await;
552 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
553 let _sanitizer = SchemaSanitizer::new();
554 let logger = Arc::new(TestLogger::new());
555 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
556 let network_graph_arc = Arc::new(network_graph);
558 let short_channel_id = 1;
559 let timestamp = current_time();
560 println!("timestamp: {}", timestamp);
563 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
564 let announcement = generate_announcement(short_channel_id);
565 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
566 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
569 // apply updates in their timestamp order
570 let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
571 let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
572 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
573 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
575 // propagate updates in their seen order
576 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
577 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
580 { // first and only update
581 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
582 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
583 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
588 persister.persist_gossip().await;
590 tokio::task::spawn_blocking(move || {
595 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
596 let client_graph_arc = Arc::new(client_graph);
598 { // sync after initial seed
599 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
600 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
602 let channel_count = network_graph_arc.read_only().channels().len();
604 assert_eq!(channel_count, 1);
605 assert_eq!(serialization.message_count, 3);
606 assert_eq!(serialization.announcement_count, 1);
607 assert_eq!(serialization.update_count, 2);
609 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
610 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
611 // the update result must be a multiple of our snapshot granularity
612 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
613 assert!(update_result < timestamp);
615 let readonly_graph = client_graph_arc.read_only();
616 let channels = readonly_graph.channels();
617 let client_channel_count = channels.len();
618 assert_eq!(client_channel_count, 1);
620 let first_channel = channels.get(&short_channel_id).unwrap();
621 assert!(&first_channel.announcement_message.is_none());
622 // ensure the update in one direction shows the latest fee
623 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
624 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
627 clean_test_db().await;
631 async fn test_full_snapshot_mutiny_scenario() {
632 let _sanitizer = SchemaSanitizer::new();
633 let logger = Arc::new(TestLogger::new());
634 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
635 let network_graph_arc = Arc::new(network_graph);
637 let short_channel_id = 873706024403271681;
638 let timestamp = current_time();
639 // let oldest_simulation_timestamp = 1693300588;
640 let latest_simulation_timestamp = 1695909301;
641 let timestamp_offset = timestamp - latest_simulation_timestamp;
642 println!("timestamp: {}", timestamp);
645 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
646 let announcement = generate_announcement(short_channel_id);
647 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
648 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
652 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
653 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
654 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
657 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
658 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
659 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
662 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
663 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
664 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
667 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
668 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
669 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
672 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
673 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
674 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
677 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
678 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
679 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
682 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
683 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
684 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
687 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
688 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
689 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
692 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
693 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
694 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
697 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
698 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
699 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
702 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
703 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
704 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
709 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
710 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
711 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
714 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
715 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
716 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
721 persister.persist_gossip().await;
723 tokio::task::spawn_blocking(move || {
728 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
729 let client_graph_arc = Arc::new(client_graph);
731 { // sync after initial seed
732 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
733 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
735 let channel_count = network_graph_arc.read_only().channels().len();
737 assert_eq!(channel_count, 1);
738 assert_eq!(serialization.message_count, 3);
739 assert_eq!(serialization.announcement_count, 1);
740 assert_eq!(serialization.update_count, 2);
742 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
743 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
744 println!("update result: {}", update_result);
745 // the update result must be a multiple of our snapshot granularity
746 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
747 assert!(update_result < timestamp);
749 let timestamp_delta = timestamp - update_result;
750 println!("timestamp delta: {}", timestamp_delta);
751 assert!(timestamp_delta < config::snapshot_generation_interval());
753 let readonly_graph = client_graph_arc.read_only();
754 let channels = readonly_graph.channels();
755 let client_channel_count = channels.len();
756 assert_eq!(client_channel_count, 1);
758 let first_channel = channels.get(&short_channel_id).unwrap();
759 assert!(&first_channel.announcement_message.is_none());
760 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
761 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
764 clean_test_db().await;
768 async fn test_full_snapshot_interlaced_channel_timestamps() {
769 let _sanitizer = SchemaSanitizer::new();
770 let logger = Arc::new(TestLogger::new());
771 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
772 let network_graph_arc = Arc::new(network_graph);
774 let main_channel_id = 1;
775 let timestamp = current_time();
776 println!("timestamp: {}", timestamp);
779 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
780 let secondary_channel_id = main_channel_id + 1;
783 let announcement = generate_announcement(main_channel_id);
784 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
785 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
788 { // secondary channel
789 let announcement = generate_announcement(secondary_channel_id);
790 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
791 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
796 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
797 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
798 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
801 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
802 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
803 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
807 { // in-between channel
809 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
810 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
811 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
814 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
815 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
816 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
822 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
823 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
824 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
827 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
828 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
829 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
834 persister.persist_gossip().await;
836 tokio::task::spawn_blocking(move || {
841 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
842 let client_graph_arc = Arc::new(client_graph);
844 { // sync after initial seed
845 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
846 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
848 let channel_count = network_graph_arc.read_only().channels().len();
850 assert_eq!(channel_count, 2);
851 assert_eq!(serialization.message_count, 6);
852 assert_eq!(serialization.announcement_count, 2);
853 assert_eq!(serialization.update_count, 4);
855 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
856 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
857 // the update result must be a multiple of our snapshot granularity
858 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
859 assert!(update_result < timestamp);
861 let readonly_graph = client_graph_arc.read_only();
862 let channels = readonly_graph.channels();
863 let client_channel_count = channels.len();
864 assert_eq!(client_channel_count, 2);
866 let first_channel = channels.get(&main_channel_id).unwrap();
867 assert!(&first_channel.announcement_message.is_none());
868 // ensure the update in one direction shows the latest fee
869 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
870 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
873 clean_test_db().await;
877 async fn test_full_snapshot_persistence() {
878 let schema_sanitizer = SchemaSanitizer::new();
879 let logger = Arc::new(TestLogger::new());
880 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
881 let network_graph_arc = Arc::new(network_graph);
882 let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
883 let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
885 let short_channel_id = 1;
886 let timestamp = current_time();
887 println!("timestamp: {}", timestamp);
890 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
891 let announcement = generate_announcement(short_channel_id);
892 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
893 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
896 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
897 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
898 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
902 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
903 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
904 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
909 persister.persist_gossip().await;
911 tokio::task::spawn_blocking(move || {
916 let cache_path = cache_sanitizer.cache_path();
917 let symlink_path = format!("{}/symlinks/0.bin", cache_path);
919 // generate snapshots
921 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
923 let symlinked_data = fs::read(&symlink_path).unwrap();
924 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
925 let client_graph_arc = Arc::new(client_graph);
927 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
928 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
929 // the update result must be a multiple of our snapshot granularity
930 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
932 let readonly_graph = client_graph_arc.read_only();
933 let channels = readonly_graph.channels();
934 let client_channel_count = channels.len();
935 assert_eq!(client_channel_count, 1);
937 let first_channel = channels.get(&short_channel_id).unwrap();
938 assert!(&first_channel.announcement_message.is_none());
939 // ensure the update in one direction shows the latest fee
940 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
941 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
945 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
948 let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
949 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
950 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
954 persister.persist_gossip().await;
956 tokio::task::spawn_blocking(move || {
961 // regenerate snapshots
963 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
965 let symlinked_data = fs::read(&symlink_path).unwrap();
966 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
967 let client_graph_arc = Arc::new(client_graph);
969 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
970 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
971 // the update result must be a multiple of our snapshot granularity
972 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
974 let readonly_graph = client_graph_arc.read_only();
975 let channels = readonly_graph.channels();
976 let client_channel_count = channels.len();
977 assert_eq!(client_channel_count, 1);
979 let first_channel = channels.get(&short_channel_id).unwrap();
980 assert!(&first_channel.announcement_message.is_none());
981 // ensure the update in one direction shows the latest fee
982 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
983 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
986 // clean up afterwards
987 clean_test_db().await;