Introduce graph sync crate for fast-forwarding through gossip data downloaded from...
authorArik Sosman <git@arik.io>
Wed, 3 Nov 2021 17:50:08 +0000 (10:50 -0700)
committerArik Sosman <git@arik.io>
Wed, 25 May 2022 08:21:33 +0000 (01:21 -0700)
19 files changed:
.editorconfig
.github/workflows/build.yml
Cargo.toml
fuzz/Cargo.toml
fuzz/src/bin/gen_target.sh
fuzz/src/bin/process_network_graph_target.rs [new file with mode: 0644]
fuzz/src/lib.rs
fuzz/src/process_network_graph.rs [new file with mode: 0644]
fuzz/targets.h
lightning-rapid-gossip-sync/Cargo.toml [new file with mode: 0644]
lightning-rapid-gossip-sync/README.md [new file with mode: 0644]
lightning-rapid-gossip-sync/res/.gitignore [new file with mode: 0644]
lightning-rapid-gossip-sync/src/error.rs [new file with mode: 0644]
lightning-rapid-gossip-sync/src/lib.rs [new file with mode: 0644]
lightning-rapid-gossip-sync/src/processing.rs [new file with mode: 0644]
lightning/src/lib.rs
lightning/src/ln/msgs.rs
lightning/src/routing/network_graph.rs
lightning/src/util/ser.rs

index e5657670c10706dfbc274144d4e9289ba265ef62..dab24fe3dd37f2aedbbe45c70f826eecda517bcb 100644 (file)
@@ -3,3 +3,4 @@
 [*]
 indent_style = tab
 insert_final_newline = true
+trim_trailing_whitespace = true
index cff1713440035a0b76d89a9e6c88904c97c0b87e..59a11b4e4a11efc7788b51fe04ad664ca15da7f9 100644 (file)
@@ -141,6 +141,7 @@ jobs:
         run: |
           cargo test --verbose --color always  -p lightning
           cargo test --verbose --color always  -p lightning-invoice
+          cargo test --verbose --color always  -p lightning-rapid-gossip-sync
           cargo build --verbose  --color always -p lightning-persister
           cargo build --verbose  --color always -p lightning-background-processor
       - name: Test C Bindings Modifications on Rust ${{ matrix.toolchain }}
@@ -221,11 +222,24 @@ jobs:
       - name: Fetch routing graph snapshot
         if: steps.cache-graph.outputs.cache-hit != 'true'
         run: |
-          wget -O lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin
-          if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303" ]; then
+          curl --verbose -L -o lightning/net_graph-2021-05-31.bin https://bitcoin.ninja/ldk-net_graph-v0.0.15-2021-05-31.bin
+          echo "Sha sum: $(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')"
+          if [ "$(sha256sum lightning/net_graph-2021-05-31.bin | awk '{ print $1 }')" != "${EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM}" ]; then
             echo "Bad hash"
             exit 1
           fi
+        env:
+          EXPECTED_ROUTING_GRAPH_SNAPSHOT_SHASUM: 05a5361278f68ee2afd086cc04a1f927a63924be451f3221d380533acfacc303
+      - name: Fetch rapid graph sync reference input
+        run: |
+          curl --verbose -L -o lightning-rapid-gossip-sync/res/full_graph.lngossip https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin
+          echo "Sha sum: $(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')"
+          if [ "$(sha256sum lightning-rapid-gossip-sync/res/full_graph.lngossip | awk '{ print $1 }')" != "${EXPECTED_RAPID_GOSSIP_SHASUM}" ]; then
+            echo "Bad hash"
+            exit 1
+          fi
+        env:
+          EXPECTED_RAPID_GOSSIP_SHASUM: 9637b91cea9d64320cf48fc0787c70fe69fc062f90d3512e207044110cadfd7b
       - name: Test with Network Graph on Rust ${{ matrix.toolchain }}
         run: |
           cd lightning
