1 //! Multi-module tests that use database fixtures
3 use std::cell::RefCell;
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
// Interval, in seconds, that `test_trivial_setup` subtracts from the sync timestamp when it checks
// the `last_update` values stored in the client graph.
24 const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
// Name of the database schema used by the running test. It is written by the `SchemaSanitizer`
// constructor and read through `db_test_schema()`. It is accessed via `.with(...)`, so it is
// presumably declared inside a `thread_local!` block — TODO confirm.
27 static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
// Tracks whether the test schema has been dropped: `None` until a `SchemaSanitizer` is created,
// `Some(false)` afterwards, and `Some(true)` once `clean_test_db()` has run.
28 static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
31 fn blank_signature() -> Signature {
32 Signature::from_compact(&[0u8; 64]).unwrap()
35 fn genesis_hash() -> ChainHash {
36 ChainHash::using_genesis_block(Network::Bitcoin)
/// Returns the current Unix time in whole seconds.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch, or if the number of seconds
/// no longer fits in a `u32` (previously the value was silently truncated by an `as` cast).
fn current_time() -> u32 {
	// Scoped import so the checked conversion works regardless of the crate's edition prelude.
	use std::convert::TryFrom;

	let seconds_since_epoch = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("Time must be > 1970")
		.as_secs();
	u32::try_from(seconds_since_epoch).expect("Unix time must fit in a u32")
}
43 pub(crate) fn db_test_schema() -> String {
44 DB_TEST_SCHEMA.with(|suffix_reference| {
45 let suffix_option = suffix_reference.borrow();
46 suffix_option.as_ref().unwrap().clone()
/// Builds a `ChannelAnnouncement` test fixture for `short_channel_id`.
///
/// The two node ids are derived from fixed secret keys (32 bytes of `1` and 32 bytes of `2`), so
/// they are identical across calls. The node ids double as the bitcoin keys, and the node
/// signatures double as the bitcoin signatures.
///
/// # Panics
/// Panics if either fixed secret key is rejected or the message hash cannot be built.
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51 let secp_context = Secp256k1::new();
// deterministic key material for the first node
53 let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54 let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55 let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
// deterministic key material for the second node
57 let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58 let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59 let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
61 let announcement = UnsignedChannelAnnouncement {
62 features: ChannelFeatures::empty(),
63 chain_hash: genesis_hash(),
// the node ids are reused as the on-chain funding keys
67 bitcoin_key_1: node_id_1,
68 bitcoin_key_2: node_id_2,
// sign the double-SHA256 of the serialized unsigned announcement with both node keys
72 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73 let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74 let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
// the node signatures are reused as the bitcoin signatures
79 bitcoin_signature_1: node_signature_1,
80 bitcoin_signature_2: node_signature_2,
81 contents: announcement,
/// Builds a `ChannelUpdate` test fixture carrying a blank (all-zero) signature.
///
/// * `scid` - short channel id the update refers to.
/// * `direction` - selects which direction of the channel the update applies to.
/// * `timestamp` - update timestamp supplied by the caller.
/// * `expiry_delta`, `min_msat`, `max_msat`, `base_msat`, `fee_rate` - copied into
///   `cltv_expiry_delta`, `htlc_minimum_msat`, `htlc_maximum_msat`, `fee_base_msat` and
///   `fee_proportional_millionths` respectively.
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
// `true` maps to 1 and `false` to 0; presumably stored as the update's direction flag — TODO confirm
86 let flag_mask = if direction { 1 } else { 0 };
88 signature: blank_signature(),
89 contents: UnsignedChannelUpdate {
90 chain_hash: genesis_hash(),
91 short_channel_id: scid,
94 cltv_expiry_delta: expiry_delta,
95 htlc_minimum_msat: min_msat,
96 htlc_maximum_msat: max_msat,
97 fee_base_msat: base_msat,
98 fee_proportional_millionths: fee_rate,
/// Test fixture guard that registers a unique database schema name for the running test.
///
/// Construction marks the schema as not yet clean and stores a freshly generated schema name in
/// `DB_TEST_SCHEMA`.
104 struct SchemaSanitizer {}
106 impl SchemaSanitizer {
// Only one sanitizer may be set up per `IS_TEST_SCHEMA_CLEAN` cell: the flag must still be
// unset, and is then marked as "not clean".
108 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109 let mut is_clean_option = cleanliness_reference.borrow_mut();
110 assert!(is_clean_option.is_none());
111 *is_clean_option = Some(false);
// Derive the schema name from the current time plus a hash of the thread id and nanosecond
// timestamp.
114 DB_TEST_SCHEMA.with(|suffix_reference| {
115 let mut suffix_option = suffix_reference.borrow_mut();
116 let current_time = SystemTime::now();
117 let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118 let timestamp_seconds = unix_time.as_secs();
119 let timestamp_nanos = unix_time.as_nanos();
120 // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121 let thread_id = thread::current().id();
122 let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123 println!("test schema preimage: {}", preimage);
124 let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
125 // the schema must start with a letter
126 let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
127 *suffix_option = Some(schema);
/// When the sanitizer goes away, checks that the test cleaned up after itself: if a cleanliness
/// flag was recorded, it must be `true`, which is the value `clean_test_db()` stores.
134 impl Drop for SchemaSanitizer {
136 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137 let is_clean_option = cleanliness_reference.borrow();
138 if let Some(is_clean) = *is_clean_option {
// special-cases a thread that is already unwinding; presumably to avoid panicking a second
// time inside the drop handler — TODO confirm
139 if std::thread::panicking() {
142 assert_eq!(is_clean, true);
/// Test fixture guard for the on-disk cache directory belonging to the current test schema.
148 struct CacheSanitizer {}
150 impl CacheSanitizer {
151 /// The CacheSanitizer instantiation requires that there be a schema sanitizer
152 fn new(_: &SchemaSanitizer) -> Self {
/// Returns the cache directory path, `./res/<schema>/`, where `<schema>` is the value returned
/// by `db_test_schema()`.
156 fn cache_path(&self) -> String {
157 format!("./res/{}/", db_test_schema())
/// Recursively removes the cache directory when the guard goes away; panics if the removal fails
/// (for example because the directory does not exist).
161 impl Drop for CacheSanitizer {
163 let cache_path = self.cache_path();
164 fs::remove_dir_all(cache_path).unwrap();
169 async fn clean_test_db() {
170 let client = crate::connect_to_db().await;
171 let schema = db_test_schema();
172 client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
173 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
174 let mut is_clean_option = cleanliness_reference.borrow_mut();
175 *is_clean_option = Some(true);
/// Smoke test for the persistence setup: installs the schema fixture, builds a `NetworkGraph` and
/// a `GossipPersister` on top of it, and finally drops the test schema again.
180 async fn test_persistence_runtime() {
// the sanitizer's drop handler asserts that `clean_test_db()` ran before the test ended
181 let _sanitizer = SchemaSanitizer::new();
182 let logger = Arc::new(TestLogger::new());
183 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
184 let network_graph_arc = Arc::new(network_graph);
185 let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
187 tokio::task::spawn_blocking(move || {
191 clean_test_db().await;
/// End-to-end check with a single channel and one update per direction.
///
/// Persists the gossip, serializes a delta with a last-sync timestamp of `0`, applies the result
/// to a fresh client graph through `RapidGossipSync`, and verifies the counts, fees and
/// `last_update` values that the client ends up with.
196 async fn test_trivial_setup() {
197 let _sanitizer = SchemaSanitizer::new();
198 let logger = Arc::new(TestLogger::new());
199 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
200 let network_graph_arc = Arc::new(network_graph);
201 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
203 let short_channel_id = 1;
204 let timestamp = current_time() - 10;
205 println!("timestamp: {}", timestamp);
// one update per direction, distinguishable by base fee (5 vs. 10)
208 let announcement = generate_announcement(short_channel_id);
209 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
210 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
// apply the gossip to the in-memory graph before handing the same messages to the persister
212 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
213 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
214 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
216 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
217 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
218 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
220 persister.persist_gossip().await;
// last-sync timestamp 0; presumably this requests a full (non-incremental) snapshot — TODO confirm
223 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
224 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
// the schema is dropped here; the remaining checks only use the in-memory serialization
225 clean_test_db().await;
227 let channel_count = network_graph_arc.read_only().channels().len();
229 assert_eq!(channel_count, 1);
230 assert_eq!(serialization.message_count, 3);
231 assert_eq!(serialization.announcement_count, 1);
232 assert_eq!(serialization.update_count, 2);
// replay the serialized data on a client that starts with an empty graph
234 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
235 let client_graph_arc = Arc::new(client_graph);
236 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
237 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
238 println!("update result: {}", update_result);
239 // the update result must be a multiple of our snapshot granularity
240 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
241 assert!(update_result < timestamp);
// the returned timestamp must lie less than one snapshot interval before the update timestamp
243 let timestamp_delta = timestamp - update_result;
244 println!("timestamp delta: {}", timestamp_delta);
245 assert!(timestamp_delta < config::snapshot_generation_interval());
247 let readonly_graph = client_graph_arc.read_only();
248 let channels = readonly_graph.channels();
249 let client_channel_count = channels.len();
250 assert_eq!(client_channel_count, 1);
252 let first_channel = channels.get(&short_channel_id).unwrap();
// NOTE(review): the leading `&` is unnecessary; `assert!` only accepts it because `!` is
// implemented for `&bool`
253 assert!(&first_channel.announcement_message.is_none());
254 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
255 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
256 let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
257 let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
258 println!("last update a: {}", last_update_seen_a);
259 println!("last update b: {}", last_update_seen_b);
// the client is expected to record the sync timestamp minus the backdate interval
260 assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
261 assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
263 tokio::task::spawn_blocking(move || {
268 /// If a channel has only seen updates in one direction, it should not be announced
///
/// Setup: one update in direction `false` at `timestamp`, followed by two updates in direction
/// `true` at `timestamp + 1` and `timestamp + 2`. The delta is serialized with a last-sync
/// timestamp of `timestamp + 1`.
///
/// NOTE(review): the summary line says "should not be announced", yet the assertions below expect
/// one announcement and two full updates in the delta — confirm the intended wording.
270 async fn test_unidirectional_intermediate_update_consideration() {
271 let _sanitizer = SchemaSanitizer::new();
273 let logger = Arc::new(TestLogger::new());
274 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
275 let network_graph_arc = Arc::new(network_graph);
276 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
278 let short_channel_id = 1;
279 let timestamp = current_time() - 10;
280 println!("timestamp: {}", timestamp);
283 let announcement = generate_announcement(short_channel_id);
284 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
285 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
286 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
288 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
289 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
290 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
291 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
// only the announcement is given an explicit second argument (`Some(timestamp)`); the updates
// pass `None`
293 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
294 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
295 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
296 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
298 persister.persist_gossip().await;
301 let channel_count = network_graph_arc.read_only().channels().len();
302 assert_eq!(channel_count, 1);
304 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
305 let client_graph_arc = Arc::new(client_graph);
306 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
308 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
// each of these lookup log lines must appear exactly once
310 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
311 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
312 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
314 assert_eq!(serialization.message_count, 3);
315 assert_eq!(serialization.announcement_count, 1);
316 assert_eq!(serialization.update_count, 2);
317 assert_eq!(serialization.update_count_full, 2);
318 assert_eq!(serialization.update_count_incremental, 0);
320 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
321 println!("update result: {}", update_result);
322 // the update result must be a multiple of our snapshot granularity
// NOTE(review): unlike the sibling tests, no assertion on `update_result` follows the comment
// above
324 let readonly_graph = client_graph_arc.read_only();
325 let channels = readonly_graph.channels();
326 let client_channel_count = channels.len();
327 assert_eq!(client_channel_count, 1);
329 tokio::task::spawn_blocking(move || {
333 clean_test_db().await;
336 /// Delta serialization for a channel that has updates in both directions.
///
/// Setup: three updates in direction `false` (at `timestamp`, `timestamp + 1`, `timestamp + 2`)
/// and one update in direction `true` (at `timestamp`). The delta serialized with a last-sync
/// timestamp of `timestamp + 1` is expected to contain no announcement and exactly one
/// incremental update.
///
/// NOTE(review): this test previously carried the unidirectional test's summary verbatim ("If a
/// channel has only seen updates in one direction, it should not be announced"), which looked
/// copy-pasted — confirm the description above matches the intent.
338 async fn test_bidirectional_intermediate_update_consideration() {
339 let _sanitizer = SchemaSanitizer::new();
341 let logger = Arc::new(TestLogger::new());
342 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
343 let network_graph_arc = Arc::new(network_graph);
344 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
346 let short_channel_id = 1;
347 let timestamp = current_time() - 10;
348 println!("timestamp: {}", timestamp);
351 let announcement = generate_announcement(short_channel_id);
352 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
353 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
354 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
355 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
357 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
358 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
359 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
360 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
361 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
// only the announcement is given an explicit second argument (`Some(timestamp)`); the updates
// pass `None`
363 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
364 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
365 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
366 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
367 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
369 persister.persist_gossip().await;
372 let channel_count = network_graph_arc.read_only().channels().len();
373 assert_eq!(channel_count, 1);
375 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
// each of these lookup log lines must appear exactly once
377 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
378 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
379 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
381 assert_eq!(serialization.message_count, 1);
382 assert_eq!(serialization.announcement_count, 0);
383 assert_eq!(serialization.update_count, 1);
384 assert_eq!(serialization.update_count_full, 0);
385 assert_eq!(serialization.update_count_incremental, 1);
387 tokio::task::spawn_blocking(move || {
391 clean_test_db().await;
/// A delta serialized with last-sync timestamp `0` must carry the most recent update of each
/// direction.
///
/// Direction `false` receives two updates (fee rate 38 at `timestamp - 1`, then 39 at
/// `timestamp`) and direction `true` receives one (fee rate 10). After syncing, the client graph
/// is expected to show fee rates 39 and 10.
395 async fn test_full_snapshot_recency() {
396 let _sanitizer = SchemaSanitizer::new();
397 let logger = Arc::new(TestLogger::new());
398 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
399 let network_graph_arc = Arc::new(network_graph);
401 let short_channel_id = 1;
402 let timestamp = current_time();
403 println!("timestamp: {}", timestamp);
406 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
407 let announcement = generate_announcement(short_channel_id);
408 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
409 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// direction `false`, older update (fee rate 38)
413 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
414 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
415 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// direction `false`, newer update (fee rate 39)
418 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
419 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
420 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// direction `true`
424 { // first and only update
425 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
426 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
427 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
432 persister.persist_gossip().await;
434 tokio::task::spawn_blocking(move || {
439 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
440 let client_graph_arc = Arc::new(client_graph);
442 { // sync after initial seed
443 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
444 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
446 let channel_count = network_graph_arc.read_only().channels().len();
// three persisted updates, but only one per direction is expected in the serialization
448 assert_eq!(channel_count, 1);
449 assert_eq!(serialization.message_count, 3);
450 assert_eq!(serialization.announcement_count, 1);
451 assert_eq!(serialization.update_count, 2);
453 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
454 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
455 // the update result must be a multiple of our snapshot granularity
456 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
457 assert!(update_result < timestamp);
459 let readonly_graph = client_graph_arc.read_only();
460 let channels = readonly_graph.channels();
461 let client_channel_count = channels.len();
462 assert_eq!(client_channel_count, 1);
464 let first_channel = channels.get(&short_channel_id).unwrap();
465 assert!(&first_channel.announcement_message.is_none());
466 // ensure the update in one direction shows the latest fee
467 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
468 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
471 clean_test_db().await;
/// Variant of the snapshot-recency test in which the explicit timestamps passed alongside the
/// updates are inverted relative to the updates' own timestamps.
///
/// The update timestamped `timestamp - 1` (fee rate 38) is sent with `Some(timestamp)`, and the
/// update timestamped `timestamp` (fee rate 39) is sent with `Some(timestamp - 1)`. The client
/// graph is still expected to end up with fee rate 39 for that direction.
475 async fn test_full_snapshot_recency_with_wrong_seen_order() {
476 let _sanitizer = SchemaSanitizer::new();
477 let logger = Arc::new(TestLogger::new());
478 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
479 let network_graph_arc = Arc::new(network_graph);
481 let short_channel_id = 1;
482 let timestamp = current_time();
483 println!("timestamp: {}", timestamp);
486 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
487 let announcement = generate_announcement(short_channel_id);
488 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
489 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
492 { // first update, seen latest
493 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
494 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
495 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
497 { // second update, seen first
498 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
499 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
500 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
// opposite direction
504 { // first and only update
505 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
506 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
507 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
512 persister.persist_gossip().await;
514 tokio::task::spawn_blocking(move || {
519 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
520 let client_graph_arc = Arc::new(client_graph);
522 { // sync after initial seed
523 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
524 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
526 let channel_count = network_graph_arc.read_only().channels().len();
528 assert_eq!(channel_count, 1);
529 assert_eq!(serialization.message_count, 3);
530 assert_eq!(serialization.announcement_count, 1);
531 assert_eq!(serialization.update_count, 2);
533 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
534 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
535 // the update result must be a multiple of our snapshot granularity
536 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
537 assert!(update_result < timestamp);
539 let readonly_graph = client_graph_arc.read_only();
540 let channels = readonly_graph.channels();
541 let client_channel_count = channels.len();
542 assert_eq!(client_channel_count, 1);
544 let first_channel = channels.get(&short_channel_id).unwrap();
545 assert!(&first_channel.announcement_message.is_none());
546 // ensure the update in one direction shows the latest fee
547 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
548 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
551 clean_test_db().await;
/// Variant of the snapshot-recency test in which the updates reach the persister in the opposite
/// order from the one in which they were applied to the network graph.
///
/// The graph receives the fee-rate-38 update (`timestamp - 1`) before the fee-rate-39 update
/// (`timestamp`), but the persister is sent the newer update first, with `Some(timestamp - 1)`,
/// and the older one second, with `Some(timestamp)`. The client graph is still expected to end up
/// with fee rate 39 for that direction.
555 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
556 let _sanitizer = SchemaSanitizer::new();
557 let logger = Arc::new(TestLogger::new());
558 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
559 let network_graph_arc = Arc::new(network_graph);
561 let short_channel_id = 1;
562 let timestamp = current_time();
563 println!("timestamp: {}", timestamp);
566 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
567 let announcement = generate_announcement(short_channel_id);
568 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
569 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
572 // apply updates in their timestamp order
573 let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
574 let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
575 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
576 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
578 // propagate updates in their seen order
579 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
580 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
// opposite direction
583 { // first and only update
584 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
585 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
586 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
591 persister.persist_gossip().await;
593 tokio::task::spawn_blocking(move || {
598 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
599 let client_graph_arc = Arc::new(client_graph);
601 { // sync after initial seed
602 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
603 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
605 let channel_count = network_graph_arc.read_only().channels().len();
607 assert_eq!(channel_count, 1);
608 assert_eq!(serialization.message_count, 3);
609 assert_eq!(serialization.announcement_count, 1);
610 assert_eq!(serialization.update_count, 2);
612 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
613 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
614 // the update result must be a multiple of our snapshot granularity
615 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
616 assert!(update_result < timestamp);
618 let readonly_graph = client_graph_arc.read_only();
619 let channels = readonly_graph.channels();
620 let client_channel_count = channels.len();
621 assert_eq!(client_channel_count, 1);
623 let first_channel = channels.get(&short_channel_id).unwrap();
624 assert!(&first_channel.announcement_message.is_none());
625 // ensure the update in one direction shows the latest fee
626 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
627 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
630 clean_test_db().await;
/// Replays a fixed sequence of update timestamps for one channel, shifted so that the latest
/// hard-coded timestamp (`1695909301`) lines up with the current time.
///
/// Direction `false` receives eleven updates. The second-to-last is sent with an explicit
/// timestamp two `SYMLINK_GRANULARITY_INTERVAL`s past the latest one. The last (fee rate 130) is
/// sent to the persister but not applied to the server-side network graph. Direction `true`
/// receives two updates with fee rate 10. After a sync from timestamp `0`, the client graph is
/// expected to show fee rates 130 and 10.
634 async fn test_full_snapshot_mutiny_scenario() {
635 let _sanitizer = SchemaSanitizer::new();
636 let logger = Arc::new(TestLogger::new());
637 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
638 let network_graph_arc = Arc::new(network_graph);
640 let short_channel_id = 873706024403271681;
641 let timestamp = current_time();
642 // let oldest_simulation_timestamp = 1693300588;
643 let latest_simulation_timestamp = 1695909301;
// added to every hard-coded timestamp below so that the latest one equals `timestamp`
644 let timestamp_offset = timestamp - latest_simulation_timestamp;
645 println!("timestamp: {}", timestamp);
648 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
649 let announcement = generate_announcement(short_channel_id);
650 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
651 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// direction `false`: eleven updates in ascending timestamp order
655 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
656 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
657 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
660 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
661 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
662 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
665 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
666 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
667 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
670 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
671 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
672 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
675 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
676 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
677 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
680 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
681 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
682 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
685 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
686 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
687 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
690 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
691 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
692 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
695 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
696 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
697 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// this update is the only one sent with an explicit timestamp, and that timestamp lies two
// symlink granularity intervals after the latest update timestamp
700 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
701 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
702 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
// the latest update is only persisted; the graph update below is deliberately commented out
705 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
706 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
707 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// direction `true`: two updates with the same fee rate
712 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
713 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
714 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
717 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
718 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
719 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
724 persister.persist_gossip().await;
726 tokio::task::spawn_blocking(move || {
731 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
732 let client_graph_arc = Arc::new(client_graph);
734 { // sync after initial seed
735 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
736 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
738 let channel_count = network_graph_arc.read_only().channels().len();
740 assert_eq!(channel_count, 1);
741 assert_eq!(serialization.message_count, 3);
742 assert_eq!(serialization.announcement_count, 1);
743 assert_eq!(serialization.update_count, 2);
745 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
746 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
747 println!("update result: {}", update_result);
748 // the update result must be a multiple of our snapshot granularity
749 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
750 assert!(update_result < timestamp);
// the returned timestamp must lie less than one snapshot interval before `timestamp`
752 let timestamp_delta = timestamp - update_result;
753 println!("timestamp delta: {}", timestamp_delta);
754 assert!(timestamp_delta < config::snapshot_generation_interval());
756 let readonly_graph = client_graph_arc.read_only();
757 let channels = readonly_graph.channels();
758 let client_channel_count = channels.len();
759 assert_eq!(client_channel_count, 1);
761 let first_channel = channels.get(&short_channel_id).unwrap();
762 assert!(&first_channel.announcement_message.is_none());
// 130 is the fee rate of the update that was persisted but never applied to the server graph
763 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
764 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
767 clean_test_db().await;
/// Checks a full snapshot when the updates of two channels are interleaved in time.
///
/// The main channel gets updates at `timestamp - 2` and again at `timestamp`; a secondary
/// channel gets updates at `timestamp - 1` in between. After persisting, the serialized
/// delta is expected to carry 2 announcements and 4 updates, and a client graph synced
/// from it must report proportional fees of 11 and 6 for the main channel: the values
/// passed with the `timestamp` updates, not the 10 and 5 passed at `timestamp - 2`.
771 async fn test_full_snapshot_interlaced_channel_timestamps() {
// NOTE(review): bound to a named variable, presumably so the sanitizer stays alive for the
// whole test; confirm against SchemaSanitizer's Drop behaviour
772 let _sanitizer = SchemaSanitizer::new();
773 let logger = Arc::new(TestLogger::new());
774 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
775 let network_graph_arc = Arc::new(network_graph);
777 let main_channel_id = 1;
778 let timestamp = current_time();
779 println!("timestamp: {}", timestamp);
// despite its name, `receiver` is the handle the gossip messages are sent on below
782 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
783 let secondary_channel_id = main_channel_id + 1;
// main channel: apply the announcement to the in-memory graph, then send it to the persister
786 let announcement = generate_announcement(main_channel_id);
787 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
788 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
791 { // secondary channel
792 let announcement = generate_announcement(secondary_channel_id);
793 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
794 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// main channel, first round: both directions at `timestamp - 2` (last argument 10, then 5)
// judging by the assertions at the end, the last argument ends up as
// `fees.proportional_millionths`, with `false` mapping to `one_to_two` and `true` to
// `two_to_one`; confirm against generate_update's signature
799 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
800 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
801 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
804 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
805 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
806 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
810 { // in-between channel
// secondary channel: both directions at `timestamp - 1`, i.e. between the two main-channel rounds
812 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
813 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
814 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
817 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
818 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
819 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// main channel, second round: both directions at `timestamp` (last argument 11, then 6)
825 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
826 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
827 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
830 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
831 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
832 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// persist everything sent so far before serializing
837 persister.persist_gossip().await;
839 tokio::task::spawn_blocking(move || {
// fresh, empty graph standing in for a client that syncs from the serialized data
844 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
845 let client_graph_arc = Arc::new(client_graph);
847 { // sync after initial seed
// the second argument `0` is presumably the client's last-sync timestamp; confirm against
// serialize_delta's signature
848 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
849 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
851 let channel_count = network_graph_arc.read_only().channels().len();
853 assert_eq!(channel_count, 2);
// six updates were sent above, but only four are expected here: one per direction for each
// of the two channels (presumably the `timestamp - 2` updates are superseded; verify in
// serialize_delta)
854 assert_eq!(serialization.message_count, 6);
855 assert_eq!(serialization.announcement_count, 2);
856 assert_eq!(serialization.update_count, 4);
// apply the serialized data to the client graph through rapid gossip sync
858 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
859 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
860 // the update result must be a multiple of our snapshot granularity
861 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
862 assert!(update_result < timestamp);
864 let readonly_graph = client_graph_arc.read_only();
865 let channels = readonly_graph.channels();
866 let client_channel_count = channels.len();
867 assert_eq!(client_channel_count, 2);
869 let first_channel = channels.get(&main_channel_id).unwrap();
// NOTE(review): the leading `&` is redundant; the assertion is on `is_none()`
870 assert!(&first_channel.announcement_message.is_none());
871 // ensure the update in one direction shows the latest fee
872 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
873 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
876 clean_test_db().await;
/// Checks that snapshots generated into the cache directory can be read back and applied
/// by a client, and that regenerating them picks up a newer channel update.
///
/// One channel is seeded with an update per direction, snapshots are generated, and the
/// file at `<cache_path>/symlinks/0.bin` is applied to a fresh client graph (expected
/// proportional fees: 38 and 10). A later update (`timestamp + 30`, value 39) is then
/// persisted, snapshots are regenerated, and the same file must now yield 39 and 10.
880 async fn test_full_snapshot_persistence() {
881 let schema_sanitizer = SchemaSanitizer::new();
882 let logger = Arc::new(TestLogger::new());
883 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
884 let network_graph_arc = Arc::new(network_graph);
885 let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
// supplies the cache directory used for the snapshot files below
886 let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
888 let short_channel_id = 1;
889 let timestamp = current_time();
890 println!("timestamp: {}", timestamp);
// despite its name, `receiver` is the handle the gossip messages are sent on below
893 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// apply the announcement to the in-memory graph, then send it to the persister
894 let announcement = generate_announcement(short_channel_id);
895 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
896 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// initial updates: `true` direction at `timestamp` (last argument 10), `false` direction at
// `timestamp - 1` (last argument 38); judging by the assertions below, the last argument
// ends up as `fees.proportional_millionths`, with `false` mapping to `one_to_two` and `true`
// to `two_to_one` (confirm against generate_update's signature)
899 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
900 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
901 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
905 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
906 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
907 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// persist everything sent so far before generating snapshots
912 persister.persist_gossip().await;
914 tokio::task::spawn_blocking(move || {
919 let cache_path = cache_sanitizer.cache_path();
// file read back after each snapshot generation (presumably a symlink, going by the path)
920 let symlink_path = format!("{}/symlinks/0.bin", cache_path);
922 // generate snapshots
// NOTE(review): the numeric arguments are opaque from here; see Snapshotter::generate_snapshots
924 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
// load the generated file and apply it to a fresh, empty client graph
926 let symlinked_data = fs::read(&symlink_path).unwrap();
927 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
928 let client_graph_arc = Arc::new(client_graph);
930 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
931 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
932 // the update result must be a multiple of our snapshot granularity
933 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
935 let readonly_graph = client_graph_arc.read_only();
936 let channels = readonly_graph.channels();
937 let client_channel_count = channels.len();
938 assert_eq!(client_channel_count, 1);
940 let first_channel = channels.get(&short_channel_id).unwrap();
// NOTE(review): the leading `&` is redundant; the assertion is on `is_none()`
941 assert!(&first_channel.announcement_message.is_none());
942 // ensure the update in one direction shows the latest fee
943 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
944 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
// second round: a new persister/sender pair shadows the first one
948 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// newer update for the `false` direction only (`timestamp + 30`, last argument 39)
951 let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
952 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
953 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
957 persister.persist_gossip().await;
959 tokio::task::spawn_blocking(move || {
964 // regenerate snapshots
966 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
// re-read the same path and apply it to another fresh client graph
968 let symlinked_data = fs::read(&symlink_path).unwrap();
969 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
970 let client_graph_arc = Arc::new(client_graph);
972 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
973 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
974 // the update result must be a multiple of our snapshot granularity
975 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
977 let readonly_graph = client_graph_arc.read_only();
978 let channels = readonly_graph.channels();
979 let client_channel_count = channels.len();
980 assert_eq!(client_channel_count, 1);
982 let first_channel = channels.get(&short_channel_id).unwrap();
// NOTE(review): the leading `&` is redundant; the assertion is on `is_none()`
983 assert!(&first_channel.announcement_message.is_none());
984 // ensure the update in one direction shows the latest fee
// `one_to_two` now reflects the 39 sent in the second round; `two_to_one` is unchanged at 10
985 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
986 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
989 // clean up afterwards
990 clean_test_db().await;