1 //! Multi-module tests that use database fixtures
3 use std::cell::RefCell;
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
/// Number of seconds (one week) by which the RGS client backdates the channel updates it
/// applies; tests compare `last_update` against `update_result - CLIENT_BACKDATE_INTERVAL`.
const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week

// Both cells are accessed through `.with(..)`, i.e. as thread-local keys, so each test thread
// gets its own schema name and its own cleanliness flag.
thread_local! {
	/// Name of the database schema used by the test running on the current thread.
	/// `None` until a `SchemaSanitizer` has been created on this thread.
	static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
	/// Tracks whether the current thread's test schema has been dropped again:
	/// `None` before any sanitizer exists, `Some(false)` while the schema is in use,
	/// `Some(true)` once `clean_test_db` has run.
	static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
}
31 fn blank_signature() -> Signature {
32 Signature::from_compact(&[0u8; 64]).unwrap()
35 fn genesis_hash() -> ChainHash {
36 ChainHash::using_genesis_block(Network::Bitcoin)
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// Panics if the system clock reports a time before the epoch.
fn current_time() -> u32 {
	let since_epoch = SystemTime::now()
		.duration_since(UNIX_EPOCH)
		.expect("Time must be > 1970");
	// The seconds count is a u64; the cast keeps only the low 32 bits.
	since_epoch.as_secs() as u32
}
43 pub(crate) fn db_test_schema() -> String {
44 DB_TEST_SCHEMA.with(|suffix_reference| {
45 let suffix_option = suffix_reference.borrow();
46 suffix_option.as_ref().unwrap().clone()
/// Builds a channel announcement fixture for `short_channel_id`.
///
/// The two channel counterparties are derived from the fixed secret keys `[1; 32]` and
/// `[2; 32]`, so every generated announcement involves the same pair of nodes.
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51 let secp_context = Secp256k1::new();
// first counterparty: deterministic key material
53 let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54 let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55 let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
// second counterparty: deterministic key material
57 let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58 let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59 let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
61 let announcement = UnsignedChannelAnnouncement {
62 features: ChannelFeatures::empty(),
63 chain_hash: genesis_hash(),
// the node ids double as the bitcoin keys
67 bitcoin_key_1: node_id_1,
68 bitcoin_key_2: node_id_2,
// sign the double-SHA256 of the serialized unsigned announcement with both node keys
72 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73 let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74 let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
// the node signatures are reused as the bitcoin signatures
79 bitcoin_signature_1: node_signature_1,
80 bitcoin_signature_2: node_signature_2,
81 contents: announcement,
/// Builds a channel update fixture for channel `scid` carrying a blank (all-zero) signature.
///
/// `expiry_delta`, `min_msat`, `max_msat`, `base_msat` and `fee_rate` are copied into the
/// corresponding fields of the unsigned update.
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
// `direction` becomes a 0/1 mask (presumably the direction bit of the update's flags; confirm)
86 let flag_mask = if direction { 1 } else { 0 };
88 signature: blank_signature(),
89 contents: UnsignedChannelUpdate {
90 chain_hash: genesis_hash(),
91 short_channel_id: scid,
94 cltv_expiry_delta: expiry_delta,
95 htlc_minimum_msat: min_msat,
96 htlc_maximum_msat: max_msat,
97 fee_base_msat: base_msat,
98 fee_proportional_millionths: fee_rate,
/// Test guard that registers a unique schema name for the current thread on creation and,
/// on drop, checks that the test marked the schema as cleaned.
104 struct SchemaSanitizer {}
106 impl SchemaSanitizer {
// Only one sanitizer may be created per thread: the flag must still be unset.
// It is then marked as "schema in use, not yet cleaned".
108 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109 let mut is_clean_option = cleanliness_reference.borrow_mut();
110 assert!(is_clean_option.is_none());
111 *is_clean_option = Some(false);
// Derive the schema name `test_<unix seconds>_<hex digest>` and store it for this thread.
114 DB_TEST_SCHEMA.with(|suffix_reference| {
115 let mut suffix_option = suffix_reference.borrow_mut();
116 let current_time = SystemTime::now();
117 let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118 let timestamp_seconds = unix_time.as_secs();
119 let timestamp_nanos = unix_time.as_nanos();
120 // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
// hence the thread id is mixed into the hashed preimage
121 let thread_id = thread::current().id();
122 let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123 println!("test schema preimage: {}", preimage);
124 let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
125 // the schema must start with a letter
126 let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
127 *suffix_option = Some(schema);
134 impl Drop for SchemaSanitizer {
// If a flag value was recorded, it must be `true` by now, i.e. `clean_test_db` has run.
136 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137 let is_clean_option = cleanliness_reference.borrow();
138 if let Some(is_clean) = *is_clean_option {
// NOTE(review): presumably the check is skipped while unwinding from a failed test, to avoid a double panic; confirm
139 if std::thread::panicking() {
142 assert_eq!(is_clean, true);
/// Test guard that deletes the per-schema cache directory when it is dropped.
148 struct CacheSanitizer {}
150 impl CacheSanitizer {
151 /// The CacheSanitizer instantiation requires that there be a schema sanitizer
152 fn new(_: &SchemaSanitizer) -> Self {
/// Directory associated with the current test schema: `./res/<schema>/`.
156 fn cache_path(&self) -> String {
157 format!("./res/{}/", db_test_schema())
161 impl Drop for CacheSanitizer {
// Recursively delete the cache directory.
// NOTE(review): `unwrap` panics if the directory is missing or cannot be removed; if this runs
// while the test is already unwinding, the second panic aborts the process. Confirm this is intended.
163 let cache_path = self.cache_path();
164 fs::remove_dir_all(cache_path).unwrap();
169 async fn clean_test_db() {
170 let client = crate::connect_to_db().await;
171 let schema = db_test_schema();
172 client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
173 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
174 let mut is_clean_option = cleanliness_reference.borrow_mut();
175 *is_clean_option = Some(true);
/// Smoke test: builds a `GossipPersister` over an empty network graph, runs a blocking task,
/// and then drops the test schema.
180 async fn test_persistence_runtime() {
181 let _sanitizer = SchemaSanitizer::new();
182 let logger = Arc::new(TestLogger::new());
183 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
184 let network_graph_arc = Arc::new(network_graph);
185 let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// NOTE(review): presumably the blocking task exists to drop the persister off the async context; confirm
187 tokio::task::spawn_blocking(move || {
// marks the schema as clean so the sanitizer's drop check passes
191 clean_test_db().await;
/// End-to-end check with a single channel: one announcement plus one update per direction is
/// persisted, serialized as a full delta (last sync timestamp 0), and applied to a fresh client graph.
196 async fn test_trivial_setup() {
197 let _sanitizer = SchemaSanitizer::new();
198 let logger = Arc::new(TestLogger::new());
199 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
200 let network_graph_arc = Arc::new(network_graph);
// NOTE: despite its name, `receiver` is the handle messages are sent through (see `.send` below)
201 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
203 let short_channel_id = 1;
204 let timestamp = current_time() - 10;
205 println!("timestamp: {}", timestamp);
// fixtures: base fee 5 in the `false` direction, 10 in the `true` direction
208 let announcement = generate_announcement(short_channel_id);
209 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
210 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
// apply the gossip to the server-side graph ...
212 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
213 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
214 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
// ... and queue the same messages for persistence
216 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
217 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
218 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
220 persister.persist_gossip().await;
// serialize everything since timestamp 0
223 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
224 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
225 clean_test_db().await;
227 let channel_count = network_graph_arc.read_only().channels().len();
229 assert_eq!(channel_count, 1);
230 assert_eq!(serialization.message_count, 3);
231 assert_eq!(serialization.announcement_count, 1);
232 assert_eq!(serialization.update_count, 2);
// feed the serialized snapshot to a fresh client-side graph
234 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
235 let client_graph_arc = Arc::new(client_graph);
236 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
237 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
238 println!("update result: {}", update_result);
239 // the update result must be a multiple of our snapshot granularity
240 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
241 assert!(update_result < timestamp);
// ... and lie less than one snapshot interval before the update timestamp
243 let timestamp_delta = timestamp - update_result;
244 println!("timestamp delta: {}", timestamp_delta);
245 assert!(timestamp_delta < config::snapshot_generation_interval());
247 let readonly_graph = client_graph_arc.read_only();
248 let channels = readonly_graph.channels();
249 let client_channel_count = channels.len();
250 assert_eq!(client_channel_count, 1);
// the client must see both directions with the fees set above, but no stored announcement message
252 let first_channel = channels.get(&short_channel_id).unwrap();
253 assert!(&first_channel.announcement_message.is_none());
254 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
255 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
256 let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
257 let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
258 println!("last update a: {}", last_update_seen_a);
259 println!("last update b: {}", last_update_seen_b);
// the client's `last_update` is the snapshot timestamp backdated by one week
260 assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
261 assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
263 tokio::task::spawn_blocking(move || {
268 /// If a channel has only seen updates in one direction, it should not be announced.
/// Here the `true` direction only receives its updates at `timestamp + 1` and later, and the
/// delta requested since `timestamp + 1` is expected to contain the announcement together
/// with two full updates.
270 async fn test_unidirectional_intermediate_update_consideration() {
271 let _sanitizer = SchemaSanitizer::new();
273 let logger = Arc::new(TestLogger::new());
274 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
275 let network_graph_arc = Arc::new(network_graph);
276 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
278 let short_channel_id = 1;
279 let timestamp = current_time() - 10;
280 println!("timestamp: {}", timestamp);
// one update in the `false` direction, then two later updates in the `true` direction
283 let announcement = generate_announcement(short_channel_id);
284 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
285 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
286 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
288 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
289 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
290 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
291 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
// the announcement is sent with `Some(timestamp)`, the updates with `None`
293 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
294 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
295 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
296 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
298 persister.persist_gossip().await;
301 let channel_count = network_graph_arc.read_only().channels().len();
302 assert_eq!(channel_count, 1);
304 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
305 let client_graph_arc = Arc::new(client_graph);
306 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
// request only what changed since `timestamp + 1`
308 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
310 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
311 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
312 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
// the channel is announced, and both updates are sent in full rather than incrementally
314 assert_eq!(serialization.message_count, 3);
315 assert_eq!(serialization.announcement_count, 1);
316 assert_eq!(serialization.update_count, 2);
317 assert_eq!(serialization.update_count_full, 2);
318 assert_eq!(serialization.update_count_incremental, 0);
320 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
321 println!("update result: {}", update_result);
322 // the update result must be a multiple of our snapshot granularity
// NOTE(review): no assertion on `update_result` follows this comment here; confirm whether one was intended
324 let readonly_graph = client_graph_arc.read_only();
325 let channels = readonly_graph.channels();
326 let client_channel_count = channels.len();
327 assert_eq!(client_channel_count, 1);
329 tokio::task::spawn_blocking(move || {
333 clean_test_db().await;
336 /// If a channel already has updates in both directions as of `timestamp`, a delta requested
/// since `timestamp + 1` is expected to carry no announcement and exactly one incremental
/// update, even though the `false` direction was updated twice after `timestamp`.
338 async fn test_bidirectional_intermediate_update_consideration() {
339 let _sanitizer = SchemaSanitizer::new();
341 let logger = Arc::new(TestLogger::new());
342 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
343 let network_graph_arc = Arc::new(network_graph);
344 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
346 let short_channel_id = 1;
347 let timestamp = current_time() - 10;
348 println!("timestamp: {}", timestamp);
// three successive updates in the `false` direction, one update in the `true` direction
351 let announcement = generate_announcement(short_channel_id);
352 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
353 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
354 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
355 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
357 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
358 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
359 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
360 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
361 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
// the announcement is sent with `Some(timestamp)`, the updates with `None`
363 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
364 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
365 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
366 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
367 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
369 persister.persist_gossip().await;
372 let channel_count = network_graph_arc.read_only().channels().len();
373 assert_eq!(channel_count, 1);
// request only what changed since `timestamp + 1`
375 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, None, logger.clone()).await;
377 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
378 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
379 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
// no announcement, and a single incremental update
381 assert_eq!(serialization.message_count, 1);
382 assert_eq!(serialization.announcement_count, 0);
383 assert_eq!(serialization.update_count, 1);
384 assert_eq!(serialization.update_count_full, 0);
385 assert_eq!(serialization.update_count_incremental, 1);
387 tokio::task::spawn_blocking(move || {
391 clean_test_db().await;
/// Exercises delta serialization around `config::CHANNEL_REMINDER_AGE` with two channels and
/// expects four incremental updates and no announcements in the resulting delta.
395 async fn test_channel_reminders() {
396 let _sanitizer = SchemaSanitizer::new();
398 let logger = Arc::new(TestLogger::new());
399 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
400 let network_graph_arc = Arc::new(network_graph);
401 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
403 let timestamp = current_time();
404 println!("timestamp: {}", timestamp);
// reminder age in seconds; all fixture timestamps below are expressed relative to `timestamp - channel_reminder_delta`
405 let channel_reminder_delta = config::CHANNEL_REMINDER_AGE.as_secs() as u32;
// channel 1: both directions were last updated one second before the reminder threshold
408 { // unupdated channel
409 let short_channel_id = 1;
410 let announcement = generate_announcement(short_channel_id);
411 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
412 let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
414 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
415 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
416 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
418 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
419 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
420 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
// channel 2: later updates repeat the same parameters as earlier ones
422 { // unmodified but updated channel
423 let short_channel_id = 2;
424 let announcement = generate_announcement(short_channel_id);
425 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 10, 0, 0, 0, 5, 0);
426 // in the false direction, we have one update that's different prior
427 let update_2 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 5, 0, 1, 0, 5, 0);
428 let update_3 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
429 let update_4 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
430 let update_5 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 10, 0, 0, 0, 5, 0);
431 let update_6 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 10, 0, 0, 0, 3, 0);
432 let update_7 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 20, 0, 0, 0, 5, 0);
433 let update_8 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 20, 0, 0, 0, 3, 0);
// only the newest pair (updates 7 and 8) is applied to the in-memory graph ...
435 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
436 network_graph_arc.update_channel_unsigned(&update_7.contents).unwrap();
437 network_graph_arc.update_channel_unsigned(&update_8.contents).unwrap();
// ... while all eight updates are sent for persistence, each with a seen time equal to its own timestamp
439 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
440 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 10))).await.unwrap();
441 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 5))).await.unwrap();
442 receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
443 receiver.send(GossipMessage::ChannelUpdate(update_4, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
445 receiver.send(GossipMessage::ChannelUpdate(update_5, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
446 receiver.send(GossipMessage::ChannelUpdate(update_6, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
448 receiver.send(GossipMessage::ChannelUpdate(update_7, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
449 receiver.send(GossipMessage::ChannelUpdate(update_8, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
452 persister.persist_gossip().await;
455 let channel_count = network_graph_arc.read_only().channels().len();
456 assert_eq!(channel_count, 2);
// the cutoff lies between channel 2's `+ 10` and `+ 20` updates
458 let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, None, logger.clone()).await;
460 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
461 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 4 update rows of the latest update in the less recently updated direction", 1);
462 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
463 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
// four incremental updates and no announcements
465 assert_eq!(serialization.message_count, 4);
466 assert_eq!(serialization.announcement_count, 0);
467 assert_eq!(serialization.update_count, 4);
468 assert_eq!(serialization.update_count_full, 0);
469 assert_eq!(serialization.update_count_incremental, 4);
471 tokio::task::spawn_blocking(move || {
475 clean_test_db().await;
/// A full snapshot (last sync timestamp 0) must carry the most recent update per direction:
/// the `false` direction is updated twice (fee rates 38 then 39) and the client must end up with 39.
479 async fn test_full_snapshot_recency() {
480 let _sanitizer = SchemaSanitizer::new();
481 let logger = Arc::new(TestLogger::new());
482 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
483 let network_graph_arc = Arc::new(network_graph);
485 let short_channel_id = 1;
486 let timestamp = current_time();
487 println!("timestamp: {}", timestamp);
490 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
491 let announcement = generate_announcement(short_channel_id);
492 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
493 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// `false` direction, older update: fee rate 38 at `timestamp - 1`
497 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
498 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
499 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// `false` direction, newer update: fee rate 39 at `timestamp`
502 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
503 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
504 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// `true` direction: fee rate 10
508 { // first and only update
509 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
510 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
511 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
516 persister.persist_gossip().await;
518 tokio::task::spawn_blocking(move || {
523 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
524 let client_graph_arc = Arc::new(client_graph);
526 { // sync after initial seed
527 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
528 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
530 let channel_count = network_graph_arc.read_only().channels().len();
// one announcement and one update per direction
532 assert_eq!(channel_count, 1);
533 assert_eq!(serialization.message_count, 3);
534 assert_eq!(serialization.announcement_count, 1);
535 assert_eq!(serialization.update_count, 2);
537 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
538 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
539 // the update result must be a multiple of our snapshot granularity
540 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
541 assert!(update_result < timestamp);
543 let readonly_graph = client_graph_arc.read_only();
544 let channels = readonly_graph.channels();
545 let client_channel_count = channels.len();
546 assert_eq!(client_channel_count, 1);
548 let first_channel = channels.get(&short_channel_id).unwrap();
549 assert!(&first_channel.announcement_message.is_none());
550 // ensure the update in one direction shows the latest fee
551 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
552 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
555 clean_test_db().await;
/// Variant of the full-snapshot recency test in which the seen times passed along with the two
/// `false`-direction updates are in the opposite order to their timestamps; the client must
/// still end up with the fee rate of the update with the newer timestamp (39).
559 async fn test_full_snapshot_recency_with_wrong_seen_order() {
560 let _sanitizer = SchemaSanitizer::new();
561 let logger = Arc::new(TestLogger::new());
562 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
563 let network_graph_arc = Arc::new(network_graph);
565 let short_channel_id = 1;
566 let timestamp = current_time();
567 println!("timestamp: {}", timestamp);
570 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
571 let announcement = generate_announcement(short_channel_id);
572 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
573 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// update timestamp `timestamp - 1`, but seen time `timestamp`
576 { // first update, seen latest
577 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
578 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
579 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
// update timestamp `timestamp`, but seen time `timestamp - 1`
581 { // second update, seen first
582 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
583 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
584 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
// `true` direction: fee rate 10
588 { // first and only update
589 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
590 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
591 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
596 persister.persist_gossip().await;
598 tokio::task::spawn_blocking(move || {
603 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
604 let client_graph_arc = Arc::new(client_graph);
606 { // sync after initial seed
607 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
608 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
610 let channel_count = network_graph_arc.read_only().channels().len();
// one announcement and one update per direction
612 assert_eq!(channel_count, 1);
613 assert_eq!(serialization.message_count, 3);
614 assert_eq!(serialization.announcement_count, 1);
615 assert_eq!(serialization.update_count, 2);
617 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
618 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
619 // the update result must be a multiple of our snapshot granularity
620 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
621 assert!(update_result < timestamp);
623 let readonly_graph = client_graph_arc.read_only();
624 let channels = readonly_graph.channels();
625 let client_channel_count = channels.len();
626 assert_eq!(client_channel_count, 1);
628 let first_channel = channels.get(&short_channel_id).unwrap();
629 assert!(&first_channel.announcement_message.is_none());
630 // ensure the update in one direction shows the latest fee
631 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
632 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
635 clean_test_db().await;
/// Variant of the full-snapshot recency test in which the in-memory graph receives the two
/// `false`-direction updates in timestamp order, but they are sent for persistence in the
/// opposite order; the client must still end up with the newer fee rate (39).
639 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
640 let _sanitizer = SchemaSanitizer::new();
641 let logger = Arc::new(TestLogger::new());
642 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
643 let network_graph_arc = Arc::new(network_graph);
645 let short_channel_id = 1;
646 let timestamp = current_time();
647 println!("timestamp: {}", timestamp);
650 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
651 let announcement = generate_announcement(short_channel_id);
652 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
653 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
656 // apply updates in their timestamp order
657 let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
658 let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
659 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
660 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
662 // propagate updates in their seen order
// (the newer update is sent first with seen time `timestamp - 1`, the older one second with seen time `timestamp`)
663 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
664 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
// `true` direction: fee rate 10
667 { // first and only update
668 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
669 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
670 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
675 persister.persist_gossip().await;
677 tokio::task::spawn_blocking(move || {
682 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
683 let client_graph_arc = Arc::new(client_graph);
685 { // sync after initial seed
686 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
687 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
689 let channel_count = network_graph_arc.read_only().channels().len();
// one announcement and one update per direction
691 assert_eq!(channel_count, 1);
692 assert_eq!(serialization.message_count, 3);
693 assert_eq!(serialization.announcement_count, 1);
694 assert_eq!(serialization.update_count, 2);
696 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
697 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
698 // the update result must be a multiple of our snapshot granularity
699 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
700 assert!(update_result < timestamp);
702 let readonly_graph = client_graph_arc.read_only();
703 let channels = readonly_graph.channels();
704 let client_channel_count = channels.len();
705 assert_eq!(client_channel_count, 1);
707 let first_channel = channels.get(&short_channel_id).unwrap();
708 assert!(&first_channel.announcement_message.is_none());
709 // ensure the update in one direction shows the latest fee
710 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
711 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
714 clean_test_db().await;
718 async fn test_full_snapshot_mutiny_scenario() {
719 let _sanitizer = SchemaSanitizer::new();
720 let logger = Arc::new(TestLogger::new());
721 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
722 let network_graph_arc = Arc::new(network_graph);
724 let short_channel_id = 873706024403271681;
725 let timestamp = current_time();
726 // let oldest_simulation_timestamp = 1693300588;
727 let latest_simulation_timestamp = 1695909301;
728 let timestamp_offset = timestamp - latest_simulation_timestamp;
729 println!("timestamp: {}", timestamp);
732 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
733 let announcement = generate_announcement(short_channel_id);
734 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
735 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
739 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
740 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
741 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
744 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
745 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
746 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
749 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
750 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
751 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
754 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
755 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
756 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
759 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
760 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
761 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
764 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
765 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
766 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
769 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
770 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
771 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
774 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
775 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
776 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
779 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
780 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
781 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
784 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
785 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
786 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
789 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
790 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
791 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
796 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
797 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
798 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
801 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
802 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
803 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
808 persister.persist_gossip().await;
810 tokio::task::spawn_blocking(move || {
815 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
816 let client_graph_arc = Arc::new(client_graph);
818 { // sync after initial seed
819 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
820 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
822 let channel_count = network_graph_arc.read_only().channels().len();
824 assert_eq!(channel_count, 1);
825 assert_eq!(serialization.message_count, 3);
826 assert_eq!(serialization.announcement_count, 1);
827 assert_eq!(serialization.update_count, 2);
829 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
830 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
831 println!("update result: {}", update_result);
832 // the update result must be a multiple of our snapshot granularity
833 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
834 assert!(update_result < timestamp);
836 let timestamp_delta = timestamp - update_result;
837 println!("timestamp delta: {}", timestamp_delta);
838 assert!(timestamp_delta < config::snapshot_generation_interval());
840 let readonly_graph = client_graph_arc.read_only();
841 let channels = readonly_graph.channels();
842 let client_channel_count = channels.len();
843 assert_eq!(client_channel_count, 1);
845 let first_channel = channels.get(&short_channel_id).unwrap();
846 assert!(&first_channel.announcement_message.is_none());
847 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
848 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
851 clean_test_db().await;
/// Checks that a full snapshot carries the newest fees for a channel whose
/// updates are interleaved in time with updates of a second channel.
///
/// Two channels are announced (`main_channel_id` and `main_channel_id + 1`).
/// The main channel is updated in both directions at `timestamp - 2` and again
/// at `timestamp`; the secondary channel is updated in between, at
/// `timestamp - 1`. After the gossip is persisted, a delta is serialized and
/// applied to a fresh client graph, which must contain both channels and show
/// the main channel's latest values (11 and 6) as `proportional_millionths`.
855 async fn test_full_snapshot_interlaced_channel_timestamps() {
856 let _sanitizer = SchemaSanitizer::new();
857 let logger = Arc::new(TestLogger::new());
858 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
859 let network_graph_arc = Arc::new(network_graph);
861 let main_channel_id = 1;
// `current_time()` is the wall clock in whole seconds since the Unix epoch
862 let timestamp = current_time();
863 println!("timestamp: {}", timestamp);
// Every message below is applied to the in-memory server graph and is also
// handed to the persister through `receiver.send(...)`.
866 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
867 let secondary_channel_id = main_channel_id + 1;
870 let announcement = generate_announcement(main_channel_id);
871 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
872 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
875 { // secondary channel
876 let announcement = generate_announcement(secondary_channel_id);
877 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
878 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// main channel, first round: both directions at `timestamp - 2`
// NOTE(review): the last `generate_update` argument appears to be the
// proportional fee, since the assertions at the end compare
// `fees.proportional_millionths` against these values — confirm in generate_update
883 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
884 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
885 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
888 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
889 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
890 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
894 { // in-between channel
// secondary channel: both directions at `timestamp - 1`, i.e. between the
// two rounds of main-channel updates
896 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
897 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
898 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
901 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
902 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
903 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// main channel, second round: both directions at `timestamp`; the values 11
// and 6 are the ones the client-side assertions below expect
909 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
910 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
911 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
914 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
915 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
916 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
921 persister.persist_gossip().await;
923 tokio::task::spawn_blocking(move || {
928 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
929 let client_graph_arc = Arc::new(client_graph);
931 { // sync after initial seed
// NOTE(review): the `0` argument is presumably the client's last-sync
// timestamp (a sync from scratch) — confirm against serialize_delta
932 let serialization = serialize_delta(network_graph_arc.clone(), 0, None, logger.clone()).await;
933 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
935 let channel_count = network_graph_arc.read_only().channels().len();
937 assert_eq!(channel_count, 2);
// 2 announcements + 4 updates = 6 messages. Six updates were sent above, so
// presumably only the latest update per channel direction is serialized —
// TODO confirm
938 assert_eq!(serialization.message_count, 6);
939 assert_eq!(serialization.announcement_count, 2);
940 assert_eq!(serialization.update_count, 4);
// apply the serialized delta to the fresh client graph created above
942 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
943 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
944 // the update result must be a multiple of our snapshot granularity
945 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
// ...and strictly smaller than the `timestamp` captured at test start
946 assert!(update_result < timestamp);
948 let readonly_graph = client_graph_arc.read_only();
949 let channels = readonly_graph.channels();
950 let client_channel_count = channels.len();
951 assert_eq!(client_channel_count, 2);
953 let first_channel = channels.get(&main_channel_id).unwrap();
// the client-side channel entry must not hold an announcement message
954 assert!(&first_channel.announcement_message.is_none());
955 // ensure the update in one direction shows the latest fee
956 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
957 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
960 clean_test_db().await;
/// Checks that snapshots written by `Snapshotter::generate_snapshots` can be
/// read back from the cache and that regenerating them picks up newer gossip.
///
/// Round one: a single channel is announced and updated once per direction
/// (direction `true` with value 10 at `timestamp`, direction `false` with value
/// 38 at `timestamp - 1`). Snapshots are generated, the `symlinks/0.bin` file
/// under the cache path is read and applied to a fresh client graph, and the
/// fees 38 / 10 are asserted.
///
/// Round two: a newer direction-`false` update (value 39 at `timestamp + 30`)
/// is persisted, snapshots are regenerated, and the same symlink path must now
/// produce fees 39 / 10 on another fresh client graph.
964 async fn test_full_snapshot_persistence() {
965 let schema_sanitizer = SchemaSanitizer::new();
966 let logger = Arc::new(TestLogger::new());
967 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
968 let network_graph_arc = Arc::new(network_graph);
969 let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
// provides the cache directory used below via `cache_path()`
970 let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
972 let short_channel_id = 1;
// `current_time()` is the wall clock in whole seconds since the Unix epoch
973 let timestamp = current_time();
974 println!("timestamp: {}", timestamp);
// Every message below is applied to the in-memory server graph and is also
// handed to the persister through `receiver.send(...)`.
977 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
978 let announcement = generate_announcement(short_channel_id);
979 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
980 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
// direction `true` at `timestamp`
// NOTE(review): the last `generate_update` argument appears to be the
// proportional fee, since the assertions below compare
// `fees.proportional_millionths` against these values — confirm in generate_update
983 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
984 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
985 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
// direction `false`, one second earlier
989 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
990 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
991 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
996 persister.persist_gossip().await;
998 tokio::task::spawn_blocking(move || {
1003 let cache_path = cache_sanitizer.cache_path();
// both rounds read the snapshot back through this one path
1004 let symlink_path = format!("{}/symlinks/0.bin", cache_path);
1006 // generate snapshots
// NOTE(review): the meaning of the numeric arguments is defined by
// Snapshotter::generate_snapshots and is not visible here — confirm before
// changing them; the same values are reused for the regeneration below
1008 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1010 let symlinked_data = fs::read(&symlink_path).unwrap();
1011 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1012 let client_graph_arc = Arc::new(client_graph);
// apply the bytes read from disk to the fresh client graph
1014 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1015 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1016 // the update result must be a multiple of our snapshot granularity
1017 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1019 let readonly_graph = client_graph_arc.read_only();
1020 let channels = readonly_graph.channels();
1021 let client_channel_count = channels.len();
1022 assert_eq!(client_channel_count, 1);
1024 let first_channel = channels.get(&short_channel_id).unwrap();
// the client-side channel entry must not hold an announcement message
1025 assert!(&first_channel.announcement_message.is_none());
1026 // ensure the update in one direction shows the latest fee
1027 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
1028 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
// round two: a new persister for the same server graph receives one newer update
1032 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
// direction `false`, 30 seconds after `timestamp`; 39 is the value the final
// assertions expect for `one_to_two`
1035 let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
1036 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1037 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1041 persister.persist_gossip().await;
1043 tokio::task::spawn_blocking(move || {
1048 // regenerate snapshots
1050 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
// re-read the same symlink path; the assertions below require it to reflect
// the newer update
1052 let symlinked_data = fs::read(&symlink_path).unwrap();
1053 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1054 let client_graph_arc = Arc::new(client_graph);
1056 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1057 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1058 // the update result must be a multiple of our snapshot granularity
1059 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1061 let readonly_graph = client_graph_arc.read_only();
1062 let channels = readonly_graph.channels();
1063 let client_channel_count = channels.len();
1064 assert_eq!(client_channel_count, 1);
1066 let first_channel = channels.get(&short_channel_id).unwrap();
1067 assert!(&first_channel.announcement_message.is_none());
1068 // ensure the update in one direction shows the latest fee
1069 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
// the `true` direction was not updated in round two, so it keeps the value 10
1070 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1073 // clean up afterwards
1074 clean_test_db().await;