index 6e03fc1ac4cfc4bab1feed65368651bc5135cb9e..f263dc8eccb16414c5e440b70c02bd6e9e4ab2df 100644 (file)
@@ -7,6 +7,7 @@ members = [
     "lightning-net-tokio",
     "lightning-persister",
     "lightning-background-processor",
+    "lightning-rapid-gossip-sync"
 ]
 
 exclude = [
index 88e577617b54faeab07ba0e61a213d29f726b1ba..66dabcfe4be3242eac2240a287476745c8f03e2a 100644 (file)
@@ -19,6 +19,7 @@ stdin_fuzz = []
 [dependencies]
 afl = { version = "0.4", optional = true }
 lightning = { path = "../lightning", features = ["regex"] }
+lightning-rapid-gossip-sync = { path = "../lightning-rapid-gossip-sync" }
 bitcoin = { version = "0.28.1", features = ["secp-lowmemory"] }
 hex = "0.3"
 honggfuzz = { version = "0.5", optional = true }
index eb07df6342f86dc6d99904953f376f414de5289f..72fefe51609103c7ed7c8cbf74ed2b33f6b9552f 100755 (executable)
@@ -10,6 +10,7 @@ GEN_TEST chanmon_deser
 GEN_TEST chanmon_consistency
 GEN_TEST full_stack
 GEN_TEST peer_crypt
+GEN_TEST process_network_graph
 GEN_TEST router
 GEN_TEST zbase32
 
diff --git a/fuzz/src/bin/process_network_graph_target.rs b/fuzz/src/bin/process_network_graph_target.rs
new file mode 100644 (file)
index 0000000..380efdf
--- /dev/null
@@ -0,0 +1,113 @@
+// This file is Copyright its original authors, visible in version control
+// history.
+//
+// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE
+// or http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your option.
+// You may not use this file except in accordance with one or both of these
+// licenses.
+
+// This file is auto-generated by gen_target.sh based on target_template.txt
+// To modify it, modify target_template.txt and run gen_target.sh instead.
+
+#![cfg_attr(feature = "libfuzzer_fuzz", no_main)]
+
+#[cfg(not(fuzzing))]
+compile_error!("Fuzz targets need cfg=fuzzing");
+
+extern crate lightning_fuzz;
+use lightning_fuzz::process_network_graph::*;
+
+#[cfg(feature = "afl")]
+#[macro_use] extern crate afl;
+#[cfg(feature = "afl")]
+fn main() {
+       fuzz!(|data| {
+               process_network_graph_run(data.as_ptr(), data.len());
+       });
+}
+
+#[cfg(feature = "honggfuzz")]
+#[macro_use] extern crate honggfuzz;
+#[cfg(feature = "honggfuzz")]
+fn main() {
+       loop {
+               fuzz!(|data| {
+                       process_network_graph_run(data.as_ptr(), data.len());
+               });
+       }
+}
+
+#[cfg(feature = "libfuzzer_fuzz")]
+#[macro_use] extern crate libfuzzer_sys;
+#[cfg(feature = "libfuzzer_fuzz")]
+fuzz_target!(|data: &[u8]| {
+       process_network_graph_run(data.as_ptr(), data.len());
+});
+
+#[cfg(feature = "stdin_fuzz")]
+fn main() {
+       use std::io::Read;
+
+       let mut data = Vec::with_capacity(8192);
+       std::io::stdin().read_to_end(&mut data).unwrap();
+       process_network_graph_run(data.as_ptr(), data.len());
+}
+
+#[test]
+fn run_test_cases() {
+       use std::fs;
+       use std::io::Read;
+       use lightning_fuzz::utils::test_logger::StringBuffer;
+
+       use std::sync::{atomic, Arc};
+       {
+               let data: Vec<u8> = vec![0];
+               process_network_graph_run(data.as_ptr(), data.len());
+       }
+       let mut threads = Vec::new();
+       let threads_running = Arc::new(atomic::AtomicUsize::new(0));
+       if let Ok(tests) = fs::read_dir("test_cases/process_network_graph") {
+               for test in tests {
+                       let mut data: Vec<u8> = Vec::new();
+                       let path = test.unwrap().path();
+                       fs::File::open(&path).unwrap().read_to_end(&mut data).unwrap();
+                       threads_running.fetch_add(1, atomic::Ordering::AcqRel);
+
+                       let thread_count_ref = Arc::clone(&threads_running);
+                       let main_thread_ref = std::thread::current();
+                       threads.push((path.file_name().unwrap().to_str().unwrap().to_string(),
+                               std::thread::spawn(move || {
+                                       let string_logger = StringBuffer::new();
+
+                                       let panic_logger = string_logger.clone();
+                                       let res = if ::std::panic::catch_unwind(move || {
+                                               process_network_graph_test(&data, panic_logger);
+                                       }).is_err() {
+                                               Some(string_logger.into_string())
+                                       } else { None };
+                                       thread_count_ref.fetch_sub(1, atomic::Ordering::AcqRel);
+                                       main_thread_ref.unpark();
+                                       res
+                               })
+                       ));
+                       while threads_running.load(atomic::Ordering::Acquire) > 32 {
+                               std::thread::park();
+                       }
+               }
+       }
+       let mut failed_outputs = Vec::new();
+       for (test, thread) in threads.drain(..) {
+               if let Some(output) = thread.join().unwrap() {
+                       println!("\nOutput of {}:\n{}\n", test, output);
+                       failed_outputs.push(test);
+               }
+       }
+       if !failed_outputs.is_empty() {
+               println!("Test cases which failed: ");
+               for case in failed_outputs {
+                       println!("{}", case);
+               }
+               panic!();
+       }
+}
index a0cc42b8189f9ae8837fb8ea64106c902185781d..5e158aee36ffe050e6f53176ddd1fc7887935871 100644 (file)
@@ -9,6 +9,7 @@
 
 extern crate bitcoin;
 extern crate lightning;
+extern crate lightning_rapid_gossip_sync;
 extern crate hex;
 
 pub mod utils;
@@ -17,6 +18,7 @@ pub mod chanmon_deser;
 pub mod chanmon_consistency;
 pub mod full_stack;
 pub mod peer_crypt;
+pub mod process_network_graph;
 pub mod router;
 pub mod zbase32;
 
diff --git a/fuzz/src/process_network_graph.rs b/fuzz/src/process_network_graph.rs
new file mode 100644 (file)
index 0000000..3f30335
--- /dev/null
@@ -0,0 +1,20 @@
+// Import that needs to be added manually
+use utils::test_logger;
+
+/// Actual fuzz test, method signature and name are fixed
+fn do_test(data: &[u8]) {
+       let block_hash = bitcoin::BlockHash::default();
+       let network_graph = lightning::routing::network_graph::NetworkGraph::new(block_hash);
+       lightning_rapid_gossip_sync::processing::update_network_graph(&network_graph, data);
+}
+
+/// Method that needs to be added manually, {name}_test
+pub fn process_network_graph_test<Out: test_logger::Output>(data: &[u8], _out: Out) {
+       do_test(data);
+}
+
+/// Method that needs to be added manually, {name}_run
+#[no_mangle]
+pub extern "C" fn process_network_graph_run(data: *const u8, datalen: usize) {
+       do_test(unsafe { std::slice::from_raw_parts(data, datalen) });
+}
index 5d45e3d02388ea9a1659bf0cbb6e88e379d2f940..798fb66479519cf9b00a84ce3d5873787c449b58 100644 (file)
@@ -3,6 +3,7 @@ void chanmon_deser_run(const unsigned char* data, size_t data_len);
 void chanmon_consistency_run(const unsigned char* data, size_t data_len);
 void full_stack_run(const unsigned char* data, size_t data_len);
 void peer_crypt_run(const unsigned char* data, size_t data_len);
+void process_network_graph_run(const unsigned char* data, size_t data_len);
 void router_run(const unsigned char* data, size_t data_len);
 void zbase32_run(const unsigned char* data, size_t data_len);
 void msg_accept_channel_run(const unsigned char* data, size_t data_len);
diff --git a/lightning-rapid-gossip-sync/Cargo.toml b/lightning-rapid-gossip-sync/Cargo.toml
new file mode 100644 (file)
index 0000000..a4bc04f
--- /dev/null
@@ -0,0 +1,20 @@
+[package]
+name = "lightning-rapid-gossip-sync"
+version = "0.0.104"
+authors = ["Arik Sosman <git@arik.io>"]
+license = "MIT OR Apache-2.0"
+repository = "https://github.com/lightningdevkit/rust-lightning"
+edition = "2018"
+description = """
+Utility to process gossip routing data from Rapid Gossip Sync Server.
+"""
+
+[features]
+_bench_unstable = []
+
+[dependencies]
+lightning = { version = "0.0.106", path = "../lightning" }
+bitcoin = { version = "0.28.1", default-features = false }
+
+[dev-dependencies]
+lightning = { version = "0.0.106", path = "../lightning", features = ["_test_utils"] }
diff --git a/lightning-rapid-gossip-sync/README.md b/lightning-rapid-gossip-sync/README.md
new file mode 100644 (file)
index 0000000..86d4981
--- /dev/null
@@ -0,0 +1,120 @@
+# lightning-rapid-gossip-sync
+
+This crate exposes functionality for rapid gossip graph syncing, aimed primarily at mobile clients.
+Its server counterpart is the
+[rapid-gossip-sync-server](https://github.com/lightningdevkit/rapid-gossip-sync-server) repository.
+
+## Mechanism
+
+The (presumed) server sends a compressed gossip response containing gossip data. The gossip data is
+formatted compactly, omitting signatures and opportunistically incremental where previous channel
+updates are known.
+
+Essentially, the serialization structure is as follows:
+
+1. Fixed prefix bytes `76, 68, 75, 1` (the first three bytes are ASCII for `LDK`)
+    - The purpose of this prefix is to identify the serialization format, should other rapid gossip
+      sync formats arise in the future
+    - The fourth byte is the protocol version in case our format gets updated
+2. Chain hash (32 bytes)
+3. Latest seen timestamp (`u32`)
+4. An unsigned int indicating the number of node IDs to follow
+5. An array of compressed node ID pubkeys (all pubkeys are presumed to be standard
+   compressed 33-byte-serializations)
+6. An unsigned int indicating the number of channel announcement messages to follow
+7. An array of significantly stripped down customized channel announcements
+8. An unsigned int indicating the number of channel update messages to follow
+9. A series of default values used for non-incremental channel updates
+    - The values are defined as follows:
+        1. `default_cltv_expiry_delta`
+        2. `default_htlc_minimum_msat`
+        3. `default_fee_base_msat`
+        4. `default_fee_proportional_millionths`
+        5. `default_htlc_maximum_msat` (`u64`, and if the default is no maximum, `u64::MAX`)
+    - The defaults are calculated by the server based on the frequency among non-incremental
+      updates within a given delta set
+10. An array of customized channel updates
+
+You will also notice that `NodeAnnouncement` messages are omitted altogether as the node IDs are
+implicitly extracted from the channel announcements and updates.
+
+The data is then applied to the current network graph, artificially dated to the timestamp of the
+latest seen message less one week, be it an announcement or an update, from the server's
+perspective. The network graph should not be pruned until the graph sync completes.
+
+### Custom Channel Announcement
+
+To achieve compactness and avoid data repetition, we're sending a significantly stripped down
+version of the channel announcement message, which contains only the following data:
+
+1. `channel_features`: `u16` + `n`, where `n` is the number of bytes indicated by the first `u16`
+2. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting from 0)
+3. `node_id_1_index`: `CompactSize` (index of node id within the previously sent sequence)
+4. `node_id_2_index`: `CompactSize` (index of node id within the previously sent sequence)
+
+### Custom Channel Update
+
+For the purpose of rapid syncing, we have deviated from the channel update format specified in
+BOLT 7 significantly. Our custom channel updates are structured as follows:
+
+1. `short_channel_id`: `CompactSize` (incremental `CompactSize` deltas starting at 0)
+2. `custom_channel_flags`: `u8`
+3. `update_data`
+
+Specifically, our custom channel flags break down like this:
+
+| 128                 | 64 | 32 | 16 | 8 | 4 | 2                | 1         |
+|---------------------|----|----|----|---|---|------------------|-----------|
+| Incremental update? |    |    |    |   |   | Disable channel? | Direction |
+
+If the most significant bit is set to `1`, indicating an incremental update, the intermediate bit
+flags assume the following meaning:
+
+| 64                              | 32                              | 16                          | 8                                         | 4                               |
+|---------------------------------|---------------------------------|-----------------------------|-------------------------------------------|---------------------------------|
+| `cltv_expiry_delta` has changed | `htlc_minimum_msat` has changed | `fee_base_msat` has changed | `fee_proportional_millionths` has changed | `htlc_maximum_msat` has changed |
+
+If the most significant bit is set to `0`, the meaning is almost identical, except instead of a
+change, the flags now represent a deviation from the defaults sent at the beginning of the update
+sequence.
+
+In both cases, `update_data` only contains the fields that are indicated by the channel flags to be
+non-default or to have mutated.
+
+## Delta Calculation
+
+The way a server is meant to calculate this rapid gossip sync data is by taking the latest time
+any change, be it either an announcement or an update, was seen. That timestamp is included in each
+rapid sync message, so all the client needs to do is cache one variable.
+
+If a particular channel update had never occurred before, the full update is sent. If a channel has
+had updates prior to the provided timestamp, the latest update prior to the timestamp is taken as a
+reference, and the delta is calculated against it.
+
+Depending on whether the rapid sync message is calculated on the fly or a snapshotted version is
+returned, intermediate changes between the latest update seen by the client and the latest update
+broadcast on the network may be taken into account when calculating the delta.
+
+## Performance
+
+Given the primary purpose of this utility is a faster graph sync, we thought it might be helpful to
+provide some examples of various delta sets. These examples were calculated as of May 19th  2022
+with a network graph comprised of 80,000 channel announcements and 160,000 directed channel updates.
+
+| Full sync                   |        |
+|-----------------------------|--------|
+| Message Length              | 4.7 MB |
+| Gzipped Message Length      | 2.0 MB |
+| Client-side Processing Time | 1.4 s  |
+
+| Week-old sync               |        |
+|-----------------------------|--------|
+| Message Length              | 2.7 MB |
+| Gzipped Message Length      | 862 kB |
+| Client-side Processing Time | 907 ms |
+
+| Day-old sync                |         |
+|-----------------------------|---------|
+| Message Length              | 191 kB  |
+| Gzipped Message Length      | 92.8 kB |
+| Client-side Processing Time | 196 ms  |
diff --git a/lightning-rapid-gossip-sync/res/.gitignore b/lightning-rapid-gossip-sync/res/.gitignore
new file mode 100644 (file)
index 0000000..d6b7ef3
--- /dev/null
@@ -0,0 +1,2 @@
+*
+!.gitignore
diff --git a/lightning-rapid-gossip-sync/src/error.rs b/lightning-rapid-gossip-sync/src/error.rs
new file mode 100644 (file)
index 0000000..fee8fea
--- /dev/null
@@ -0,0 +1,40 @@
+use core::fmt::Debug;
+use std::fmt::Formatter;
+use lightning::ln::msgs::{DecodeError, LightningError};
+
+/// All-encompassing standard error type that processing can return
+pub enum GraphSyncError {
+       /// Error trying to read the update data, typically due to an erroneous data length indication
+       /// that is greater than the actual amount of data provided
+       DecodeError(DecodeError),
+       /// Error applying the patch to the network graph, usually the result of updates that are too
+       /// old or missing prerequisite data to the application of updates out of order
+       LightningError(LightningError),
+}
+
+impl From<std::io::Error> for GraphSyncError {
+       fn from(error: std::io::Error) -> Self {
+               Self::DecodeError(DecodeError::Io(error.kind()))
+       }
+}
+
+impl From<DecodeError> for GraphSyncError {
+       fn from(error: DecodeError) -> Self {
+               Self::DecodeError(error)
+       }
+}
+
+impl From<LightningError> for GraphSyncError {
+       fn from(error: LightningError) -> Self {
+               Self::LightningError(error)
+       }
+}
+
+impl Debug for GraphSyncError {
+       fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+               match self {
+                       GraphSyncError::DecodeError(e) => f.write_fmt(format_args!("DecodeError: {:?}", e)),
+                       GraphSyncError::LightningError(e) => f.write_fmt(format_args!("LightningError: {:?}", e))
+               }
+       }
+}
diff --git a/lightning-rapid-gossip-sync/src/lib.rs b/lightning-rapid-gossip-sync/src/lib.rs
new file mode 100644 (file)
index 0000000..4ab1028
--- /dev/null
@@ -0,0 +1,243 @@
+#![deny(missing_docs)]
+#![deny(unsafe_code)]
+#![deny(broken_intra_doc_links)]
+#![deny(non_upper_case_globals)]
+#![deny(non_camel_case_types)]
+#![deny(non_snake_case)]
+#![deny(unused_mut)]
+#![deny(unused_variables)]
+#![deny(unused_imports)]
+//! This crate exposes functionality to rapidly sync gossip data, aimed primarily at mobile
+//! devices.
+//!
+//! The server sends a compressed response containing differential gossip data. The gossip data is
+//! formatted compactly, omitting signatures and opportunistically incremental where previous
+//! channel updates are known (a mechanism that is enabled when the timestamp of the last known
+//! channel update is communicated). A reference server implementation can be found
+//! [here](https://github.com/lightningdevkit/rapid-gossip-sync-server).
+//!
+//! An example server request could look as simple as the following. Note that the first ever rapid
+//! sync should use `0` for `last_sync_timestamp`:
+//!
+//! ```shell
+//! curl -o rapid_sync.lngossip https://rapidsync.lightningdevkit.org/snapshot/<last_sync_timestamp>
+//! ```
+//!
+//! Then, call the network processing function. In this example, we process the update by reading
+//! its contents from disk, which we do by calling the `sync_network_graph_with_file_path` method:
+//!
+//! ```
+//! use bitcoin::blockdata::constants::genesis_block;
+//! use bitcoin::Network;
+//! use lightning::routing::network_graph::NetworkGraph;
+//!
+//! let block_hash = genesis_block(Network::Bitcoin).header.block_hash();
+//! let network_graph = NetworkGraph::new(block_hash);
+//! let new_last_sync_timestamp_result = lightning_rapid_gossip_sync::sync_network_graph_with_file_path(&network_graph, "./rapid_sync.lngossip");
+//! ```
+//!
+//! The primary benefit this syncing mechanism provides is that given a trusted server, a
+//! low-powered client can offload the validation of gossip signatures. This enables a client to
+//! privately calculate routes for payments, and do so much faster and earlier than requiring a full
+//! peer-to-peer gossip sync to complete.
+//!
+//! The reason the rapid sync server requires trust is that it could provide bogus data, though at
+//! worst, all that would result in is a fake network topology, which wouldn't enable the server to
+//! steal or siphon off funds. It could, however, reduce the client's privacy by forcing all
+//! payments to be routed via channels the server controls.
+//!
+//! The way a server is meant to calculate this rapid gossip sync data is by using a `latest_seen`
+//! timestamp provided by the client. It's not included in either channel announcement or update,
+//! (not least due to announcements not including any timestamps at all, but only a block height)
+//! but rather, it's a timestamp of when the server saw a particular message.
+
+// Allow and import test features for benching
+#![cfg_attr(all(test, feature = "_bench_unstable"), feature(test))]
+#[cfg(all(test, feature = "_bench_unstable"))]
+extern crate test;
+
+use std::fs::File;
+
+use lightning::routing::network_graph;
+
+use crate::error::GraphSyncError;
+
+/// Error types that these functions can return
+pub mod error;
+
+/// Core functionality of this crate
+pub mod processing;
+
+/// Sync gossip data from a file
+/// Returns the last sync timestamp to be used the next time rapid sync data is queried.
+///
+/// `network_graph`: The network graph to apply the updates to
+///
+/// `sync_path`: Path to the file where the gossip update data is located
+///
+pub fn sync_network_graph_with_file_path(
+       network_graph: &network_graph::NetworkGraph,
+       sync_path: &str,
+) -> Result<u32, GraphSyncError> {
+       let mut file = File::open(sync_path)?;
+       processing::update_network_graph_from_byte_stream(&network_graph, &mut file)
+}
+
+#[cfg(test)]
+mod tests {
+       use std::fs;
+
+       use bitcoin::blockdata::constants::genesis_block;
+       use bitcoin::Network;
+
+       use lightning::ln::msgs::DecodeError;
+       use lightning::routing::network_graph::NetworkGraph;
+
+       use crate::sync_network_graph_with_file_path;
+
+       #[test]
+       fn test_sync_from_file() {
+               struct FileSyncTest {
+                       directory: String,
+               }
+
+               impl FileSyncTest {
+                       fn new(tmp_directory: &str, valid_response: &[u8]) -> FileSyncTest {
+                               let test = FileSyncTest { directory: tmp_directory.to_owned() };
+
+                               let graph_sync_test_directory = test.get_test_directory();
+                               fs::create_dir_all(graph_sync_test_directory).unwrap();
+
+                               let graph_sync_test_file = test.get_test_file_path();
+                               fs::write(&graph_sync_test_file, valid_response).unwrap();
+
+                               test
+                       }
+                       fn get_test_directory(&self) -> String {
+                               let graph_sync_test_directory = self.directory.clone() + "/graph-sync-tests";
+                               graph_sync_test_directory
+                       }
+                       fn get_test_file_path(&self) -> String {
+                               let graph_sync_test_directory = self.get_test_directory();
+                               let graph_sync_test_file = graph_sync_test_directory.to_owned() + "/test_data.lngossip";
+                               graph_sync_test_file
+                       }
+               }
+
+               impl Drop for FileSyncTest {
+                       fn drop(&mut self) {
+                               fs::remove_dir_all(self.directory.clone()).unwrap();
+                       }
+               }
+
+               // same as incremental_only_update_fails_without_prior_same_direction_updates
+               let valid_response = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
+                       0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
+                       0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
+               ];
+
+               let tmp_directory = "./rapid-gossip-sync-tests-tmp";
+               let sync_test = FileSyncTest::new(tmp_directory, &valid_response);
+               let graph_sync_test_file = sync_test.get_test_file_path();
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let sync_result = sync_network_graph_with_file_path(&network_graph, &graph_sync_test_file);
+
+               if sync_result.is_err() {
+                       panic!("Unexpected sync result: {:?}", sync_result)
+               }
+
+               assert_eq!(network_graph.read_only().channels().len(), 2);
+               let after = network_graph.to_string();
+               assert!(
+                       after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
+               );
+               assert!(
+                       after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
+               );
+               assert!(
+                       after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
+               );
+               assert!(
+                       after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
+               );
+               assert!(after.contains("619737530008010752"));
+               assert!(after.contains("783241506229452801"));
+       }
+
+       #[test]
+       fn measure_native_read_from_file() {
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let start = std::time::Instant::now();
+               let sync_result =
+                       sync_network_graph_with_file_path(&network_graph, "./res/full_graph.lngossip");
+               if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
+                       let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
+                       #[cfg(not(require_route_graph_test))]
+                       {
+                               println!("{}", error_string);
+                               return;
+                       }
+                       panic!("{}", error_string);
+               }
+               let elapsed = start.elapsed();
+               println!("initialization duration: {:?}", elapsed);
+               if sync_result.is_err() {
+                       panic!("Unexpected sync result: {:?}", sync_result)
+               }
+       }
+}
+
+#[cfg(all(test, feature = "_bench_unstable"))]
+pub mod bench {
+       use test::Bencher;
+
+       use bitcoin::blockdata::constants::genesis_block;
+       use bitcoin::Network;
+
+       use lightning::ln::msgs::DecodeError;
+       use lightning::routing::network_graph::NetworkGraph;
+
+       use crate::sync_network_graph_with_file_path;
+
+       #[bench]
+       fn bench_reading_full_graph_from_file(b: &mut Bencher) {
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               b.iter(|| {
+                       let network_graph = NetworkGraph::new(block_hash);
+                       let sync_result = sync_network_graph_with_file_path(
+                               &network_graph,
+                               "./res/full_graph.lngossip",
+                       );
+                       if let Err(crate::error::GraphSyncError::DecodeError(DecodeError::Io(io_error))) = &sync_result {
+                               let error_string = format!("Input file lightning-graph-sync/res/full_graph.lngossip is missing! Download it from https://bitcoin.ninja/ldk-compressed_graph-bc08df7542-2022-05-05.bin\n\n{:?}", io_error);
+                               #[cfg(not(require_route_graph_test))]
+                               {
+                                       println!("{}", error_string);
+                                       return;
+                               }
+                               panic!("{}", error_string);
+                       }
+                       assert!(sync_result.is_ok())
+               });
+       }
+}
diff --git a/lightning-rapid-gossip-sync/src/processing.rs b/lightning-rapid-gossip-sync/src/processing.rs
new file mode 100644 (file)
index 0000000..ceb8b82
--- /dev/null
@@ -0,0 +1,499 @@
+use std::cmp::max;
+use std::io;
+use std::io::Read;
+
+use bitcoin::BlockHash;
+use bitcoin::secp256k1::PublicKey;
+
+use lightning::ln::msgs::{
+       DecodeError, ErrorAction, LightningError, OptionalField, UnsignedChannelUpdate,
+};
+use lightning::routing::network_graph;
+use lightning::util::ser::{BigSize, Readable};
+
+use crate::error::GraphSyncError;
+
+/// The purpose of this prefix is to identify the serialization format, should other rapid gossip
+/// sync formats arise in the future.
+///
+/// The fourth byte is the protocol version in case our format gets updated.
+const GOSSIP_PREFIX: [u8; 4] = [76, 68, 75, 1];
+
+/// Maximum vector allocation capacity for distinct node IDs. This constraint is necessary to
+/// avoid malicious updates being able to trigger excessive memory allocation.
+const MAX_INITIAL_NODE_ID_VECTOR_CAPACITY: u32 = 50_000;
+
+/// Update network graph from binary data.
+/// Returns the last sync timestamp to be used the next time rapid sync data is queried.
+///
+/// `network_graph`: network graph to be updated
+///
+/// `update_data`: `&[u8]` binary stream that comprises the update data
+pub fn update_network_graph(
+       network_graph: &network_graph::NetworkGraph,
+       update_data: &[u8],
+) -> Result<u32, GraphSyncError> {
+       let mut read_cursor = io::Cursor::new(update_data);
+       update_network_graph_from_byte_stream(&network_graph, &mut read_cursor)
+}
+
+pub(crate) fn update_network_graph_from_byte_stream<R: Read>(
+       network_graph: &network_graph::NetworkGraph,
+       mut read_cursor: &mut R,
+) -> Result<u32, GraphSyncError> {
+       let mut prefix = [0u8; 4];
+       read_cursor.read_exact(&mut prefix)?;
+
+       match prefix {
+               GOSSIP_PREFIX => {},
+               _ => {
+                       return Err(DecodeError::UnknownVersion.into());
+               }
+       };
+
+       let chain_hash: BlockHash = Readable::read(read_cursor)?;
+       let latest_seen_timestamp: u32 = Readable::read(read_cursor)?;
+       // backdate the applied timestamp by a week
+       let backdated_timestamp = latest_seen_timestamp.saturating_sub(24 * 3600 * 7);
+
+       let node_id_count: u32 = Readable::read(read_cursor)?;
+       let mut node_ids: Vec<PublicKey> = Vec::with_capacity(std::cmp::min(
+               node_id_count,
+               MAX_INITIAL_NODE_ID_VECTOR_CAPACITY,
+       ) as usize);
+       for _ in 0..node_id_count {
+               let current_node_id = Readable::read(read_cursor)?;
+               node_ids.push(current_node_id);
+       }
+
+       let mut previous_scid: u64 = 0;
+       let announcement_count: u32 = Readable::read(read_cursor)?;
+       for _ in 0..announcement_count {
+               let features = Readable::read(read_cursor)?;
+
+               // handle SCID
+               let scid_delta: BigSize = Readable::read(read_cursor)?;
+               let short_channel_id = previous_scid
+                       .checked_add(scid_delta.0)
+                       .ok_or(DecodeError::InvalidValue)?;
+               previous_scid = short_channel_id;
+
+               let node_id_1_index: BigSize = Readable::read(read_cursor)?;
+               let node_id_2_index: BigSize = Readable::read(read_cursor)?;
+               if max(node_id_1_index.0, node_id_2_index.0) >= node_id_count as u64 {
+                       return Err(DecodeError::InvalidValue.into());
+               };
+               let node_id_1 = node_ids[node_id_1_index.0 as usize];
+               let node_id_2 = node_ids[node_id_2_index.0 as usize];
+
+               let announcement_result = network_graph.add_channel_from_partial_announcement(
+                       short_channel_id,
+                       backdated_timestamp as u64,
+                       features,
+                       node_id_1,
+                       node_id_2,
+               );
+               if let Err(lightning_error) = announcement_result {
+                       if let ErrorAction::IgnoreDuplicateGossip = lightning_error.action {
+                               // everything is fine, just a duplicate channel announcement
+                       } else {
+                               return Err(lightning_error.into());
+                       }
+               }
+       }
+
+       previous_scid = 0; // updates start at a new scid
+
+       let update_count: u32 = Readable::read(read_cursor)?;
+       if update_count == 0 {
+               return Ok(latest_seen_timestamp);
+       }
+
+       // obtain default values for non-incremental updates
+       let default_cltv_expiry_delta: u16 = Readable::read(&mut read_cursor)?;
+       let default_htlc_minimum_msat: u64 = Readable::read(&mut read_cursor)?;
+       let default_fee_base_msat: u32 = Readable::read(&mut read_cursor)?;
+       let default_fee_proportional_millionths: u32 = Readable::read(&mut read_cursor)?;
+       let tentative_default_htlc_maximum_msat: u64 = Readable::read(&mut read_cursor)?;
+       let default_htlc_maximum_msat = if tentative_default_htlc_maximum_msat == u64::max_value() {
+               OptionalField::Absent
+       } else {
+               OptionalField::Present(tentative_default_htlc_maximum_msat)
+       };
+
+       for _ in 0..update_count {
+               let scid_delta: BigSize = Readable::read(read_cursor)?;
+               let short_channel_id = previous_scid
+                       .checked_add(scid_delta.0)
+                       .ok_or(DecodeError::InvalidValue)?;
+               previous_scid = short_channel_id;
+
+               let channel_flags: u8 = Readable::read(read_cursor)?;
+
+               // flags are always sent in full, and hence always need updating
+               let standard_channel_flags = channel_flags & 0b_0000_0011;
+
+               let mut synthetic_update = if channel_flags & 0b_1000_0000 == 0 {
+                       // full update, field flags will indicate deviations from the default
+                       UnsignedChannelUpdate {
+                               chain_hash,
+                               short_channel_id,
+                               timestamp: backdated_timestamp,
+                               flags: standard_channel_flags,
+                               cltv_expiry_delta: default_cltv_expiry_delta,
+                               htlc_minimum_msat: default_htlc_minimum_msat,
+                               htlc_maximum_msat: default_htlc_maximum_msat.clone(),
+                               fee_base_msat: default_fee_base_msat,
+                               fee_proportional_millionths: default_fee_proportional_millionths,
+                               excess_data: vec![],
+                       }
+               } else {
+                       // incremental update, field flags will indicate mutated values
+                       let read_only_network_graph = network_graph.read_only();
+                       let channel = read_only_network_graph
+                               .channels()
+                               .get(&short_channel_id)
+                               .ok_or(LightningError {
+                                       err: "Couldn't find channel for update".to_owned(),
+                                       action: ErrorAction::IgnoreError,
+                               })?;
+
+                       let directional_info = channel
+                               .get_directional_info(channel_flags)
+                               .ok_or(LightningError {
+                                       err: "Couldn't find previous directional data for update".to_owned(),
+                                       action: ErrorAction::IgnoreError,
+                               })?;
+
+                       let htlc_maximum_msat =
+                               if let Some(htlc_maximum_msat) = directional_info.htlc_maximum_msat {
+                                       OptionalField::Present(htlc_maximum_msat)
+                               } else {
+                                       OptionalField::Absent
+                               };
+
+                       UnsignedChannelUpdate {
+                               chain_hash,
+                               short_channel_id,
+                               timestamp: backdated_timestamp,
+                               flags: standard_channel_flags,
+                               cltv_expiry_delta: directional_info.cltv_expiry_delta,
+                               htlc_minimum_msat: directional_info.htlc_minimum_msat,
+                               htlc_maximum_msat,
+                               fee_base_msat: directional_info.fees.base_msat,
+                               fee_proportional_millionths: directional_info.fees.proportional_millionths,
+                               excess_data: vec![],
+                       }
+               };
+
+               if channel_flags & 0b_0100_0000 > 0 {
+                       let cltv_expiry_delta: u16 = Readable::read(read_cursor)?;
+                       synthetic_update.cltv_expiry_delta = cltv_expiry_delta;
+               }
+
+               if channel_flags & 0b_0010_0000 > 0 {
+                       let htlc_minimum_msat: u64 = Readable::read(read_cursor)?;
+                       synthetic_update.htlc_minimum_msat = htlc_minimum_msat;
+               }
+
+               if channel_flags & 0b_0001_0000 > 0 {
+                       let fee_base_msat: u32 = Readable::read(read_cursor)?;
+                       synthetic_update.fee_base_msat = fee_base_msat;
+               }
+
+               if channel_flags & 0b_0000_1000 > 0 {
+                       let fee_proportional_millionths: u32 = Readable::read(read_cursor)?;
+                       synthetic_update.fee_proportional_millionths = fee_proportional_millionths;
+               }
+
+               if channel_flags & 0b_0000_0100 > 0 {
+                       let tentative_htlc_maximum_msat: u64 = Readable::read(read_cursor)?;
+                       synthetic_update.htlc_maximum_msat = if tentative_htlc_maximum_msat == u64::max_value()
+                       {
+                               OptionalField::Absent
+                       } else {
+                               OptionalField::Present(tentative_htlc_maximum_msat)
+                       };
+               }
+
+               network_graph.update_channel_unsigned(&synthetic_update)?;
+       }
+
+       Ok(latest_seen_timestamp)
+}
+
+#[cfg(test)]
+mod tests {
+       use bitcoin::blockdata::constants::genesis_block;
+       use bitcoin::Network;
+
+       use lightning::ln::msgs::DecodeError;
+       use lightning::routing::network_graph::NetworkGraph;
+
+       use crate::error::GraphSyncError;
+       use crate::processing::update_network_graph;
+
+       #[test]
+       fn network_graph_fails_to_update_from_clipped_input() {
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               let example_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 0, 100,
+                       0, 0, 2, 224, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 36, 0, 0,
+                       0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1, 24, 0,
+                       0, 3, 232, 0, 0, 0,
+               ];
+               let update_result = update_network_graph(&network_graph, &example_input[..]);
+               assert!(update_result.is_err());
+               if let Err(GraphSyncError::DecodeError(DecodeError::ShortRead)) = update_result {
+                       // this is the expected error type
+               } else {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+       }
+
+       #[test]
+       fn incremental_only_update_fails_without_prior_announcements() {
+               let incremental_update_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
+                       68, 226, 0, 6, 11, 0, 1, 128,
+               ];
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let update_result = update_network_graph(&network_graph, &incremental_update_input[..]);
+               assert!(update_result.is_err());
+               if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
+                       assert_eq!(lightning_error.err, "Couldn't find channel for update");
+               } else {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+       }
+
+       #[test]
+       fn incremental_only_update_fails_without_prior_updates() {
+               let announced_update_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255,
+                       2, 68, 226, 0, 6, 11, 0, 1, 128,
+               ];
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let update_result = update_network_graph(&network_graph, &announced_update_input[..]);
+               assert!(update_result.is_err());
+               if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
+                       assert_eq!(
+                               lightning_error.err,
+                               "Couldn't find previous directional data for update"
+                       );
+               } else {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+       }
+
+       #[test]
+       fn incremental_only_update_fails_without_prior_same_direction_updates() {
+               let initialization_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
+                       0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
+                       0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
+               ];
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let initialization_result = update_network_graph(&network_graph, &initialization_input[..]);
+               if initialization_result.is_err() {
+                       panic!(
+                               "Unexpected initialization result: {:?}",
+                               initialization_result
+                       )
+               }
+
+               assert_eq!(network_graph.read_only().channels().len(), 2);
+               let initialized = network_graph.to_string();
+               assert!(initialized
+                       .contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643"));
+               assert!(initialized
+                       .contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b"));
+               assert!(initialized
+                       .contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432"));
+               assert!(initialized
+                       .contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61"));
+               assert!(initialized.contains("619737530008010752"));
+               assert!(initialized.contains("783241506229452801"));
+
+               let opposite_direction_incremental_update_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
+                       68, 226, 0, 6, 11, 0, 1, 128,
+               ];
+               let update_result = update_network_graph(
+                       &network_graph,
+                       &opposite_direction_incremental_update_input[..],
+               );
+               assert!(update_result.is_err());
+               if let Err(GraphSyncError::LightningError(lightning_error)) = update_result {
+                       assert_eq!(
+                               lightning_error.err,
+                               "Couldn't find previous directional data for update"
+                       );
+               } else {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+       }
+
+       #[test]
+       fn incremental_update_succeeds_with_prior_announcements_and_full_updates() {
+               let initialization_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
+                       0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 56, 0, 0,
+                       0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 25, 0, 0, 0, 1, 0, 0, 0, 125, 255, 2,
+                       68, 226, 0, 6, 11, 0, 1, 4, 0, 0, 0, 0, 29, 129, 25, 192, 0, 5, 0, 0, 0, 0, 29, 129,
+                       25, 192,
+               ];
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let initialization_result = update_network_graph(&network_graph, &initialization_input[..]);
+               assert!(initialization_result.is_ok());
+
+               let single_direction_incremental_update_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 229, 183, 167,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+                       0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 136, 0, 0, 0, 221, 255, 2,
+                       68, 226, 0, 6, 11, 0, 1, 128,
+               ];
+               let update_result = update_network_graph(
+                       &network_graph,
+                       &single_direction_incremental_update_input[..],
+               );
+               if update_result.is_err() {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+
+               assert_eq!(network_graph.read_only().channels().len(), 2);
+               let after = network_graph.to_string();
+               assert!(
+                       after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
+               );
+               assert!(
+                       after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
+               );
+               assert!(
+                       after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
+               );
+               assert!(
+                       after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
+               );
+               assert!(after.contains("619737530008010752"));
+               assert!(after.contains("783241506229452801"));
+       }
+
+       #[test]
+       fn full_update_succeeds() {
+               let valid_input = vec![
+                       76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
+                       79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
+                       0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
+                       187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
+                       157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
+                       88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
+                       204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
+                       181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
+                       110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
+                       76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
+                       226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 4, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
+                       0, 0, 0, 1, 0, 0, 0, 0, 29, 129, 25, 192, 255, 8, 153, 192, 0, 2, 27, 0, 0, 60, 0, 0,
+                       0, 0, 0, 0, 0, 1, 0, 0, 0, 100, 0, 0, 2, 224, 0, 0, 0, 0, 58, 85, 116, 216, 0, 29, 0,
+                       0, 0, 1, 0, 0, 0, 125, 0, 0, 0, 0, 58, 85, 116, 216, 255, 2, 68, 226, 0, 6, 11, 0, 1,
+                       0, 0, 1,
+               ];
+
+               let block_hash = genesis_block(Network::Bitcoin).block_hash();
+               let network_graph = NetworkGraph::new(block_hash);
+
+               assert_eq!(network_graph.read_only().channels().len(), 0);
+
+               let update_result = update_network_graph(&network_graph, &valid_input[..]);
+               if update_result.is_err() {
+                       panic!("Unexpected update result: {:?}", update_result)
+               }
+
+               assert_eq!(network_graph.read_only().channels().len(), 2);
+               let after = network_graph.to_string();
+               assert!(
+                       after.contains("021607cfce19a4c5e7e6e738663dfafbbbac262e4ff76c2c9b30dbeefc35c00643")
+               );
+               assert!(
+                       after.contains("02247d9db0dfafea745ef8c9e161eb322f73ac3f8858d8730b6fd97254747ce76b")
+               );
+               assert!(
+                       after.contains("029e01f279986acc83ba235d46d80aede0b7595f410353b93a8ab540bb677f4432")
+               );
+               assert!(
+                       after.contains("02c913118a8895b9e29c89af6e20ed00d95a1f64e4952edbafa84d048f26804c61")
+               );
+               assert!(after.contains("619737530008010752"));
+               assert!(after.contains("783241506229452801"));
+       }
+}
index abdc10c577a4f476b70929dad51499b26f1b0cfe..63f4fcdc8df4d355df1446c996ab115a6d19ff9f 100644 (file)
@@ -158,7 +158,7 @@ mod sync {
        #[cfg(test)]
        pub use debug_sync::*;
        #[cfg(not(test))]
-       pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard};
+       pub use ::std::sync::{Arc, Mutex, Condvar, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard};
        #[cfg(not(test))]
        pub use crate::util::fairrwlock::FairRwLock;
 }
