From a58ae4c97b3436b743fd61a1fa697471131cf356 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Wed, 3 Nov 2021 10:50:08 -0700 Subject: [PATCH] Introduce graph sync crate for fast-forwarding through gossip data downloaded from a server. --- .editorconfig | 1 + .github/workflows/build.yml | 18 +- Cargo.toml | 1 + fuzz/Cargo.toml | 1 + fuzz/src/bin/gen_target.sh | 1 + fuzz/src/bin/process_network_graph_target.rs | 113 ++++ fuzz/src/lib.rs | 2 + fuzz/src/process_network_graph.rs | 20 + fuzz/targets.h | 1 + lightning-rapid-gossip-sync/Cargo.toml | 20 + lightning-rapid-gossip-sync/README.md | 120 +++++ lightning-rapid-gossip-sync/res/.gitignore | 2 + lightning-rapid-gossip-sync/src/error.rs | 40 ++ lightning-rapid-gossip-sync/src/lib.rs | 243 +++++++++ lightning-rapid-gossip-sync/src/processing.rs | 499 ++++++++++++++++++ lightning/src/lib.rs | 2 +- lightning/src/ln/msgs.rs | 5 +- lightning/src/routing/network_graph.rs | 158 +++--- lightning/src/util/ser.rs | 2 +- 19 files changed, 1185 insertions(+), 64 deletions(-) create mode 100644 fuzz/src/bin/process_network_graph_target.rs create mode 100644 fuzz/src/process_network_graph.rs create mode 100644 lightning-rapid-gossip-sync/Cargo.toml create mode 100644 lightning-rapid-gossip-sync/README.md create mode 100644 lightning-rapid-gossip-sync/res/.gitignore create mode 100644 lightning-rapid-gossip-sync/src/error.rs create mode 100644 lightning-rapid-gossip-sync/src/lib.rs create mode 100644 lightning-rapid-gossip-sync/src/processing.rs diff --git a/.editorconfig b/.editorconfig index e5657670c..dab24fe3d 100644 --- a/.editorconfig +++ b/.editorconfig @@ -3,3 +3,4 @@ [*] indent_style = tab insert_final_newline = true +trim_trailing_whitespace = true diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cff171344..59a11b4e4 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -141,6 +141,7 @@ jobs: run: | cargo test --verbose --color always -p lightning cargo test --verbose --color always -p lightning-invoice + cargo test --verbose --color always -p lightning-rapid-gossip-sync cargo build --verbose --color always -p lightning-persister cargo build --verbose --color always -p lightning-background-processor - name: Test C Bindings Modifications on Rust ${{ matrix.toolchain }} @@ -221,11 +222,24 @@ jobs: - name: Fetch routing graph snapshot if: steps.cache-graph.outputs.cache-hit != 'true' run: | - wget -O lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin - if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303" ]; then + curl --verbose -L -o lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin + echo "Sha sum: $(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" + if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "${EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM}" ]; then echo "Bad hash" exit 1 fi + env: + EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM: 05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303 + - name: Fetch rapid graph sync reference input + run: | + curl --verbose -L -o lightning-rapid-gossip-sync/res/full_graph.lngossip https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin + echo "Sha sum: $(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')" + if [ "$(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')" != "${EXPECTED_RAPID_GOSSIP_SHASUM}" ]; then + echo "Bad hash" + exit 1 + fi + env: + EXPECTED_RAPID_GOSSIP_SHASUM: 9637b91cea9d64320cf48fc0787c70fe69fc062f90d3512e207044110cadfd7b - name: Test with Network Graph on Rust ${{ matrix.toolchain }} run: | cd lightning diff --git a/Cargo.toml b/Cargo.toml index 6e03fc1ac..f263dc8ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,6 +7,7 @@ members = [ "lightning-net-tokio", "lightning-persister", "lightning-background-processor", + "lightning-rapid-gossip-sync" ] exclude = [ diff --git a/fuzz/Cargo.toml b/fuzz/Cargo.toml index 88e577617..66dabcfe4 100644 --- a/fuzz/Cargo.toml +++ b/fuzz/Cargo.toml @@ -19,6 +19,7 @@ stdin_fuzz = [] [dependencies] afl = { version = "0.4", optional = true } lightning = { path = "../lightning", features = ["regex"] } +lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" } bitcoin = { version = "0.28.1", features = ["secp-lowmemory"] } hex = "0.3" honggfuzz = { version = "0.5", optional = true } diff --git a/fuzz/src/bin/gen_target.sh b/fuzz/src/bin/gen_target.sh index eb07df634..72fefe516 100755 --- a/fuzz/src/bin/gen_target.sh +++ b/fuzz/src/bin/gen_target.sh @@ -10,6 +10,7 @@ GEN_TEST chanmon_deser GEN_TEST chanmon_consistency GEN_TEST full_stack GEN_TEST peer_crypt +GEN_TEST process_network_graph GEN_TEST router GEN_TEST zbase32 diff --git a/fuzz/src/bin/process_network_graph_target.rs b/fuzz/src/bin/process_network_graph_target.rs new file mode 100644 index 000000000..380efdff4 --- /dev/null +++ b/fuzz/src/bin/process_network_graph_target.rs @@ -0,0 +1,113 @@ +// This file is Copyright its original authors, visible in version control +// history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license +// , at your option. +// You may not use this file except in accordance with one or both of these +// licenses. + +// This file is auto-generated by gen_target.sh based on target_template.txt +// To modify it, modify target_template.txt and run gen_target.sh instead. + +#![cfg_attr(feature = "libfuzzer_fuzz", no_main)] + +#[cfg(not(fuzzing))] +compile_error!("Fuzz targets need cfg=fuzzing"); + +extern crate lightning_fuzz; +use lightning_fuzz::process_network_graph::*; + +#[cfg(feature = "afl")] +#[macro_use] extern crate afl; +#[cfg(feature = "afl")] +fn main() { + fuzz!(|data| { + process_network_graph_run(data.as_ptr(), data.len()); + }); +} + +#[cfg(feature = "honggfuzz")] +#[macro_use] extern crate honggfuzz; +#[cfg(feature = "honggfuzz")] +fn main() { + loop { + fuzz!(|data| { + process_network_graph_run(data.as_ptr(), data.len()); + }); + } +} + +#[cfg(feature = "libfuzzer_fuzz")] +#[macro_use] extern crate libfuzzer_sys; +#[cfg(feature = "libfuzzer_fuzz")] +fuzz_target!(|data: &[u8]| { + process_network_graph_run(data.as_ptr(), data.len()); +}); + +#[cfg(feature = "stdin_fuzz")] +fn main() { + use std::io::Read; + + let mut data = Vec::with_capacity(8192); + std::io::stdin().read_to_end(&mut data).unwrap(); + process_network_graph_run(data.as_ptr(), data.len()); +} + +#[test] +fn run_test_cases() { + use std::fs; + use std::io::Read; + use lightning_fuzz::utils::test_logger::StringBuffer; + + use std::sync::{atomic, Arc}; + { + let data: Vec = vec![0]; + process_network_graph_run(data.as_ptr(), data.len()); + } + let mut threads = Vec::new(); + let threads_running = Arc::new(atomic::AtomicUsize::new(0)); + if let Ok(tests) = fs::read_dir("test_cases/process_network_graph") { + for test in tests { + let mut data: Vec = Vec::new(); + let path = test.unwrap().path(); + fs::File::open(&path).unwrap().read_to_end(&mut data).unwrap(); + threads_running.fetch_add(1, atomic::Ordering::AcqRel); + + let thread_count_ref = Arc::clone(&threads_running); + let main_thread_ref = std::thread::current(); + threads.push((path.file_name().unwrap().to_str().unwrap().to_string(), + std::thread::spawn(move || { + let string_logger = StringBuffer::new(); + + let panic_logger = string_logger.clone(); + let res = if ::std::panic::catch_unwind(move || { + process_network_graph_test(&data, panic_logger); + }).is_err() { + Some(string_logger.into_string()) + } else { None }; + thread_count_ref.fetch_sub(1, atomic::Ordering::AcqRel); + main_thread_ref.unpark(); + res + }) + )); + while threads_running.load(atomic::Ordering::Acquire) > 32 { + std::thread::park(); + } + } + } + let mut failed_outputs = Vec::new(); + for (test, thread) in threads.drain(..) { + if let Some(output) = thread.join().unwrap() { + println!("\nOutput of {}:\n{}\n", test, output); + failed_outputs.push(test); + } + } + if !failed_outputs.is_empty() { + println!("Test cases which failed: "); + for case in failed_outputs { + println!("{}", case); + } + panic!(); + } +} diff --git a/fuzz/src/lib.rs b/fuzz/src/lib.rs index a0cc42b81..5e158aee3 100644 --- a/fuzz/src/lib.rs +++ b/fuzz/src/lib.rs @@ -9,6 +9,7 @@ extern crate bitcoin; extern crate lightning; +extern crate lightning_rapid_gossip_sync; extern crate hex; pub mod utils; @@ -17,6 +18,7 @@ pub mod chanmon_deser; pub mod chanmon_consistency; pub mod full_stack; pub mod peer_crypt; +pub mod process_network_graph; pub mod router; pub mod zbase32; diff --git a/fuzz/src/process_network_graph.rs b/fuzz/src/process_network_graph.rs new file mode 100644 index 000000000..3f3033584 --- /dev/null +++ b/fuzz/src/process_network_graph.rs @@ -0,0 +1,20 @@ +// Import that needs to be added manually +use utils::test_logger; + +/// Actual fuzz test, method signature and name are fixed +fn do_test(data: &[u8]) { + let block_hash = bitcoin::BlockHash::default(); + let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash); + lightning_rapid_gossip_sync::processing::update_network_graph(&network_graph, data); +} + +/// Method that needs to be added manually, {name}_test +pub fn process_network_graph_test(data: &[u8], _out: Out) { + do_test(data); +} + +/// Method that needs to be added manually, {name}_run +#[no_mangle] +pub extern "C" fn process_network_graph_run(data: *const u8, datalen: usize) { + do_test(unsafe { std::slice::from_raw_parts(data, datalen) }); +} diff --git a/fuzz/targets.h b/fuzz/targets.h index 5d45e3d02..798fb6647 100644 --- a/fuzz/targets.h +++ b/fuzz/targets.h @@ -3,6 +3,7 @@ void chanmon_deser_run(const unsigned char* data, size_t data_len); void chanmon_consistency_run(const unsigned char* data, size_t data_len); void full_stack_run(const unsigned char* data, size_t data_len); void peer_crypt_run(const unsigned char* data, size_t data_len); +void process_network_graph_run(const unsigned char* data, size_t data_len); void router_run(const unsigned char* data, size_t data_len); void zbase32_run(const unsigned char* data, size_t data_len); void msg_accept_channel_run(const unsigned char* data, size_t data_len); diff --git a/lightning-rapid-gossip-sync/Cargo.toml b/lightning-rapid-gossip-sync/Cargo.toml new file mode 100644 index 000000000..a4bc04fed --- /dev/null +++ b/lightning-rapid-gossip-sync/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "lightning-rapid-gossip-sync" +version = "0.0.104" +authors = ["Arik Sosman "] +license = "MIT OR Apache-2.0" +repository = "https://github.com/lightningdevkit/rust-lightning" +edition = "2018" +description = """ +Utility to process gossip routing data from Rapid Gossip Sync Server. +""" + +[features] +_bench_unstable = [] + +[dependencies] +lightning = { version = "0.0.106", path = "../lightning" } +bitcoin = { version = "0.28.1", default-features = false } + +[dev-dependencies] +lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] } diff --git a/lightning-rapid-gossip-sync/README.md b/lightning-rapid-gossip-sync/README.md new file mode 100644 index 000000000..86d4981bf --- /dev/null +++ b/lightning-rapid-gossip-sync/README.md @@ -0,0 +1,120 @@ +# lightning-rapid-gossip-sync + +This crate exposes functionality for rapid gossip graph syncing, aimed primarily at mobile clients. +Its server counterpart is the +[rapid-gossip-sync-server](https://github.com/lightningdevkit/rapid-gossip-sync-server) repository. + +## Mechanism + +The (presumed) server sends a compressed gossip response containing gossip data. The gossip data is +formatted compactly, omitting signatures and opportunistically incremental where previous channel +updates are known. + +Essentially, the serialization structure is as follows: + +1. Fixed prefix bytes `76, 68, 75, 1` (the first three bytes are ASCII for `LDK`) + - The purpose of this prefix is to identify the serialization format, should other rapid gossip + sync formats arise in the future + - The fourth byte is the protocol version in case our format gets updated +2. Chain hash (32 bytes) +3. Latest seen timestamp (`u32`) +4. An unsigned int indicating the number of node IDs to follow +5. An array of compressed node ID pubkeys (all pubkeys are presumed to be standard + compressed 33-byte-serializations) +6. An unsigned int indicating the number of channel announcement messages to follow +7. An array of significantly stripped down customized channel announcements +8. An unsigned int indicating the number of channel update messages to follow +9. A series of default values used for non-incremental channel updates + - The values are defined as follows: + 1. `default_cltv_expiry_delta` + 2. `default_htlc_minimum_msat` + 3. `default_fee_base_msat` + 4. `default_fee_proportional_millionths` + 5. `default_htlc_maximum_msat` (`u64`, and if the default is no maximum, `u64::MAX`) + - The defaults are calculated by the server based on the frequency among non-incremental + updates within a given delta set +10. An array of customized channel updates + +You will also notice that `NodeAnnouncement` messages are omitted altogether as the node IDs are +implicitly extracted from the channel announcements and updates. + +The data is then applied to the current network graph, artificially dated to the timestamp of the +latest seen message less one week, be it an announcement or an update, from the server's +perspective. The network graph should not be pruned until the graph sync completes. + +### Custom Channel Announcement + +To achieve compactness and avoid data repetition, we're sending a significantly stripped down +version of the channel announcement message, which contains only the following data: + +1. `channel_features`: `u16` + `n`, where `n` is the number of bytes indicated by the first `u16` +2. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting from 0) +3. `node_id_1_index`: `CompactSize` (index of node id within the previously sent sequence) +4. `node_id_2_index`: `CompactSize` (index of node id within the previously sent sequence) + +### Custom Channel Update + +For the purpose of rapid syncing, we have deviated from the channel update format specified in +BOLT 7 significantly. Our custom channel updates are structured as follows: + +1. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting at 0) +2. `custom_channel_flags`: `u8` +3. `update_data` + +Specifically, our custom channel flags break down like this: + +| 128 | 64 | 32 | 16 | 8 | 4 | 2 | 1 | +|---------------------|----|----|----|---|---|------------------|-----------| +| Incremental update? | | | | | | Disable channel? | Direction | + +If the most significant bit is set to `1`, indicating an incremental update, the intermediate bit +flags assume the following meaning: + +| 64 | 32 | 16 | 8 | 4 | +|---------------------------------|---------------------------------|-----------------------------|-------------------------------------------|---------------------------------| +| `cltv_expiry_delta` has changed | `htlc_minimum_msat` has changed | `fee_base_msat` has changed | `fee_proportional_millionths` has changed | `htlc_maximum_msat` has changed | + +If the most significant bit is set to `0`, the meaning is almost identical, except instead of a +change, the flags now represent a deviation from the defaults sent at the beginning of the update +sequence. + +In both cases, `update_data` only contains the fields that are indicated by the channel flags to be +non-default or to have mutated. + +## Delta Calculation + +The way a server is meant to calculate this rapid gossip sync data is by taking the latest time +any change, be it either an announcement or an update, was seen. That timestamp is included in each +rapid sync message, so all the client needs to do is cache one variable. + +If a particular channel update had never occurred before, the full update is sent. If a channel has +had updates prior to the provided timestamp, the latest update prior to the timestamp is taken as a +reference, and the delta is calculated against it. + +Depending on whether the rapid sync message is calculated on the fly or a snapshotted version is +returned, intermediate changes between the latest update seen by the client and the latest update +broadcast on the network may be taken into account when calculating the delta. + +## Performance + +Given the primary purpose of this utility is a faster graph sync, we thought it might be helpful to +provide some examples of various delta sets. These examples were calculated as of May 19th 2022 +with a network graph comprised of 80,000 channel announcements and 160,000 directed channel updates. + +| Full sync | | +|-----------------------------|--------| +| Message Length | 4.7 MB | +| Gzipped Message Length | 2.0 MB | +| Client-side Processing Time | 1.4 s | + +| Week-old sync | | +|-----------------------------|--------| +| Message Length | 2.7 MB | +| Gzipped Message Length | 862 kB | +| Client-side Processing Time | 907 ms | + +| Day-old sync | | +|-----------------------------|---------| +| Message Length | 191 kB | +| Gzipped Message Length | 92.8 kB | +| Client-side Processing Time | 196 ms | diff --git a/lightning-rapid-gossip-sync/res/.gitignore b/lightning-rapid-gossip-sync/res/.gitignore new file mode 100644 index 000000000..d6b7ef32c --- /dev/null +++ b/lightning-rapid-gossip-sync/res/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/lightning-rapid-gossip-sync/src/error.rs b/lightning-rapid-gossip-sync/src/error.rs new file mode 100644 index 000000000..fee8feafc --- /dev/null +++ b/lightning-rapid-gossip-sync/src/error.rs @@ -0,0 +1,40 @@ +use core::fmt::Debug; +use std::fmt::Formatter; +use lightning::ln::msgs::{DecodeError, LightningError}; + +/// All-encompassing standard error type that processing can return +pub enum GraphSyncError { + /// Error trying to read the update data, typically due to an erroneous data length indication + /// that is greater than the actual amount of data provided + DecodeError(DecodeError), + /// Error applying the patch to the network graph, usually the result of updates that are too + /// old or missing prerequisite data to the application of updates out of order + LightningError(LightningError), +} + +impl From for GraphSyncError { + fn from(error: std::io::Error) -> Self { + Self::DecodeError(DecodeError::Io(error.kind())) + } +} + +impl From for GraphSyncError { + fn from(error: DecodeError) -> Self { + Self::DecodeError(error) + } +} + +impl From for GraphSyncError { + fn from(error: LightningError) -> Self { + Self::LightningError(error) + } +} + +impl Debug for GraphSyncError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + match self { + GraphSyncError::DecodeError(e) => f.write_fmt(format_args!("DecodeError: {:?}", e)), + GraphSyncError::LightningError(e) => f.write_fmt(format_args!("LightningError: {:?}", e)) + } + } +} diff --git a/lightning-rapid-gossip-sync/src/lib.rs b/lightning-rapid-gossip-sync/src/lib.rs new file mode 100644 index 000000000..4ab1028c7 --- /dev/null +++ b/lightning-rapid-gossip-sync/src/lib.rs @@ -0,0 +1,243 @@ +#![deny(missing_docs)] +#![deny(unsafe_code)] +#![deny(broken_intra_doc_links)] +#![deny(non_upper_case_globals)] +#![deny(non_camel_case_types)] +#![deny(non_snake_case)] +#![deny(unused_mut)] +#![deny(unused_variables)] +#![deny(unused_imports)] +//! This crate exposes functionality to rapidly sync gossip data, aimed primarily at mobile +//! devices. +//! +//! The server sends a compressed response containing differential gossip data. The gossip data is +//! formatted compactly, omitting signatures and opportunistically incremental where previous +//! channel updates are known (a mechanism that is enabled when the timestamp of the last known +//! channel update is communicated). A reference server implementation can be found +//! [here](https://github.com/lightningdevkit/rapid-gossip-sync-server). +//! +//! An example server request could look as simple as the following. Note that the first ever rapid +//! sync should use `0` for `last_sync_timestamp`: +//! +//! ```shell +//! curl -o rapid_sync.lngossip https://rapidsync.lightningdevkit.org/snapshot/ +//! ``` +//! +//! Then, call the network processing function. In this example, we process the update by reading +//! its contents from disk, which we do by calling the `sync_network_graph_with_file_path` method: +//! +//! ``` +//! use bitcoin::blockdata::constants::genesis_block; +//! use bitcoin::Network; +//! use lightning::routing::network_graph::NetworkGraph; +//! +//! let block_hash = genesis_block(Network::Bitcoin).header.block_hash(); +//! let network_graph = NetworkGraph::new(block_hash); +//! let new_last_sync_timestamp_result = lightning_rapid_gossip_sync::sync_network_graph_with_file_path(&network_graph, "./rapid_sync.lngossip"); +//! ``` +//! +//! The primary benefit this syncing mechanism provides is that given a trusted server, a +//! low-powered client can offload the validation of gossip signatures. This enables a client to +//! privately calculate routes for payments, and do so much faster and earlier than requiring a full +//! peer-to-peer gossip sync to complete. +//! +//! The reason the rapid sync server requires trust is that it could provide bogus data, though at +//! worst, all that would result in is a fake network topology, which wouldn't enable the server to +//! steal or siphon off funds. It could, however, reduce the client's privacy by forcing all +//! payments to be routed via channels the server controls. +//! +//! The way a server is meant to calculate this rapid gossip sync data is by using a `latest_seen` +//! timestamp provided by the client. It's not included in either channel announcement or update, +//! (not least due to announcements not including any timestamps at all, but only a block height) +//! but rather, it's a timestamp of when the server saw a particular message. + +// Allow and import test features for benching +#![cfg_attr(all(test, feature = "_bench_unstable"), feature(test))] +#[cfg(all(test, feature = "_bench_unstable"))] +extern crate test; + +use std::fs::File; + +use lightning::routing::network_graph; + +use crate::error::GraphSyncError; + +/// Error types that these functions can return +pub mod error; + +/// Core functionality of this crate +pub mod processing; + +/// Sync gossip data from a file +/// Returns the last sync timestamp to be used the next time rapid sync data is queried. +/// +/// `network_graph`: The network graph to apply the updates to +/// +/// `sync_path`: Path to the file where the gossip update data is located +/// +pub fn sync_network_graph_with_file_path( + network_graph: &network_graph::NetworkGraph, + sync_path: &str, +) -> Result { + let mut file = File::open(sync_path)?; + processing::update_network_graph_from_byte_stream(&network_graph, &mut file) +} + +#[cfg(test)] +mod tests { + use std::fs; + + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::Network; + + use lightning::ln::msgs::DecodeError; + use lightning::routing::network_graph::NetworkGraph; + + use crate::sync_network_graph_with_file_path; + + #[test] + fn test_sync_from_file() { + struct FileSyncTest { + directory: String, + } + + impl FileSyncTest { + fn new(tmp_directory: &str, valid_response: &[u8]) -> FileSyncTest { + let test = FileSyncTest { directory: tmp_directory.to_owned() }; + + let graph_sync_test_directory = test.get_test_directory(); + fs::create_dir_all(graph_sync_test_directory).unwrap(); + + let graph_sync_test_file = test.get_test_file_path(); + fs::write(&graph_sync_test_file, valid_response).unwrap(); + + test + } + fn get_test_directory(&self) -> String { + let graph_sync_test_directory = self.directory.clone() + "/graph-sync-tests"; + graph_sync_test_directory + } + fn get_test_file_path(&self) -> String { + let graph_sync_test_directory = self.get_test_directory(); + let graph_sync_test_file = graph_sync_test_directory.to_owned() + "/test_data.lngossip"; + graph_sync_test_file + } + } + + impl Drop for FileSyncTest { + fn drop(&mut self) { + fs::remove_dir_all(self.directory.clone()).unwrap(); + } + } + + // same as incremental_only_update_fails_without_prior_same_direction_updates + let valid_response = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, + 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, + 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192, + ]; + + let tmp_directory = "./rapid-gossip-sync-tests-tmp"; + let sync_test = FileSyncTest::new(tmp_directory, &valid_response); + let graph_sync_test_file = sync_test.get_test_file_path(); + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let sync_result = sync_network_graph_with_file_path(&network_graph, &graph_sync_test_file); + + if sync_result.is_err() { + panic!("Unexpected sync result: {:?}", sync_result) + } + + assert_eq!(network_graph.read_only().channels().len(), 2); + let after = network_graph.to_string(); + assert!( + after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643") + ); + assert!( + after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b") + ); + assert!( + after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432") + ); + assert!( + after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61") + ); + assert!(after.contains("619737530008010752")); + assert!(after.contains("783241506229452801")); + } + + #[test] + fn measure_native_read_from_file() { + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let start = std::time::Instant::now(); + let sync_result = + sync_network_graph_with_file_path(&network_graph, "./res/full_graph.lngossip"); + if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result { + let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error); + #[cfg(not(require_route_graph_test))] + { + println!("{}", error_string); + return; + } + panic!("{}", error_string); + } + let elapsed = start.elapsed(); + println!("initialization duration: {:?}", elapsed); + if sync_result.is_err() { + panic!("Unexpected sync result: {:?}", sync_result) + } + } +} + +#[cfg(all(test, feature = "_bench_unstable"))] +pub mod bench { + use test::Bencher; + + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::Network; + + use lightning::ln::msgs::DecodeError; + use lightning::routing::network_graph::NetworkGraph; + + use crate::sync_network_graph_with_file_path; + + #[bench] + fn bench_reading_full_graph_from_file(b: &mut Bencher) { + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + b.iter(|| { + let network_graph = NetworkGraph::new(block_hash); + let sync_result = sync_network_graph_with_file_path( + &network_graph, + "./res/full_graph.lngossip", + ); + if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result { + let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error); + #[cfg(not(require_route_graph_test))] + { + println!("{}", error_string); + return; + } + panic!("{}", error_string); + } + assert!(sync_result.is_ok()) + }); + } +} diff --git a/lightning-rapid-gossip-sync/src/processing.rs b/lightning-rapid-gossip-sync/src/processing.rs new file mode 100644 index 000000000..ceb8b8229 --- /dev/null +++ b/lightning-rapid-gossip-sync/src/processing.rs @@ -0,0 +1,499 @@ +use std::cmp::max; +use std::io; +use std::io::Read; + +use bitcoin::BlockHash; +use bitcoin::secp256k1::PublicKey; + +use lightning::ln::msgs::{ + DecodeError, ErrorAction, LightningError, OptionalField, UnsignedChannelUpdate, +}; +use lightning::routing::network_graph; +use lightning::util::ser::{BigSize, Readable}; + +use crate::error::GraphSyncError; + +/// The purpose of this prefix is to identify the serialization format, should other rapid gossip +/// sync formats arise in the future. +/// +/// The fourth byte is the protocol version in case our format gets updated. +const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1]; + +/// Maximum vector allocation capacity for distinct node IDs. This constraint is necessary to +/// avoid malicious updates being able to trigger excessive memory allocation. +const MAX_INITIAL_NODE_ID_VECTOR_CAPACITY: u32 = 50_000; + +/// Update network graph from binary data. +/// Returns the last sync timestamp to be used the next time rapid sync data is queried. +/// +/// `network_graph`: network graph to be updated +/// +/// `update_data`: `&[u8]` binary stream that comprises the update data +pub fn update_network_graph( + network_graph: &network_graph::NetworkGraph, + update_data: &[u8], +) -> Result { + let mut read_cursor = io::Cursor::new(update_data); + update_network_graph_from_byte_stream(&network_graph, &mut read_cursor) +} + +pub(crate) fn update_network_graph_from_byte_stream( + network_graph: &network_graph::NetworkGraph, + mut read_cursor: &mut R, +) -> Result { + let mut prefix = [0u8; 4]; + read_cursor.read_exact(&mut prefix)?; + + match prefix { + GOSSIP_PREFIX => {}, + _ => { + return Err(DecodeError::UnknownVersion.into()); + } + }; + + let chain_hash: BlockHash = Readable::read(read_cursor)?; + let latest_seen_timestamp: u32 = Readable::read(read_cursor)?; + // backdate the applied timestamp by a week + let backdated_timestamp = latest_seen_timestamp.saturating_sub(24 * 3600 * 7); + + let node_id_count: u32 = Readable::read(read_cursor)?; + let mut node_ids: Vec = Vec::with_capacity(std::cmp::min( + node_id_count, + MAX_INITIAL_NODE_ID_VECTOR_CAPACITY, + ) as usize); + for _ in 0..node_id_count { + let current_node_id = Readable::read(read_cursor)?; + node_ids.push(current_node_id); + } + + let mut previous_scid: u64 = 0; + let announcement_count: u32 = Readable::read(read_cursor)?; + for _ in 0..announcement_count { + let features = Readable::read(read_cursor)?; + + // handle SCID + let scid_delta: BigSize = Readable::read(read_cursor)?; + let short_channel_id = previous_scid + .checked_add(scid_delta.0) + .ok_or(DecodeError::InvalidValue)?; + previous_scid = short_channel_id; + + let node_id_1_index: BigSize = Readable::read(read_cursor)?; + let node_id_2_index: BigSize = Readable::read(read_cursor)?; + if max(node_id_1_index.0, node_id_2_index.0) >= node_id_count as u64 { + return Err(DecodeError::InvalidValue.into()); + }; + let node_id_1 = node_ids[node_id_1_index.0 as usize]; + let node_id_2 = node_ids[node_id_2_index.0 as usize]; + + let announcement_result = network_graph.add_channel_from_partial_announcement( + short_channel_id, + backdated_timestamp as u64, + features, + node_id_1, + node_id_2, + ); + if let Err(lightning_error) = announcement_result { + if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action { + // everything is fine, just a duplicate channel announcement + } else { + return Err(lightning_error.into()); + } + } + } + + previous_scid = 0; // updates start at a new scid + + let update_count: u32 = Readable::read(read_cursor)?; + if update_count == 0 { + return Ok(latest_seen_timestamp); + } + + // obtain default values for non-incremental updates + let default_cltv_expiry_delta: u16 = Readable::read(&mut read_cursor)?; + let default_htlc_minimum_msat: u64 = Readable::read(&mut read_cursor)?; + let default_fee_base_msat: u32 = Readable::read(&mut read_cursor)?; + let default_fee_proportional_millionths: u32 = Readable::read(&mut read_cursor)?; + let tentative_default_htlc_maximum_msat: u64 = Readable::read(&mut read_cursor)?; + let default_htlc_maximum_msat = if tentative_default_htlc_maximum_msat == u64::max_value() { + OptionalField::Absent + } else { + OptionalField::Present(tentative_default_htlc_maximum_msat) + }; + + for _ in 0..update_count { + let scid_delta: BigSize = Readable::read(read_cursor)?; + let short_channel_id = previous_scid + .checked_add(scid_delta.0) + .ok_or(DecodeError::InvalidValue)?; + previous_scid = short_channel_id; + + let channel_flags: u8 = Readable::read(read_cursor)?; + + // flags are always sent in full, and hence always need updating + let standard_channel_flags = channel_flags & 0b_0000_0011; + + let mut synthetic_update = if channel_flags & 0b_1000_0000 == 0 { + // full update, field flags will indicate deviations from the default + UnsignedChannelUpdate { + chain_hash, + short_channel_id, + timestamp: backdated_timestamp, + flags: standard_channel_flags, + cltv_expiry_delta: default_cltv_expiry_delta, + htlc_minimum_msat: default_htlc_minimum_msat, + htlc_maximum_msat: default_htlc_maximum_msat.clone(), + fee_base_msat: default_fee_base_msat, + fee_proportional_millionths: default_fee_proportional_millionths, + excess_data: vec![], + } + } else { + // incremental update, field flags will indicate mutated values + let read_only_network_graph = network_graph.read_only(); + let channel = read_only_network_graph + .channels() + .get(&short_channel_id) + .ok_or(LightningError { + err: "Couldn't find channel for update".to_owned(), + action: ErrorAction::IgnoreError, + })?; + + let directional_info = channel + .get_directional_info(channel_flags) + .ok_or(LightningError { + err: "Couldn't find previous directional data for update".to_owned(), + action: ErrorAction::IgnoreError, + })?; + + let htlc_maximum_msat = + if let Some(htlc_maximum_msat) = directional_info.htlc_maximum_msat { + OptionalField::Present(htlc_maximum_msat) + } else { + OptionalField::Absent + }; + + UnsignedChannelUpdate { + chain_hash, + short_channel_id, + timestamp: backdated_timestamp, + flags: standard_channel_flags, + cltv_expiry_delta: directional_info.cltv_expiry_delta, + htlc_minimum_msat: directional_info.htlc_minimum_msat, + htlc_maximum_msat, + fee_base_msat: directional_info.fees.base_msat, + fee_proportional_millionths: directional_info.fees.proportional_millionths, + excess_data: vec![], + } + }; + + if channel_flags & 0b_0100_0000 > 0 { + let cltv_expiry_delta: u16 = Readable::read(read_cursor)?; + synthetic_update.cltv_expiry_delta = cltv_expiry_delta; + } + + if channel_flags & 0b_0010_0000 > 0 { + let htlc_minimum_msat: u64 = Readable::read(read_cursor)?; + synthetic_update.htlc_minimum_msat = htlc_minimum_msat; + } + + if channel_flags & 0b_0001_0000 > 0 { + let fee_base_msat: u32 = Readable::read(read_cursor)?; + synthetic_update.fee_base_msat = fee_base_msat; + } + + if channel_flags & 0b_0000_1000 > 0 { + let fee_proportional_millionths: u32 = Readable::read(read_cursor)?; + synthetic_update.fee_proportional_millionths = fee_proportional_millionths; + } + + if channel_flags & 0b_0000_0100 > 0 { + let tentative_htlc_maximum_msat: u64 = Readable::read(read_cursor)?; + synthetic_update.htlc_maximum_msat = if tentative_htlc_maximum_msat == u64::max_value() + { + OptionalField::Absent + } else { + OptionalField::Present(tentative_htlc_maximum_msat) + }; + } + + network_graph.update_channel_unsigned(&synthetic_update)?; + } + + Ok(latest_seen_timestamp) +} + +#[cfg(test)] +mod tests { + use bitcoin::blockdata::constants::genesis_block; + use bitcoin::Network; + + use lightning::ln::msgs::DecodeError; + use lightning::routing::network_graph::NetworkGraph; + + use crate::error::GraphSyncError; + use crate::processing::update_network_graph; + + #[test] + fn network_graph_fails_to_update_from_clipped_input() { + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + let example_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 0, 100, + 0, 0, 2, 224, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 36, 0, 0, + 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0, + 0, 3, 232, 0, 0, 0, + ]; + let update_result = update_network_graph(&network_graph, &example_input[..]); + assert!(update_result.is_err()); + if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result { + // this is the expected error type + } else { + panic!("Unexpected update result: {:?}", update_result) + } + } + + #[test] + fn incremental_only_update_fails_without_prior_announcements() { + let incremental_update_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2, + 68, 226, 0, 6, 11, 0, 1, 128, + ]; + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let update_result = update_network_graph(&network_graph, &incremental_update_input[..]); + assert!(update_result.is_err()); + if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { + assert_eq!(lightning_error.err, "Couldn't find channel for update"); + } else { + panic!("Unexpected update result: {:?}", update_result) + } + } + + #[test] + fn incremental_only_update_fails_without_prior_updates() { + let announced_update_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, + 2, 68, 226, 0, 6, 11, 0, 1, 128, + ]; + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let update_result = update_network_graph(&network_graph, &announced_update_input[..]); + assert!(update_result.is_err()); + if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { + assert_eq!( + lightning_error.err, + "Couldn't find previous directional data for update" + ); + } else { + panic!("Unexpected update result: {:?}", update_result) + } + } + + #[test] + fn incremental_only_update_fails_without_prior_same_direction_updates() { + let initialization_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, + 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0, + 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192, + ]; + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let initialization_result = update_network_graph(&network_graph, &initialization_input[..]); + if initialization_result.is_err() { + panic!( + "Unexpected initialization result: {:?}", + initialization_result + ) + } + + assert_eq!(network_graph.read_only().channels().len(), 2); + let initialized = network_graph.to_string(); + assert!(initialized + .contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")); + assert!(initialized + .contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")); + assert!(initialized + .contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")); + assert!(initialized + .contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")); + assert!(initialized.contains("619737530008010752")); + assert!(initialized.contains("783241506229452801")); + + let opposite_direction_incremental_update_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2, + 68, 226, 0, 6, 11, 0, 1, 128, + ]; + let update_result = update_network_graph( + &network_graph, + &opposite_direction_incremental_update_input[..], + ); + assert!(update_result.is_err()); + if let Err(GraphSyncError::LightningError(lightning_error)) = update_result { + assert_eq!( + lightning_error.err, + "Couldn't find previous directional data for update" + ); + } else { + panic!("Unexpected update result: {:?}", update_result) + } + } + + #[test] + fn incremental_update_succeeds_with_prior_announcements_and_full_updates() { + let initialization_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, + 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 56, 0, 0, + 0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2, + 68, 226, 0, 6, 11, 0, 1, 4, 0, 0, 0, 0, 29, 129, 25, 192, 0, 5, 0, 0, 0, 0, 29, 129, + 25, 192, + ]; + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let initialization_result = update_network_graph(&network_graph, &initialization_input[..]); + assert!(initialization_result.is_ok()); + + let single_direction_incremental_update_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2, + 68, 226, 0, 6, 11, 0, 1, 128, + ]; + let update_result = update_network_graph( + &network_graph, + &single_direction_incremental_update_input[..], + ); + if update_result.is_err() { + panic!("Unexpected update result: {:?}", update_result) + } + + assert_eq!(network_graph.read_only().channels().len(), 2); + let after = network_graph.to_string(); + assert!( + after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643") + ); + assert!( + after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b") + ); + assert!( + after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432") + ); + assert!( + after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61") + ); + assert!(after.contains("619737530008010752")); + assert!(after.contains("783241506229452801")); + } + + #[test] + fn full_update_succeeds() { + let valid_input = vec![ + 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247, + 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218, + 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251, + 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125, + 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136, + 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106, + 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138, + 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175, + 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128, + 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68, + 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232, + 0, 0, 0, 1, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 60, 0, 0, + 0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 0, 0, 0, 58, 85, 116, 216, 0, 29, 0, + 0, 0, 1, 0, 0, 0, 125, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, + 0, 0, 1, + ]; + + let block_hash = genesis_block(Network::Bitcoin).block_hash(); + let network_graph = NetworkGraph::new(block_hash); + + assert_eq!(network_graph.read_only().channels().len(), 0); + + let update_result = update_network_graph(&network_graph, &valid_input[..]); + if update_result.is_err() { + panic!("Unexpected update result: {:?}", update_result) + } + + assert_eq!(network_graph.read_only().channels().len(), 2); + let after = network_graph.to_string(); + assert!( + after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643") + ); + assert!( + after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b") + ); + assert!( + after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432") + ); + assert!( + after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61") + ); + assert!(after.contains("619737530008010752")); + assert!(after.contains("783241506229452801")); + } +} diff --git a/lightning/src/lib.rs b/lightning/src/lib.rs index abdc10c57..63f4fcdc8 100644 --- a/lightning/src/lib.rs +++ b/lightning/src/lib.rs @@ -158,7 +158,7 @@ mod sync { #[cfg(test)] pub use debug_sync::*; #[cfg(not(test))] - pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard}; + pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; #[cfg(not(test))] pub use crate::util::fairrwlock::FairRwLock; } diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index 281a2a8e9..5aa48c32b 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -630,7 +630,10 @@ pub struct UnsignedChannelUpdate { pub fee_base_msat: u32, /// The amount to fee multiplier, in micro-satoshi pub fee_proportional_millionths: u32, - pub(crate) excess_data: Vec, + /// Excess data which was signed as a part of the message which we do not (yet) understand how + /// to decode. This is stored to ensure forward-compatibility as new fields are added to the + /// lightning gossip + pub excess_data: Vec, } /// A channel_update message to be sent or received from a peer #[derive(Clone, Debug, PartialEq)] diff --git a/lightning/src/routing/network_graph.rs b/lightning/src/routing/network_graph.rs index c65cec2a8..26719cc27 100644 --- a/lightning/src/routing/network_graph.rs +++ b/lightning/src/routing/network_graph.rs @@ -67,7 +67,7 @@ impl NodeId { pub fn from_pubkey(pubkey: &PublicKey) -> Self { NodeId(pubkey.serialize()) } - + /// Get the public key slice from this NodeId pub fn as_slice(&self) -> &[u8] { &self.0 @@ -712,6 +712,16 @@ impl ChannelInfo { }; Some((DirectedChannelInfo::new(self, direction), target)) } + + /// Returns a [`ChannelUpdateInfo`] based on the direction implied by the channel_flag. + pub fn get_directional_info(&self, channel_flags: u8) -> Option<&ChannelUpdateInfo> { + let direction = channel_flags & 1u8; + if direction == 0 { + self.one_to_two.as_ref() + } else { + self.two_to_one.as_ref() + } + } } impl fmt::Display for ChannelInfo { @@ -1155,6 +1165,83 @@ impl NetworkGraph { self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access) } + /// Update channel from partial announcement data received via rapid gossip sync + /// + /// `timestamp: u64`: Timestamp emulating the backdated original announcement receipt (by the + /// rapid gossip sync server) + /// + /// All other parameters as used in [`msgs::UnsignedChannelAnnouncement`] fields. + pub fn add_channel_from_partial_announcement(&self, short_channel_id: u64, timestamp: u64, features: ChannelFeatures, node_id_1: PublicKey, node_id_2: PublicKey) -> Result<(), LightningError> { + if node_id_1 == node_id_2 { + return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError}); + }; + + let node_1 = NodeId::from_pubkey(&node_id_1); + let node_2 = NodeId::from_pubkey(&node_id_2); + let channel_info = ChannelInfo { + features, + node_one: node_1.clone(), + one_to_two: None, + node_two: node_2.clone(), + two_to_one: None, + capacity_sats: None, + announcement_message: None, + announcement_received_time: timestamp, + }; + + self.add_channel_between_nodes(short_channel_id, channel_info, None) + } + + fn add_channel_between_nodes(&self, short_channel_id: u64, channel_info: ChannelInfo, utxo_value: Option) -> Result<(), LightningError> { + let mut channels = self.channels.write().unwrap(); + let mut nodes = self.nodes.write().unwrap(); + + let node_id_a = channel_info.node_one.clone(); + let node_id_b = channel_info.node_two.clone(); + + match channels.entry(short_channel_id) { + BtreeEntry::Occupied(mut entry) => { + //TODO: because asking the blockchain if short_channel_id is valid is only optional + //in the blockchain API, we need to handle it smartly here, though it's unclear + //exactly how... + if utxo_value.is_some() { + // Either our UTXO provider is busted, there was a reorg, or the UTXO provider + // only sometimes returns results. In any case remove the previous entry. Note + // that the spec expects us to "blacklist" the node_ids involved, but we can't + // do that because + // a) we don't *require* a UTXO provider that always returns results. + // b) we don't track UTXOs of channels we know about and remove them if they + // get reorg'd out. + // c) it's unclear how to do so without exposing ourselves to massive DoS risk. + Self::remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id); + *entry.get_mut() = channel_info; + } else { + return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip}); + } + }, + BtreeEntry::Vacant(entry) => { + entry.insert(channel_info); + } + }; + + for current_node_id in [node_id_a, node_id_b].iter() { + match nodes.entry(current_node_id.clone()) { + BtreeEntry::Occupied(node_entry) => { + node_entry.into_mut().channels.push(short_channel_id); + }, + BtreeEntry::Vacant(node_entry) => { + node_entry.insert(NodeInfo { + channels: vec!(short_channel_id), + lowest_inbound_channel_fees: None, + announcement_info: None, + }); + } + }; + }; + + Ok(()) + } + fn update_channel_from_unsigned_announcement_intern( &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option ) -> Result<(), LightningError> @@ -1203,65 +1290,18 @@ impl NetworkGraph { } let chan_info = ChannelInfo { - features: msg.features.clone(), - node_one: NodeId::from_pubkey(&msg.node_id_1), - one_to_two: None, - node_two: NodeId::from_pubkey(&msg.node_id_2), - two_to_one: None, - capacity_sats: utxo_value, - announcement_message: if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY - { full_msg.cloned() } else { None }, - announcement_received_time, - }; - - let mut channels = self.channels.write().unwrap(); - let mut nodes = self.nodes.write().unwrap(); - match channels.entry(msg.short_channel_id) { - BtreeEntry::Occupied(mut entry) => { - //TODO: because asking the blockchain if short_channel_id is valid is only optional - //in the blockchain API, we need to handle it smartly here, though it's unclear - //exactly how... - if utxo_value.is_some() { - // Either our UTXO provider is busted, there was a reorg, or the UTXO provider - // only sometimes returns results. In any case remove the previous entry. Note - // that the spec expects us to "blacklist" the node_ids involved, but we can't - // do that because - // a) we don't *require* a UTXO provider that always returns results. - // b) we don't track UTXOs of channels we know about and remove them if they - // get reorg'd out. - // c) it's unclear how to do so without exposing ourselves to massive DoS risk. - Self::remove_channel_in_nodes(&mut nodes, &entry.get(), msg.short_channel_id); - *entry.get_mut() = chan_info; - } else { - return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip}); - } - }, - BtreeEntry::Vacant(entry) => { - entry.insert(chan_info); - } + features: msg.features.clone(), + node_one: NodeId::from_pubkey(&msg.node_id_1), + one_to_two: None, + node_two: NodeId::from_pubkey(&msg.node_id_2), + two_to_one: None, + capacity_sats: utxo_value, + announcement_message: if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY + { full_msg.cloned() } else { None }, + announcement_received_time, }; - macro_rules! add_channel_to_node { - ( $node_id: expr ) => { - match nodes.entry($node_id) { - BtreeEntry::Occupied(node_entry) => { - node_entry.into_mut().channels.push(msg.short_channel_id); - }, - BtreeEntry::Vacant(node_entry) => { - node_entry.insert(NodeInfo { - channels: vec!(msg.short_channel_id), - lowest_inbound_channel_fees: None, - announcement_info: None, - }); - } - } - }; - } - - add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_1)); - add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_2)); - - Ok(()) + self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value) } /// Close a channel if a corresponding HTLC fail was sent. @@ -1581,7 +1621,7 @@ mod tests { use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; use routing::network_graph::{NetGraphMsgHandler, NetworkGraph, NetworkUpdate, MAX_EXCESS_BYTES_FOR_RELAY}; use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement, - UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, + UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT}; use util::test_utils; use util::logger::Logger; diff --git a/lightning/src/util/ser.rs b/lightning/src/util/ser.rs index 69fd14640..428adbc5e 100644 --- a/lightning/src/util/ser.rs +++ b/lightning/src/util/ser.rs @@ -301,7 +301,7 @@ impl Readable for U48 { /// encoded in several different ways, which we must check for at deserialization-time. Thus, if /// you're looking for an example of a variable-length integer to use for your own project, move /// along, this is a rather poor design. -pub(crate) struct BigSize(pub u64); +pub struct BigSize(pub u64); impl Writeable for BigSize { #[inline] fn write(&self, writer: &mut W) -> Result<(), io::Error> { -- 2.39.5