1 //! Multi-module tests that use database fixtures
3 use std::cell::RefCell;
6 use std::time::{SystemTime, UNIX_EPOCH};
7 use bitcoin::blockdata::constants::ChainHash;
9 use bitcoin::secp256k1::ecdsa::Signature;
10 use bitcoin::secp256k1::{Secp256k1, SecretKey};
11 use bitcoin::hashes::Hash;
12 use bitcoin::hashes::sha256d::Hash as Sha256dHash;
13 use hex_conservative::DisplayHex;
14 use lightning::ln::features::ChannelFeatures;
15 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
16 use lightning::routing::gossip::{NetworkGraph, NodeId};
17 use lightning::util::ser::Writeable;
18 use lightning_rapid_gossip_sync::RapidGossipSync;
19 use crate::{config, serialize_delta};
20 use crate::persistence::GossipPersister;
21 use crate::snapshot::Snapshotter;
22 use crate::types::{GossipMessage, tests::TestLogger};
24 const CLIENT_BACKDATE_INTERVAL: u32 = 3600 * 24 * 7; // client backdates RGS by a week
27 static DB_TEST_SCHEMA: RefCell<Option<String>> = RefCell::new(None);
28 static IS_TEST_SCHEMA_CLEAN: RefCell<Option<bool>> = RefCell::new(None);
31 fn blank_signature() -> Signature {
32 Signature::from_compact(&[0u8; 64]).unwrap()
35 fn genesis_hash() -> ChainHash {
36 ChainHash::using_genesis_block(Network::Bitcoin)
39 fn current_time() -> u32 {
40 SystemTime::now().duration_since(UNIX_EPOCH).expect("Time must be > 1970").as_secs() as u32
43 pub(crate) fn db_test_schema() -> String {
44 DB_TEST_SCHEMA.with(|suffix_reference| {
45 let suffix_option = suffix_reference.borrow();
46 suffix_option.as_ref().unwrap().clone()
50 fn generate_announcement(short_channel_id: u64) -> ChannelAnnouncement {
51 let secp_context = Secp256k1::new();
53 let random_private_key_1 = SecretKey::from_slice(&[1; 32]).unwrap();
54 let random_public_key_1 = random_private_key_1.public_key(&secp_context);
55 let node_id_1 = NodeId::from_pubkey(&random_public_key_1);
57 let random_private_key_2 = SecretKey::from_slice(&[2; 32]).unwrap();
58 let random_public_key_2 = random_private_key_2.public_key(&secp_context);
59 let node_id_2 = NodeId::from_pubkey(&random_public_key_2);
61 let announcement = UnsignedChannelAnnouncement {
62 features: ChannelFeatures::empty(),
63 chain_hash: genesis_hash(),
67 bitcoin_key_1: node_id_1,
68 bitcoin_key_2: node_id_2,
72 let msg_hash = bitcoin::secp256k1::Message::from_slice(&Sha256dHash::hash(&announcement.encode()[..])[..]).unwrap();
73 let node_signature_1 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_1);
74 let node_signature_2 = secp_context.sign_ecdsa(&msg_hash, &random_private_key_2);
79 bitcoin_signature_1: node_signature_1,
80 bitcoin_signature_2: node_signature_2,
81 contents: announcement,
85 fn generate_update(scid: u64, direction: bool, timestamp: u32, expiry_delta: u16, min_msat: u64, max_msat: u64, base_msat: u32, fee_rate: u32) -> ChannelUpdate {
86 let flag_mask = if direction { 1 } else { 0 };
88 signature: blank_signature(),
89 contents: UnsignedChannelUpdate {
90 chain_hash: genesis_hash(),
91 short_channel_id: scid,
94 cltv_expiry_delta: expiry_delta,
95 htlc_minimum_msat: min_msat,
96 htlc_maximum_msat: max_msat,
97 fee_base_msat: base_msat,
98 fee_proportional_millionths: fee_rate,
104 struct SchemaSanitizer {}
106 impl SchemaSanitizer {
108 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
109 let mut is_clean_option = cleanliness_reference.borrow_mut();
110 assert!(is_clean_option.is_none());
111 *is_clean_option = Some(false);
114 DB_TEST_SCHEMA.with(|suffix_reference| {
115 let mut suffix_option = suffix_reference.borrow_mut();
116 let current_time = SystemTime::now();
117 let unix_time = current_time.duration_since(UNIX_EPOCH).expect("Time went backwards");
118 let timestamp_seconds = unix_time.as_secs();
119 let timestamp_nanos = unix_time.as_nanos();
120 // sometimes Rust thinks two tests start at the same nanosecond, causing a schema conflict
121 let thread_id = thread::current().id();
122 let preimage = format!("{:?}-{}", thread_id, timestamp_nanos);
123 println!("test schema preimage: {}", preimage);
124 let suffix = Sha256dHash::hash(preimage.as_bytes()).encode();
125 // the schema must start with a letter
126 let schema = format!("test_{}_{}", timestamp_seconds, suffix.as_hex());
127 *suffix_option = Some(schema);
134 impl Drop for SchemaSanitizer {
136 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
137 let is_clean_option = cleanliness_reference.borrow();
138 if let Some(is_clean) = *is_clean_option {
139 if std::thread::panicking() {
142 assert_eq!(is_clean, true);
148 struct CacheSanitizer {}
150 impl CacheSanitizer {
151 /// The CacheSanitizer instantiation requires that there be a schema sanitizer
152 fn new(_: &SchemaSanitizer) -> Self {
156 fn cache_path(&self) -> String {
157 format!("./res/{}/", db_test_schema())
161 impl Drop for CacheSanitizer {
163 let cache_path = self.cache_path();
164 fs::remove_dir_all(cache_path).unwrap();
169 async fn clean_test_db() {
170 let client = crate::connect_to_db().await;
171 let schema = db_test_schema();
172 client.execute(&format!("DROP SCHEMA IF EXISTS {} CASCADE", schema), &[]).await.unwrap();
173 IS_TEST_SCHEMA_CLEAN.with(|cleanliness_reference| {
174 let mut is_clean_option = cleanliness_reference.borrow_mut();
175 *is_clean_option = Some(true);
180 async fn test_persistence_runtime() {
181 let _sanitizer = SchemaSanitizer::new();
182 let logger = Arc::new(TestLogger::new());
183 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
184 let network_graph_arc = Arc::new(network_graph);
185 let (_persister, _receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
187 tokio::task::spawn_blocking(move || {
191 clean_test_db().await;
196 async fn test_trivial_setup() {
197 let _sanitizer = SchemaSanitizer::new();
198 let logger = Arc::new(TestLogger::new());
199 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
200 let network_graph_arc = Arc::new(network_graph);
201 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
203 let short_channel_id = 1;
204 let timestamp = current_time() - 10;
205 println!("timestamp: {}", timestamp);
208 let announcement = generate_announcement(short_channel_id);
209 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
210 let update_2 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 10, 0);
212 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
213 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
214 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
216 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
217 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
218 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
220 persister.persist_gossip().await;
223 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
224 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
225 clean_test_db().await;
227 let channel_count = network_graph_arc.read_only().channels().len();
229 assert_eq!(channel_count, 1);
230 assert_eq!(serialization.message_count, 3);
231 assert_eq!(serialization.announcement_count, 1);
232 assert_eq!(serialization.update_count, 2);
234 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
235 let client_graph_arc = Arc::new(client_graph);
236 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
237 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
238 println!("update result: {}", update_result);
239 // the update result must be a multiple of our snapshot granularity
240 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
241 assert!(update_result < timestamp);
243 let timestamp_delta = timestamp - update_result;
244 println!("timestamp delta: {}", timestamp_delta);
245 assert!(timestamp_delta < config::snapshot_generation_interval());
247 let readonly_graph = client_graph_arc.read_only();
248 let channels = readonly_graph.channels();
249 let client_channel_count = channels.len();
250 assert_eq!(client_channel_count, 1);
252 let first_channel = channels.get(&short_channel_id).unwrap();
253 assert!(&first_channel.announcement_message.is_none());
254 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.base_msat, 5);
255 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.base_msat, 10);
256 let last_update_seen_a = first_channel.one_to_two.as_ref().unwrap().last_update;
257 let last_update_seen_b = first_channel.two_to_one.as_ref().unwrap().last_update;
258 println!("last update a: {}", last_update_seen_a);
259 println!("last update b: {}", last_update_seen_b);
260 assert_eq!(last_update_seen_a, update_result - CLIENT_BACKDATE_INTERVAL);
261 assert_eq!(last_update_seen_b, update_result - CLIENT_BACKDATE_INTERVAL);
263 tokio::task::spawn_blocking(move || {
268 /// If a channel has only seen updates in one direction, it should not be announced
270 async fn test_unidirectional_intermediate_update_consideration() {
271 let _sanitizer = SchemaSanitizer::new();
273 let logger = Arc::new(TestLogger::new());
274 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
275 let network_graph_arc = Arc::new(network_graph);
276 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
278 let short_channel_id = 1;
279 let timestamp = current_time() - 10;
280 println!("timestamp: {}", timestamp);
283 let announcement = generate_announcement(short_channel_id);
284 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 6, 0);
285 let update_2 = generate_update(short_channel_id, true, timestamp + 1, 0, 0, 0, 3, 0);
286 let update_3 = generate_update(short_channel_id, true, timestamp + 2, 0, 0, 0, 4, 0);
288 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
289 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
290 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
291 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
293 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
294 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
295 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
296 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
298 persister.persist_gossip().await;
301 let channel_count = network_graph_arc.read_only().channels().len();
302 assert_eq!(channel_count, 1);
304 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
305 let client_graph_arc = Arc::new(client_graph);
306 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
308 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
310 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the first update in a new direction", 1);
311 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 1 reference rows", 1);
312 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
314 assert_eq!(serialization.message_count, 3);
315 assert_eq!(serialization.announcement_count, 1);
316 assert_eq!(serialization.update_count, 2);
317 assert_eq!(serialization.update_count_full, 2);
318 assert_eq!(serialization.update_count_incremental, 0);
320 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
321 println!("update result: {}", update_result);
322 // the update result must be a multiple of our snapshot granularity
324 let readonly_graph = client_graph_arc.read_only();
325 let channels = readonly_graph.channels();
326 let client_channel_count = channels.len();
327 assert_eq!(client_channel_count, 1);
329 tokio::task::spawn_blocking(move || {
333 clean_test_db().await;
336 /// If a channel has only seen updates in one direction, it should not be announced
338 async fn test_bidirectional_intermediate_update_consideration() {
339 let _sanitizer = SchemaSanitizer::new();
341 let logger = Arc::new(TestLogger::new());
342 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
343 let network_graph_arc = Arc::new(network_graph);
344 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
346 let short_channel_id = 1;
347 let timestamp = current_time() - 10;
348 println!("timestamp: {}", timestamp);
351 let announcement = generate_announcement(short_channel_id);
352 let update_1 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 5, 0);
353 let update_2 = generate_update(short_channel_id, false, timestamp + 1, 0, 0, 0, 4, 0);
354 let update_3 = generate_update(short_channel_id, false, timestamp + 2, 0, 0, 0, 3, 0);
355 let update_4 = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 3, 0);
357 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
358 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
359 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
360 network_graph_arc.update_channel_unsigned(&update_3.contents).unwrap();
361 network_graph_arc.update_channel_unsigned(&update_4.contents).unwrap();
363 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp))).await.unwrap();
364 receiver.send(GossipMessage::ChannelUpdate(update_1, None)).await.unwrap();
365 receiver.send(GossipMessage::ChannelUpdate(update_2, None)).await.unwrap();
366 receiver.send(GossipMessage::ChannelUpdate(update_3, None)).await.unwrap();
367 receiver.send(GossipMessage::ChannelUpdate(update_4, None)).await.unwrap();
369 persister.persist_gossip().await;
372 let channel_count = network_graph_arc.read_only().channels().len();
373 assert_eq!(channel_count, 1);
375 let serialization = serialize_delta(network_graph_arc.clone(), timestamp + 1, logger.clone()).await;
377 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
378 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
379 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed intermediate rows (2)", 1);
381 assert_eq!(serialization.message_count, 1);
382 assert_eq!(serialization.announcement_count, 0);
383 assert_eq!(serialization.update_count, 1);
384 assert_eq!(serialization.update_count_full, 0);
385 assert_eq!(serialization.update_count_incremental, 1);
387 tokio::task::spawn_blocking(move || {
391 clean_test_db().await;
395 async fn test_channel_reminders() {
396 let _sanitizer = SchemaSanitizer::new();
398 let logger = Arc::new(TestLogger::new());
399 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
400 let network_graph_arc = Arc::new(network_graph);
401 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
403 let timestamp = current_time();
404 println!("timestamp: {}", timestamp);
405 let channel_reminder_delta = config::CHANNEL_REMINDER_AGE.as_secs() as u32;
408 { // unupdated channel
409 let short_channel_id = 1;
410 let announcement = generate_announcement(short_channel_id);
411 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
412 let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
414 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
415 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
416 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
418 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
419 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
420 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
422 { // unmodified but updated channel
423 let short_channel_id = 2;
424 let announcement = generate_announcement(short_channel_id);
425 let update_1 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta - 1, 0, 0, 0, 5, 0);
426 let update_2 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta - 1, 0, 0, 0, 3, 0);
427 let update_3 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 10, 0, 0, 0, 5, 0);
428 let update_4 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 10, 0, 0, 0, 3, 0);
429 let update_5 = generate_update(short_channel_id, false, timestamp - channel_reminder_delta + 20, 0, 0, 0, 5, 0);
430 let update_6 = generate_update(short_channel_id, true, timestamp - channel_reminder_delta + 20, 0, 0, 0, 3, 0);
432 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
433 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
434 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
436 receiver.send(GossipMessage::ChannelAnnouncement(announcement, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
437 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
438 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - channel_reminder_delta - 1))).await.unwrap();
440 receiver.send(GossipMessage::ChannelUpdate(update_3, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
441 receiver.send(GossipMessage::ChannelUpdate(update_4, Some(timestamp - channel_reminder_delta + 10))).await.unwrap();
443 receiver.send(GossipMessage::ChannelUpdate(update_5, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
444 receiver.send(GossipMessage::ChannelUpdate(update_6, Some(timestamp - channel_reminder_delta + 20))).await.unwrap();
447 persister.persist_gossip().await;
450 let channel_count = network_graph_arc.read_only().channels().len();
451 assert_eq!(channel_count, 2);
453 let serialization = serialize_delta(network_graph_arc.clone(), timestamp - channel_reminder_delta + 15, logger.clone()).await;
455 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 0 update rows of the first update in a new direction", 1);
456 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Fetched 1 update rows of the latest update in the less recently updated direction", 1);
457 logger.assert_log_contains("rapid_gossip_sync_server::lookup", "Processed 2 reference rows", 1);
459 assert_eq!(serialization.message_count, 2);
460 assert_eq!(serialization.announcement_count, 0);
461 assert_eq!(serialization.update_count, 2);
462 assert_eq!(serialization.update_count_full, 0);
463 assert_eq!(serialization.update_count_incremental, 2);
465 tokio::task::spawn_blocking(move || {
469 clean_test_db().await;
473 async fn test_full_snapshot_recency() {
474 let _sanitizer = SchemaSanitizer::new();
475 let logger = Arc::new(TestLogger::new());
476 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
477 let network_graph_arc = Arc::new(network_graph);
479 let short_channel_id = 1;
480 let timestamp = current_time();
481 println!("timestamp: {}", timestamp);
484 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
485 let announcement = generate_announcement(short_channel_id);
486 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
487 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
491 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
492 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
493 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
496 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
497 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
498 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
502 { // first and only update
503 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
504 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
505 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
510 persister.persist_gossip().await;
512 tokio::task::spawn_blocking(move || {
517 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
518 let client_graph_arc = Arc::new(client_graph);
520 { // sync after initial seed
521 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
522 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
524 let channel_count = network_graph_arc.read_only().channels().len();
526 assert_eq!(channel_count, 1);
527 assert_eq!(serialization.message_count, 3);
528 assert_eq!(serialization.announcement_count, 1);
529 assert_eq!(serialization.update_count, 2);
531 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
532 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
533 // the update result must be a multiple of our snapshot granularity
534 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
535 assert!(update_result < timestamp);
537 let readonly_graph = client_graph_arc.read_only();
538 let channels = readonly_graph.channels();
539 let client_channel_count = channels.len();
540 assert_eq!(client_channel_count, 1);
542 let first_channel = channels.get(&short_channel_id).unwrap();
543 assert!(&first_channel.announcement_message.is_none());
544 // ensure the update in one direction shows the latest fee
545 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
546 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
549 clean_test_db().await;
553 async fn test_full_snapshot_recency_with_wrong_seen_order() {
554 let _sanitizer = SchemaSanitizer::new();
555 let logger = Arc::new(TestLogger::new());
556 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
557 let network_graph_arc = Arc::new(network_graph);
559 let short_channel_id = 1;
560 let timestamp = current_time();
561 println!("timestamp: {}", timestamp);
564 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
565 let announcement = generate_announcement(short_channel_id);
566 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
567 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
570 { // first update, seen latest
571 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
572 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
573 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp))).await.unwrap();
575 { // second update, seen first
576 let update = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
577 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
578 receiver.send(GossipMessage::ChannelUpdate(update, Some(timestamp - 1))).await.unwrap();
582 { // first and only update
583 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
584 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
585 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
590 persister.persist_gossip().await;
592 tokio::task::spawn_blocking(move || {
597 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
598 let client_graph_arc = Arc::new(client_graph);
600 { // sync after initial seed
601 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
602 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
604 let channel_count = network_graph_arc.read_only().channels().len();
606 assert_eq!(channel_count, 1);
607 assert_eq!(serialization.message_count, 3);
608 assert_eq!(serialization.announcement_count, 1);
609 assert_eq!(serialization.update_count, 2);
611 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
612 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
613 // the update result must be a multiple of our snapshot granularity
614 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
615 assert!(update_result < timestamp);
617 let readonly_graph = client_graph_arc.read_only();
618 let channels = readonly_graph.channels();
619 let client_channel_count = channels.len();
620 assert_eq!(client_channel_count, 1);
622 let first_channel = channels.get(&short_channel_id).unwrap();
623 assert!(&first_channel.announcement_message.is_none());
624 // ensure the update in one direction shows the latest fee
625 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
626 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
629 clean_test_db().await;
633 async fn test_full_snapshot_recency_with_wrong_propagation_order() {
634 let _sanitizer = SchemaSanitizer::new();
635 let logger = Arc::new(TestLogger::new());
636 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
637 let network_graph_arc = Arc::new(network_graph);
639 let short_channel_id = 1;
640 let timestamp = current_time();
641 println!("timestamp: {}", timestamp);
644 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
645 let announcement = generate_announcement(short_channel_id);
646 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
647 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
650 // apply updates in their timestamp order
651 let update_1 = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
652 let update_2 = generate_update(short_channel_id, false, timestamp, 0, 0, 0, 0, 39);
653 network_graph_arc.update_channel_unsigned(&update_1.contents).unwrap();
654 network_graph_arc.update_channel_unsigned(&update_2.contents).unwrap();
656 // propagate updates in their seen order
657 receiver.send(GossipMessage::ChannelUpdate(update_2, Some(timestamp - 1))).await.unwrap();
658 receiver.send(GossipMessage::ChannelUpdate(update_1, Some(timestamp))).await.unwrap();
661 { // first and only update
662 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
663 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
664 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
669 persister.persist_gossip().await;
671 tokio::task::spawn_blocking(move || {
676 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
677 let client_graph_arc = Arc::new(client_graph);
679 { // sync after initial seed
680 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
681 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
683 let channel_count = network_graph_arc.read_only().channels().len();
685 assert_eq!(channel_count, 1);
686 assert_eq!(serialization.message_count, 3);
687 assert_eq!(serialization.announcement_count, 1);
688 assert_eq!(serialization.update_count, 2);
690 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
691 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
692 // the update result must be a multiple of our snapshot granularity
693 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
694 assert!(update_result < timestamp);
696 let readonly_graph = client_graph_arc.read_only();
697 let channels = readonly_graph.channels();
698 let client_channel_count = channels.len();
699 assert_eq!(client_channel_count, 1);
701 let first_channel = channels.get(&short_channel_id).unwrap();
702 assert!(&first_channel.announcement_message.is_none());
703 // ensure the update in one direction shows the latest fee
704 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
705 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
708 clean_test_db().await;
712 async fn test_full_snapshot_mutiny_scenario() {
713 let _sanitizer = SchemaSanitizer::new();
714 let logger = Arc::new(TestLogger::new());
715 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
716 let network_graph_arc = Arc::new(network_graph);
718 let short_channel_id = 873706024403271681;
719 let timestamp = current_time();
720 // let oldest_simulation_timestamp = 1693300588;
721 let latest_simulation_timestamp = 1695909301;
722 let timestamp_offset = timestamp - latest_simulation_timestamp;
723 println!("timestamp: {}", timestamp);
726 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
727 let announcement = generate_announcement(short_channel_id);
728 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
729 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
733 let update = generate_update(short_channel_id, false, 1693507369 + timestamp_offset, 0, 0, 0, 0, 38);
734 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
735 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
738 let update = generate_update(short_channel_id, false, 1693680390 + timestamp_offset, 0, 0, 0, 0, 38);
739 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
740 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
743 let update = generate_update(short_channel_id, false, 1693749109 + timestamp_offset, 0, 0, 0, 0, 200);
744 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
745 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
748 let update = generate_update(short_channel_id, false, 1693925190 + timestamp_offset, 0, 0, 0, 0, 200);
749 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
750 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
753 let update = generate_update(short_channel_id, false, 1694008323 + timestamp_offset, 0, 0, 0, 0, 209);
754 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
755 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
758 let update = generate_update(short_channel_id, false, 1694219924 + timestamp_offset, 0, 0, 0, 0, 209);
759 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
760 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
763 let update = generate_update(short_channel_id, false, 1694267536 + timestamp_offset, 0, 0, 0, 0, 210);
764 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
765 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
768 let update = generate_update(short_channel_id, false, 1694458808 + timestamp_offset, 0, 0, 0, 0, 210);
769 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
770 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
773 let update = generate_update(short_channel_id, false, 1694526734 + timestamp_offset, 0, 0, 0, 0, 200);
774 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
775 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
778 let update = generate_update(short_channel_id, false, 1694794765 + timestamp_offset, 0, 0, 0, 0, 200);
779 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
780 receiver.send(GossipMessage::ChannelUpdate(update, Some(1695909301 + 2 * config::SYMLINK_GRANULARITY_INTERVAL + timestamp_offset))).await.unwrap();
783 let update = generate_update(short_channel_id, false, 1695909301 + timestamp_offset, 0, 0, 0, 0, 130);
784 // network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
785 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
790 let update = generate_update(short_channel_id, true, 1693300588 + timestamp_offset, 0, 0, 0, 0, 10);
791 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
792 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
795 let update = generate_update(short_channel_id, true, 1695003621 + timestamp_offset, 0, 0, 0, 0, 10);
796 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
797 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
802 persister.persist_gossip().await;
804 tokio::task::spawn_blocking(move || {
809 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
810 let client_graph_arc = Arc::new(client_graph);
812 { // sync after initial seed
813 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
814 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 1", 1);
816 let channel_count = network_graph_arc.read_only().channels().len();
818 assert_eq!(channel_count, 1);
819 assert_eq!(serialization.message_count, 3);
820 assert_eq!(serialization.announcement_count, 1);
821 assert_eq!(serialization.update_count, 2);
823 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
824 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
825 println!("update result: {}", update_result);
826 // the update result must be a multiple of our snapshot granularity
827 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
828 assert!(update_result < timestamp);
830 let timestamp_delta = timestamp - update_result;
831 println!("timestamp delta: {}", timestamp_delta);
832 assert!(timestamp_delta < config::snapshot_generation_interval());
834 let readonly_graph = client_graph_arc.read_only();
835 let channels = readonly_graph.channels();
836 let client_channel_count = channels.len();
837 assert_eq!(client_channel_count, 1);
839 let first_channel = channels.get(&short_channel_id).unwrap();
840 assert!(&first_channel.announcement_message.is_none());
841 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 130);
842 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
845 clean_test_db().await;
849 async fn test_full_snapshot_interlaced_channel_timestamps() {
850 let _sanitizer = SchemaSanitizer::new();
851 let logger = Arc::new(TestLogger::new());
852 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
853 let network_graph_arc = Arc::new(network_graph);
855 let main_channel_id = 1;
856 let timestamp = current_time();
857 println!("timestamp: {}", timestamp);
860 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
861 let secondary_channel_id = main_channel_id + 1;
864 let announcement = generate_announcement(main_channel_id);
865 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
866 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
869 { // secondary channel
870 let announcement = generate_announcement(secondary_channel_id);
871 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
872 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
877 let update = generate_update(main_channel_id, false, timestamp - 2, 0, 0, 0, 0, 10);
878 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
879 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
882 let update = generate_update(main_channel_id, true, timestamp - 2, 0, 0, 0, 0, 5);
883 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
884 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
888 { // in-between channel
890 let update = generate_update(secondary_channel_id, false, timestamp - 1, 0, 0, 0, 0, 42);
891 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
892 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
895 let update = generate_update(secondary_channel_id, true, timestamp - 1, 0, 0, 0, 0, 42);
896 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
897 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
903 let update = generate_update(main_channel_id, false, timestamp, 0, 0, 0, 0, 11);
904 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
905 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
908 let update = generate_update(main_channel_id, true, timestamp, 0, 0, 0, 0, 6);
909 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
910 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
915 persister.persist_gossip().await;
917 tokio::task::spawn_blocking(move || {
922 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
923 let client_graph_arc = Arc::new(client_graph);
925 { // sync after initial seed
926 let serialization = serialize_delta(network_graph_arc.clone(), 0, logger.clone()).await;
927 logger.assert_log_contains("rapid_gossip_sync_server", "announcement channel count: 2", 1);
929 let channel_count = network_graph_arc.read_only().channels().len();
931 assert_eq!(channel_count, 2);
932 assert_eq!(serialization.message_count, 6);
933 assert_eq!(serialization.announcement_count, 2);
934 assert_eq!(serialization.update_count, 4);
936 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
937 let update_result = rgs.update_network_graph(&serialization.data).unwrap();
938 // the update result must be a multiple of our snapshot granularity
939 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
940 assert!(update_result < timestamp);
942 let readonly_graph = client_graph_arc.read_only();
943 let channels = readonly_graph.channels();
944 let client_channel_count = channels.len();
945 assert_eq!(client_channel_count, 2);
947 let first_channel = channels.get(&main_channel_id).unwrap();
948 assert!(&first_channel.announcement_message.is_none());
949 // ensure the update in one direction shows the latest fee
950 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 11);
951 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 6);
954 clean_test_db().await;
958 async fn test_full_snapshot_persistence() {
959 let schema_sanitizer = SchemaSanitizer::new();
960 let logger = Arc::new(TestLogger::new());
961 let network_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
962 let network_graph_arc = Arc::new(network_graph);
963 let snapshotter = Snapshotter::new(network_graph_arc.clone(), logger.clone());
964 let cache_sanitizer = CacheSanitizer::new(&schema_sanitizer);
966 let short_channel_id = 1;
967 let timestamp = current_time();
968 println!("timestamp: {}", timestamp);
971 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
972 let announcement = generate_announcement(short_channel_id);
973 network_graph_arc.update_channel_from_announcement_no_lookup(&announcement).unwrap();
974 receiver.send(GossipMessage::ChannelAnnouncement(announcement, None)).await.unwrap();
977 let update = generate_update(short_channel_id, true, timestamp, 0, 0, 0, 0, 10);
978 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
979 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
983 let update = generate_update(short_channel_id, false, timestamp - 1, 0, 0, 0, 0, 38);
984 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
985 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
990 persister.persist_gossip().await;
992 tokio::task::spawn_blocking(move || {
997 let cache_path = cache_sanitizer.cache_path();
998 let symlink_path = format!("{}/symlinks/0.bin", cache_path);
1000 // generate snapshots
1002 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1004 let symlinked_data = fs::read(&symlink_path).unwrap();
1005 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1006 let client_graph_arc = Arc::new(client_graph);
1008 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1009 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1010 // the update result must be a multiple of our snapshot granularity
1011 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1013 let readonly_graph = client_graph_arc.read_only();
1014 let channels = readonly_graph.channels();
1015 let client_channel_count = channels.len();
1016 assert_eq!(client_channel_count, 1);
1018 let first_channel = channels.get(&short_channel_id).unwrap();
1019 assert!(&first_channel.announcement_message.is_none());
1020 // ensure the update in one direction shows the latest fee
1021 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 38);
1022 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1026 let (mut persister, receiver) = GossipPersister::new(network_graph_arc.clone(), logger.clone());
1029 let update = generate_update(short_channel_id, false, timestamp + 30, 0, 0, 0, 0, 39);
1030 network_graph_arc.update_channel_unsigned(&update.contents).unwrap();
1031 receiver.send(GossipMessage::ChannelUpdate(update, None)).await.unwrap();
1035 persister.persist_gossip().await;
1037 tokio::task::spawn_blocking(move || {
1042 // regenerate snapshots
1044 snapshotter.generate_snapshots(20, 5, &[5, u64::MAX], &cache_path, Some(10)).await;
1046 let symlinked_data = fs::read(&symlink_path).unwrap();
1047 let client_graph = NetworkGraph::new(Network::Bitcoin, logger.clone());
1048 let client_graph_arc = Arc::new(client_graph);
1050 let rgs = RapidGossipSync::new(client_graph_arc.clone(), logger.clone());
1051 let update_result = rgs.update_network_graph(&symlinked_data).unwrap();
1052 // the update result must be a multiple of our snapshot granularity
1053 assert_eq!(update_result % config::snapshot_generation_interval(), 0);
1055 let readonly_graph = client_graph_arc.read_only();
1056 let channels = readonly_graph.channels();
1057 let client_channel_count = channels.len();
1058 assert_eq!(client_channel_count, 1);
1060 let first_channel = channels.get(&short_channel_id).unwrap();
1061 assert!(&first_channel.announcement_message.is_none());
1062 // ensure the update in one direction shows the latest fee
1063 assert_eq!(first_channel.one_to_two.as_ref().unwrap().fees.proportional_millionths, 39);
1064 assert_eq!(first_channel.two_to_one.as_ref().unwrap().fees.proportional_millionths, 10);
1067 // clean up afterwards
1068 clean_test_db().await;