index 281a2a8e977123158f5daeba45ff8666c96ec403..5aa48c32bf07686aba4adad53b9615a6cdd91bb1 100644 (file)
@@ -630,7 +630,10 @@ pub struct UnsignedChannelUpdate {
        pub fee_base_msat: u32,
        /// The amount to fee multiplier, in micro-satoshi
        pub fee_proportional_millionths: u32,
-       pub(crate) excess_data: Vec<u8>,
+       /// Excess data which was signed as a part of the message which we do not (yet) understand how
+       /// to decode. This is stored to ensure forward-compatibility as new fields are added to the
+       /// lightning gossip
+       pub excess_data: Vec<u8>,
 }
 /// A channel_update message to be sent or received from a peer
 #[derive(Clone, Debug, PartialEq)]
index c65cec2a861dad727750896c8300f99dccdbb1ec..26719cc279061f180f5617f204846922271f3f3b 100644 (file)
@@ -67,7 +67,7 @@ impl NodeId {
        pub fn from_pubkey(pubkey: &PublicKey) -> Self {
                NodeId(pubkey.serialize())
        }
-       
+
        /// Get the public key slice from this NodeId
        pub fn as_slice(&self) -> &[u8] {
                &self.0
@@ -712,6 +712,16 @@ impl ChannelInfo {
                };
                Some((DirectedChannelInfo::new(self, direction), target))
        }
+
+       /// Returns a [`ChannelUpdateInfo`] based on the direction implied by the channel_flag.
+       pub fn get_directional_info(&self, channel_flags: u8) -> Option<&ChannelUpdateInfo> {
+               let direction = channel_flags & 1u8;
+               if direction == 0 {
+                       self.one_to_two.as_ref()
+               } else {
+                       self.two_to_one.as_ref()
+               }
+       }
 }
 
 impl fmt::Display for ChannelInfo {
@@ -1155,6 +1165,83 @@ impl NetworkGraph {
                self.update_channel_from_unsigned_announcement_intern(msg, None, chain_access)
        }
 
+       /// Update channel from partial announcement data received via rapid gossip sync
+       ///
+       /// `timestamp: u64`: Timestamp emulating the backdated original announcement receipt (by the
+       /// rapid gossip sync server)
+       ///
+       /// All other parameters as used in [`msgs::UnsignedChannelAnnouncement`] fields.
+       pub fn add_channel_from_partial_announcement(&self, short_channel_id: u64, timestamp: u64, features: ChannelFeatures, node_id_1: PublicKey, node_id_2: PublicKey) -> Result<(), LightningError> {
+               if node_id_1 == node_id_2 {
+                       return Err(LightningError{err: "Channel announcement node had a channel with itself".to_owned(), action: ErrorAction::IgnoreError});
+               };
+
+               let node_1 = NodeId::from_pubkey(&node_id_1);
+               let node_2 = NodeId::from_pubkey(&node_id_2);
+               let channel_info = ChannelInfo {
+                       features,
+                       node_one: node_1.clone(),
+                       one_to_two: None,
+                       node_two: node_2.clone(),
+                       two_to_one: None,
+                       capacity_sats: None,
+                       announcement_message: None,
+                       announcement_received_time: timestamp,
+               };
+
+               self.add_channel_between_nodes(short_channel_id, channel_info, None)
+       }
+
+       fn add_channel_between_nodes(&self, short_channel_id: u64, channel_info: ChannelInfo, utxo_value: Option<u64>) -> Result<(), LightningError> {
+               let mut channels = self.channels.write().unwrap();
+               let mut nodes = self.nodes.write().unwrap();
+
+               let node_id_a = channel_info.node_one.clone();
+               let node_id_b = channel_info.node_two.clone();
+
+               match channels.entry(short_channel_id) {
+                       BtreeEntry::Occupied(mut entry) => {
+                               //TODO: because asking the blockchain if short_channel_id is valid is only optional
+                               //in the blockchain API, we need to handle it smartly here, though it's unclear
+                               //exactly how...
+                               if utxo_value.is_some() {
+                                       // Either our UTXO provider is busted, there was a reorg, or the UTXO provider
+                                       // only sometimes returns results. In any case remove the previous entry. Note
+                                       // that the spec expects us to "blacklist" the node_ids involved, but we can't
+                                       // do that because
+                                       // a) we don't *require* a UTXO provider that always returns results.
+                                       // b) we don't track UTXOs of channels we know about and remove them if they
+                                       //    get reorg'd out.
+                                       // c) it's unclear how to do so without exposing ourselves to massive DoS risk.
+                                       Self::remove_channel_in_nodes(&mut nodes, &entry.get(), short_channel_id);
+                                       *entry.get_mut() = channel_info;
+                               } else {
+                                       return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip});
+                               }
+                       },
+                       BtreeEntry::Vacant(entry) => {
+                               entry.insert(channel_info);
+                       }
+               };
+
+               for current_node_id in [node_id_a, node_id_b].iter() {
+                       match nodes.entry(current_node_id.clone()) {
+                               BtreeEntry::Occupied(node_entry) => {
+                                       node_entry.into_mut().channels.push(short_channel_id);
+                               },
+                               BtreeEntry::Vacant(node_entry) => {
+                                       node_entry.insert(NodeInfo {
+                                               channels: vec!(short_channel_id),
+                                               lowest_inbound_channel_fees: None,
+                                               announcement_info: None,
+                                       });
+                               }
+                       };
+               };
+
+               Ok(())
+       }
+
        fn update_channel_from_unsigned_announcement_intern<C: Deref>(
                &self, msg: &msgs::UnsignedChannelAnnouncement, full_msg: Option<&msgs::ChannelAnnouncement>, chain_access: &Option<C>
        ) -> Result<(), LightningError>
@@ -1203,65 +1290,18 @@ impl NetworkGraph {
                }
 
                let chan_info = ChannelInfo {
-                               features: msg.features.clone(),
-                               node_one: NodeId::from_pubkey(&msg.node_id_1),
-                               one_to_two: None,
-                               node_two: NodeId::from_pubkey(&msg.node_id_2),
-                               two_to_one: None,
-                               capacity_sats: utxo_value,
-                               announcement_message: if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY
-                                       { full_msg.cloned() } else { None },
-                               announcement_received_time,
-                       };
-
-               let mut channels = self.channels.write().unwrap();
-               let mut nodes = self.nodes.write().unwrap();
-               match channels.entry(msg.short_channel_id) {
-                       BtreeEntry::Occupied(mut entry) => {
-                               //TODO: because asking the blockchain if short_channel_id is valid is only optional
-                               //in the blockchain API, we need to handle it smartly here, though it's unclear
-                               //exactly how...
-                               if utxo_value.is_some() {
-                                       // Either our UTXO provider is busted, there was a reorg, or the UTXO provider
-                                       // only sometimes returns results. In any case remove the previous entry. Note
-                                       // that the spec expects us to "blacklist" the node_ids involved, but we can't
-                                       // do that because
-                                       // a) we don't *require* a UTXO provider that always returns results.
-                                       // b) we don't track UTXOs of channels we know about and remove them if they
-                                       //    get reorg'd out.
-                                       // c) it's unclear how to do so without exposing ourselves to massive DoS risk.
-                                       Self::remove_channel_in_nodes(&mut nodes, &entry.get(), msg.short_channel_id);
-                                       *entry.get_mut() = chan_info;
-                               } else {
-                                       return Err(LightningError{err: "Already have knowledge of channel".to_owned(), action: ErrorAction::IgnoreDuplicateGossip});
-                               }
-                       },
-                       BtreeEntry::Vacant(entry) => {
-                               entry.insert(chan_info);
-                       }
+                       features: msg.features.clone(),
+                       node_one: NodeId::from_pubkey(&msg.node_id_1),
+                       one_to_two: None,
+                       node_two: NodeId::from_pubkey(&msg.node_id_2),
+                       two_to_one: None,
+                       capacity_sats: utxo_value,
+                       announcement_message: if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY
+                               { full_msg.cloned() } else { None },
+                       announcement_received_time,
                };
 
-               macro_rules! add_channel_to_node {
-                       ( $node_id: expr ) => {
-                               match nodes.entry($node_id) {
-                                       BtreeEntry::Occupied(node_entry) => {
-                                               node_entry.into_mut().channels.push(msg.short_channel_id);
-                                       },
-                                       BtreeEntry::Vacant(node_entry) => {
-                                               node_entry.insert(NodeInfo {
-                                                       channels: vec!(msg.short_channel_id),
-                                                       lowest_inbound_channel_fees: None,
-                                                       announcement_info: None,
-                                               });
-                                       }
-                               }
-                       };
-               }
-
-               add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_1));
-               add_channel_to_node!(NodeId::from_pubkey(&msg.node_id_2));
-
-               Ok(())
+               self.add_channel_between_nodes(msg.short_channel_id, chan_info, utxo_value)
        }
 
        /// Close a channel if a corresponding HTLC fail was sent.
@@ -1581,7 +1621,7 @@ mod tests {
        use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
        use routing::network_graph::{NetGraphMsgHandler, NetworkGraph, NetworkUpdate, MAX_EXCESS_BYTES_FOR_RELAY};
        use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
-               UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, 
+               UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate,
                ReplyChannelRange, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT};
        use util::test_utils;
        use util::logger::Logger;
index 69fd14640ae605bc2d481f40760ee48d3a502eb6..428adbc5e6638b5a0489eeb855d80f7c42f5ce86 100644 (file)
@@ -301,7 +301,7 @@ impl Readable for U48 {
 /// encoded in several different ways, which we must check for at deserialization-time. Thus, if
 /// you're looking for an example of a variable-length integer to use for your own project, move
 /// along, this is a rather poor design.
-pub(crate) struct BigSize(pub u64);
+pub struct BigSize(pub u64);
 impl Writeable for BigSize {
        #[inline]
        fn write<W: Writer>(&self, writer: &mut W) -> Result<(), io::Error> {