--- /dev/null
+root = true
+
+[*]
+insert_final_newline = true
+trim_trailing_whitespace = true
+
+[*.rs]
+indent_style = tab
+indent_size = 4
+
+[*.yml]
+indent_style = space
+indent_size = 2
--- /dev/null
+name: Cross-platform build verification
+on:
+ push:
+ branches:
+ - main
+ pull_request:
+ branches:
+ - "*"
+
+jobs:
+ build:
+ strategy:
+ fail-fast: false
+ matrix:
+ toolchain:
+ - stable
+ - 1.48.0
+ - beta
+ pinning: [true, false]
+ runs-on: ubuntu-latest
+ steps:
+ - name: Checkout source code
+ uses: actions/checkout@v3
+ - name: Install Rust ${{ matrix.toolchain }} toolchain
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: ${{ matrix.toolchain }}
+ override: true
+ profile: minimal
+ - name: Pin dependencies
+ if: ${{ matrix.pinning }}
+ run: |
+ cargo update -p tokio-postgres --precise "0.7.5" --verbose
+ cargo update -p postgres-types --precise "0.2.3" --verbose
+ cargo update -p tokio --precise "1.14.1" --verbose
+ cargo update -p cpufeatures --precise "0.2.2" --verbose # https://github.com/RustCrypto/utils/issues/795
+ - name: Build on Rust ${{ matrix.toolchain }}
+ run: |
+ cargo build --verbose --color always
--- /dev/null
+[package]
+name = "rapid-gossip-sync-server"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+base64 = "0.13.0"
+bech32 = "0.8"
+bitcoin = "0.28.1"
+bitcoin-bech32 = "0.12"
+lightning = { version = "0.0.110" }
+lightning-block-sync = { version = "0.0.110", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.110" }
+chrono = "0.4"
+futures = "0.3"
+hex = "0.3"
+rand = "0.4"
+tokio = { version = "1.14.1", features = ["full"] }
+tokio-postgres = { version="0.7.5" }
+
+[profile.release]
+opt-level = 3
+lto = true
+panic = "abort"
+++ /dev/null
-MIT License
-
-Copyright (c) 2022 Lightning Dev Kit
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
--- /dev/null
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "[]"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright [yyyy] [name of copyright owner]
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
--- /dev/null
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
--- /dev/null
+This software is licensed under [Apache 2.0](LICENSE-APACHE) or
+[MIT](LICENSE-MIT), at your option.
+
+Some files retain their own copyright notice, however, for full authorship
+information, see version control history.
+
+Except as otherwise noted in individual files, all files in this repository are
+licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+http://opensource.org/licenses/MIT>, at your option.
+
+You may not use, copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of this software or any files in this repository except in
+accordance with one or both of these licenses.
--- /dev/null
+# rust-ln-sync
+
+This is a server that connects to peers on the Lightning network and calculates compact rapid sync
+gossip data.
+
+These are the components it's comprised of.
+
+## config
+
+A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
+can be made by setting environment variables, whose usage is as follows:
+
+| Name | Default | Description |
+|:-------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------|
+| RUST_LN_SYNC_DB_HOST | localhost | Domain of the Postgres database |
+| RUST_LN_SYNC_DB_USER | alice | Username to access Postgres |
+| RUST_LN_SYNC_DB_PASSWORD | _None_ | Password to access Postgres |
+| RUST_LN_SYNC_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage |
+| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT | 80 | HTTP port of the bitcoind REST server |
+| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints |
+
+Notably, one property needs to be modified in code, namely the `ln_peers()` method. It specifies how
+many and which peers to use for retrieving gossip.
+
+## download
+
+The module responsible for initiating the scraping of the network graph from its peers.
+
+## persistence
+
+The module responsible for persisting all the downloaded graph data to Postgres.
+
+## server
+
+The server is responsible for returning dynamic and snapshotted rapid sync data.
+
+Dynamic sync data is fed a timestamp of the last sync, and it dynamically calculates a delta
+such that a minimal change set is returned based on changes which are assumed to have been seen
+by the client (ignoring any intermediate changes). Dynamic sync is only available after the first
+full graph sync completes on startup.
+
+Snapshot sync data is also based on a timestamp, but unlike dynamic sync, its responses are
+precalculated, which is done in a way that considers the possibility that the client may have
+intermittently seen later updates.
+
+### snapshot
+
+The snapshotting module is responsible for calculating and storing snapshots. It's started up
+as soon as the first full graph sync completes, and then keeps updating the snapshots at a
+24-hour-interval.
+
+### lookup
+
+The lookup module is responsible for fetching the latest data from the network graph and Postgres,
+and reconciling it into an actionable delta set that the server can return in a serialized format.
+
+It works by collecting all the channels that are currently in the network graph, and gathering
+announcements as well as updates for each of them. For the updates specifically, the last update
+seen prior to the given timestamp, the latest known updates, and, if necessary, all intermediate
+updates are collected.
+
+Then, any channel that has only had an announcement but never an update is dropped. Additionally,
+every channel whose first update was seen after the given timestamp is collected alongside its
+announcement.
+
+Finally, all channel update transitions are evaluated and collected into either a full or an
+incremental update.
+
+## Making a call
+
+### Dynamic
+
+Make a call to
+
+`http://localhost:3030/dynamic/1652644698`
+
+Where `1652644698` is the last sync timestamp.
+
+### Snapshotted
+
+Same as above, but sub `dynamic` for `snapshot`:
+
+`http://localhost:3030/snapshot/1652644698`
+
+## License
+
+MIT
--- /dev/null
+use std::env;
+use std::net::SocketAddr;
+use bitcoin::secp256k1::PublicKey;
+use lightning_block_sync::http::HttpEndpoint;
+use tokio_postgres::Config;
+use crate::hex_utils;
+
+pub(crate) const SCHEMA_VERSION: i32 = 1;
+pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
+pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
+
+pub(crate) fn network_graph_cache_path() -> &'static str {
+ "./res/network_graph.bin"
+}
+
+pub(crate) fn db_connection_config() -> Config {
+ let mut config = Config::new();
+ let host = env::var("RUST_LN_SYNC_DB_HOST").unwrap_or("localhost".to_string());
+ let user = env::var("RUST_LN_SYNC_DB_USER").unwrap_or("alice".to_string());
+ let db = env::var("RUST_LN_SYNC_DB_NAME").unwrap_or("ln_graph_sync".to_string());
+ config.host(&host);
+ config.user(&user);
+ config.dbname(&db);
+ if let Ok(password) = env::var("RUST_LN_SYNC_DB_PASSWORD") {
+ config.password(&password);
+ }
+ config
+}
+
+pub(crate) fn bitcoin_rest_endpoint() -> HttpEndpoint {
+ let host = env::var("BITCOIN_REST_DOMAIN").unwrap_or("127.0.0.1".to_string());
+ let port = env::var("BITCOIN_REST_PORT")
+ .unwrap_or("80".to_string())
+ .parse::<u16>()
+ .expect("BITCOIN_REST_PORT env variable must be a u16.");
+ let path = env::var("BITCOIN_REST_PATH").unwrap_or("/rest/".to_string());
+ HttpEndpoint::for_host(host).with_port(port).with_path(path)
+}
+
+pub(crate) fn db_config_table_creation_query() -> &'static str {
+ "CREATE TABLE IF NOT EXISTS config (
+ id SERIAL PRIMARY KEY,
+ db_schema integer
+ )"
+}
+
+pub(crate) fn db_announcement_table_creation_query() -> &'static str {
+ "CREATE TABLE IF NOT EXISTS channel_announcements (
+ id SERIAL PRIMARY KEY,
+ short_channel_id character varying(255) NOT NULL UNIQUE,
+ block_height integer,
+ chain_hash character varying(255),
+ announcement_signed BYTEA,
+ seen timestamp NOT NULL DEFAULT NOW()
+ )"
+}
+
+pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
+ "CREATE TABLE IF NOT EXISTS channel_updates (
+ id SERIAL PRIMARY KEY,
+ composite_index character varying(255) UNIQUE,
+ chain_hash character varying(255),
+ short_channel_id character varying(255),
+ timestamp bigint,
+ channel_flags integer,
+ direction integer,
+ disable boolean,
+ cltv_expiry_delta integer,
+ htlc_minimum_msat bigint,
+ fee_base_msat integer,
+ fee_proportional_millionths integer,
+ htlc_maximum_msat bigint,
+ blob_signed BYTEA,
+ seen timestamp NOT NULL DEFAULT NOW()
+ )"
+}
+
+pub(crate) fn db_index_creation_query() -> &'static str {
+ "
+ CREATE INDEX IF NOT EXISTS channels_seen ON channel_announcements(seen);
+ CREATE INDEX IF NOT EXISTS channel_updates_scid ON channel_updates(short_channel_id);
+ CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates(direction);
+ CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
+ "
+}
+
+/// EDIT ME
+pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> {
+ vec![
+ // Bitfinex
+ // (hex_utils::to_compressed_pubkey("033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025").unwrap(), "34.65.85.39:9735".parse().unwrap()),
+
+ // Matt Corallo
+ // (hex_utils::to_compressed_pubkey("03db10aa09ff04d3568b0621750794063df401e6853c79a21a83e1a3f3b5bfb0c8").unwrap(), "69.59.18.80:9735".parse().unwrap())
+
+ // River Financial
+ // (hex_utils::to_compressed_pubkey("03037dc08e9ac63b82581f79b662a4d0ceca8a8ca162b1af3551595b8f2d97b70a").unwrap(), "104.196.249.140:9735".parse().unwrap())
+
+ // Wallet of Satoshi | 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735
+ (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap())
+ ]
+}
--- /dev/null
+use std::sync::{Arc, RwLock};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use bitcoin::secp256k1::PublicKey;
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
+use tokio::sync::mpsc;
+
+use crate::{GossipChainAccess, TestLogger};
+use crate::types::{DetectedGossipMessage, GossipMessage};
+
+pub(crate) struct GossipCounter {
+ pub(crate) channel_announcements: u64,
+ pub(crate) channel_updates: u64,
+ pub(crate) channel_updates_without_htlc_max_msats: u64,
+ pub(crate) channel_announcements_with_mismatched_scripts: u64
+}
+
+impl GossipCounter {
+ pub(crate) fn new() -> Self {
+ Self {
+ channel_announcements: 0,
+ channel_updates: 0,
+ channel_updates_without_htlc_max_msats: 0,
+ channel_announcements_with_mismatched_scripts: 0,
+ }
+ }
+}
+
+pub(crate) struct GossipRouter {
+ pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
+ pub(crate) counter: RwLock<GossipCounter>,
+ pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ self.native_router.get_and_clear_pending_msg_events()
+ }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+ fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+ self.native_router.handle_node_announcement(msg)
+ }
+
+ fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+ let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+
+ let mut counter = self.counter.write().unwrap();
+
+ let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
+ let error_string = format!("{:?}", error);
+ if error_string.contains("announced on an unknown chain"){
+ return error;
+ }
+ counter.channel_announcements_with_mismatched_scripts += 1;
+ error
+ })?;
+
+ counter.channel_announcements += 1;
+ let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+ let detected_gossip_message = DetectedGossipMessage {
+ message: gossip_message,
+ timestamp_seen: timestamp_seen as u32,
+ };
+ let sender = self.sender.clone();
+ tokio::spawn(async move {
+ let _ = sender.send(detected_gossip_message).await;
+ });
+
+ Ok(output_value)
+ }
+
+ fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+ let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+ let output_value = self.native_router.handle_channel_update(msg)?;
+
+ let mut counter = self.counter.write().unwrap();
+ counter.channel_updates += 1;
+ let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+ let detected_gossip_message = DetectedGossipMessage {
+ message: gossip_message,
+ timestamp_seen: timestamp_seen as u32,
+ };
+ let sender = self.sender.clone();
+ tokio::spawn(async move {
+ let _ = sender.send(detected_gossip_message).await;
+ });
+
+ Ok(output_value)
+ }
+
+ fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
+ self.native_router.get_next_channel_announcements(starting_point, batch_amount)
+ }
+
+ fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
+ self.native_router.get_next_node_announcements(starting_point, batch_amount)
+ }
+
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) {
+ self.native_router.peer_connected(their_node_id, init)
+ }
+
+ fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
+ self.native_router.handle_reply_channel_range(their_node_id, msg)
+ }
+
+ fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+ self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
+ }
+
+ fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
+ self.native_router.handle_query_channel_range(their_node_id, msg)
+ }
+
+ fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
+ self.native_router.handle_query_short_channel_ids(their_node_id, msg)
+ }
+}
--- /dev/null
+use bitcoin::secp256k1::PublicKey;
+
+pub fn to_vec(hex: &str) -> Option<Vec<u8>> {
+ let mut out = Vec::with_capacity(hex.len() / 2);
+
+ let mut b = 0;
+ for (idx, c) in hex.as_bytes().iter().enumerate() {
+ b <<= 4;
+ match *c {
+ b'A'..=b'F' => b |= c - b'A' + 10,
+ b'a'..=b'f' => b |= c - b'a' + 10,
+ b'0'..=b'9' => b |= c - b'0',
+ _ => return None,
+ }
+ if (idx & 1) == 1 {
+ out.push(b);
+ b = 0;
+ }
+ }
+
+ Some(out)
+}
+
+#[inline]
+pub fn hex_str(value: &[u8]) -> String {
+ let mut res = String::with_capacity(64);
+ for v in value {
+ res += &format!("{:02x}", v);
+ }
+ res
+}
+
+pub fn to_compressed_pubkey(hex: &str) -> Option<PublicKey> {
+ let data = match to_vec(&hex[0..33 * 2]) {
+ Some(bytes) => bytes,
+ None => return None,
+ };
+ match PublicKey::from_slice(&data) {
+ Ok(pk) => Some(pk),
+ Err(_) => None,
+ }
+}
--- /dev/null
+#![deny(unsafe_code)]
+#![deny(broken_intra_doc_links)]
+#![deny(private_intra_doc_links)]
+#![deny(non_upper_case_globals)]
+#![deny(non_camel_case_types)]
+#![deny(non_snake_case)]
+#![deny(unused_mut)]
+#![deny(unused_variables)]
+#![deny(unused_imports)]
+
+extern crate core;
+
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use bitcoin::blockdata::constants::genesis_block;
+use bitcoin::Network;
+use bitcoin::secp256k1::PublicKey;
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::{ReadableArgs, Writeable};
+use tokio::sync::mpsc;
+use crate::lookup::DeltaSet;
+
+use crate::persistence::GossipPersister;
+use crate::serialization::UpdateSerializationMechanism;
+use crate::snapshot::Snapshotter;
+use crate::types::{GossipChainAccess, TestLogger};
+
+mod downloader;
+mod types;
+mod tracking;
+mod lookup;
+mod persistence;
+mod serialization;
+mod snapshot;
+mod config;
+mod hex_utils;
+mod verifier;
+
+pub struct RapidSyncProcessor {
+ network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+ pub initial_sync_complete: Arc<AtomicBool>,
+}
+
+pub struct SerializedResponse {
+ pub data: Vec<u8>,
+ pub message_count: u32,
+ pub announcement_count: u32,
+ pub update_count: u32,
+ pub update_count_full: u32,
+ pub update_count_incremental: u32,
+}
+
+impl RapidSyncProcessor {
+ pub fn new() -> Self {
+ let logger = TestLogger::new();
+ let mut initial_sync_complete = false;
+ let arc_logger = Arc::new(logger);
+ let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
+ println!("Initializing from cached network graph…");
+ let mut buffered_reader = BufReader::new(file);
+ let network_graph_result = NetworkGraph::read(&mut buffered_reader, Arc::clone(&arc_logger));
+ if let Ok(network_graph) = network_graph_result {
+ initial_sync_complete = true;
+ network_graph.remove_stale_channels();
+ println!("Initialized from cached network graph!");
+ network_graph
+ } else {
+ println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
+ NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+ }
+ } else {
+ NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+ };
+ let arc_network_graph = Arc::new(network_graph);
+ let (_sync_termination_sender, _sync_termination_receiver) = mpsc::channel::<()>(1);
+ Self {
+ network_graph: arc_network_graph,
+ initial_sync_complete: Arc::new(AtomicBool::new(initial_sync_complete)),
+ }
+ }
+
+ pub async fn start_sync(&self) {
+ // means to indicate sync completion status within this module
+ let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
+ let initial_sync_complete = self.initial_sync_complete.clone();
+
+ let network_graph = self.network_graph.clone();
+ let snapshotter = Snapshotter::new(network_graph.clone());
+
+ if config::DOWNLOAD_NEW_GOSSIP {
+
+ let mut persister = GossipPersister::new(sync_completion_sender, self.network_graph.clone());
+
+ let persistence_sender = persister.gossip_persistence_sender.clone();
+ println!("Starting gossip download");
+ let download_future = tracking::download_gossip(persistence_sender, network_graph.clone());
+ tokio::spawn(async move {
+ // initiate the whole download stuff in the background
+ download_future.await;
+ });
+ println!("Starting gossip db persistence listener");
+ tokio::spawn(async move {
+ // initiate persistence of the gossip data
+ let persistence_future = persister.persist_gossip();
+ persistence_future.await;
+ });
+
+ } else {
+ sync_completion_sender.send(()).await.unwrap();
+ }
+
+ {
+ let sync_completion = sync_completion_receiver.recv().await;
+ if sync_completion.is_none() {
+ panic!("Sync failed!");
+ }
+ initial_sync_complete.store(true, Ordering::Release);
+ println!("Initial sync complete!");
+
+ // start the gossip snapshotting service
+ snapshotter.snapshot_gossip().await;
+ }
+ }
+
+ pub async fn serialize_delta(&self, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+ crate::serialize_delta(self.network_graph.clone(), last_sync_timestamp, consider_intermediate_updates).await
+ }
+}
+
+async fn serialize_delta(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+ let (client, connection) = lookup::connect_to_db().await;
+
+ tokio::spawn(async move {
+ if let Err(e) = connection.await {
+ panic!("connection error: {}", e);
+ }
+ });
+
+ let mut output: Vec<u8> = vec![];
+
+ // set a flag if the chain hash is prepended
+ // chain hash only necessary if either channel announcements or non-incremental updates are present
+ // for announcement-free incremental-only updates, chain hash can be skipped
+
+ let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
+ let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
+ let mut node_ids: Vec<PublicKey> = Vec::new();
+ let mut duplicate_node_ids: i32 = 0;
+
+ let mut get_node_id_index = |node_id: PublicKey| {
+ let serialized_node_id = node_id.serialize();
+ if node_id_set.insert(serialized_node_id) {
+ node_ids.push(node_id);
+ let index = node_ids.len() - 1;
+ node_id_indices.insert(serialized_node_id, index);
+ return index;
+ }
+ duplicate_node_ids += 1;
+ node_id_indices[&serialized_node_id]
+ };
+
+ let mut delta_set = DeltaSet::new();
+ lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
+ println!("announcement channel count: {}", delta_set.len());
+ lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, consider_intermediate_updates).await;
+ println!("update-fetched channel count: {}", delta_set.len());
+ lookup::filter_delta_set(&mut delta_set);
+ println!("update-filtered channel count: {}", delta_set.len());
+ let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
+
+ // process announcements
+ // write the number of channel announcements to the output
+ let announcement_count = serialization_details.announcements.len() as u32;
+ announcement_count.write(&mut output).unwrap();
+ let mut previous_announcement_scid = 0;
+ for current_announcement in serialization_details.announcements {
+ let id_index_1 = get_node_id_index(current_announcement.node_id_1);
+ let id_index_2 = get_node_id_index(current_announcement.node_id_2);
+ let mut stripped_announcement = serialization::serialize_stripped_channel_announcement(¤t_announcement, id_index_1, id_index_2, previous_announcement_scid);
+ output.append(&mut stripped_announcement);
+
+ previous_announcement_scid = current_announcement.short_channel_id;
+ }
+
+ // process updates
+ let mut previous_update_scid = 0;
+ let update_count = serialization_details.updates.len() as u32;
+ update_count.write(&mut output).unwrap();
+
+ let default_update_values = serialization_details.full_update_defaults;
+ if update_count > 0 {
+ default_update_values.cltv_expiry_delta.write(&mut output).unwrap();
+ default_update_values.htlc_minimum_msat.write(&mut output).unwrap();
+ default_update_values.fee_base_msat.write(&mut output).unwrap();
+ default_update_values.fee_proportional_millionths.write(&mut output).unwrap();
+ default_update_values.htlc_maximum_msat.write(&mut output).unwrap();
+ }
+
+ let mut update_count_full = 0;
+ let mut update_count_incremental = 0;
+ for current_update in serialization_details.updates {
+ match ¤t_update.mechanism {
+ UpdateSerializationMechanism::Full => {
+ update_count_full += 1;
+ }
+ UpdateSerializationMechanism::Incremental(_) => {
+ update_count_incremental += 1;
+ }
+ };
+
+ let mut stripped_update = serialization::serialize_stripped_channel_update(¤t_update, &default_update_values, previous_update_scid);
+ output.append(&mut stripped_update);
+
+ previous_update_scid = current_update.update.short_channel_id;
+ }
+
+ // some stats
+ let message_count = announcement_count + update_count;
+
+ let mut prefixed_output = vec![76, 68, 75, 1];
+
+ // always write the chain hash
+ serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
+ // always write the latest seen timestamp
+ let latest_seen_timestamp = serialization_details.latest_seen;
+ let overflow_seconds = latest_seen_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL;
+ let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
+ serialized_seen_timestamp.write(&mut prefixed_output).unwrap();
+
+ let node_id_count = node_ids.len() as u32;
+ node_id_count.write(&mut prefixed_output).unwrap();
+
+ for current_node_id in node_ids {
+ current_node_id.write(&mut prefixed_output).unwrap();
+ }
+
+ prefixed_output.append(&mut output);
+
+ println!("duplicated node ids: {}", duplicate_node_ids);
+ println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
+
+ SerializedResponse {
+ data: prefixed_output,
+ message_count,
+ announcement_count,
+ update_count,
+ update_count_full,
+ update_count_incremental,
+ }
+}
--- /dev/null
+use std::collections::{BTreeMap, HashSet};
+use std::io::Cursor;
+use std::ops::Add;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime};
+
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::Readable;
+use tokio_postgres::{Client, Connection, NoTls, Socket};
+use tokio_postgres::tls::NoTlsStream;
+
+use crate::{config, hex_utils, TestLogger};
+use crate::serialization::MutatedProperties;
+
+/// The delta set needs to be a BTreeMap so the keys are sorted.
+/// That way, the scids in the response automatically grow monotonically
+pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
+
+pub(super) struct AnnouncementDelta {
+ pub(super) seen: u32,
+ pub(super) announcement: UnsignedChannelAnnouncement,
+}
+
+pub(super) struct UpdateDelta {
+ pub(super) seen: u32,
+ pub(super) update: UnsignedChannelUpdate,
+}
+
+pub(super) struct DirectedUpdateDelta {
+ pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
+ pub(super) mutated_properties: MutatedProperties,
+ pub(super) latest_update_after_seen: Option<UpdateDelta>,
+}
+
+pub(super) struct ChannelDelta {
+ pub(super) announcement: Option<AnnouncementDelta>,
+ pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
+ pub(super) first_update_seen: Option<u32>,
+}
+
+impl Default for ChannelDelta {
+ fn default() -> Self {
+ Self { announcement: None, updates: (None, None), first_update_seen: None }
+ }
+}
+
+impl Default for DirectedUpdateDelta {
+ fn default() -> Self {
+ Self {
+ last_update_before_seen: None,
+ mutated_properties: MutatedProperties::default(),
+ latest_update_after_seen: None,
+ }
+ }
+}
+
+pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>) {
+ let connection_config = config::db_connection_config();
+ connection_config.connect(NoTls).await.unwrap()
+}
+
+/// Fetch all the channel announcements that are presently in the network graph, regardless of
+/// whether they had been seen before.
+/// Also include all announcements for which the first update was announced
+/// after `last_syc_timestamp`
+pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, client: &Client, last_sync_timestamp: u32) {
+ let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
+ println!("Obtaining channel ids from network graph");
+ let channel_ids = {
+ let read_only_graph = network_graph.read_only();
+ println!("Retrieved read-only network graph copy");
+ let channel_iterator = read_only_graph.channels().into_iter();
+ channel_iterator
+ .filter(|c| c.1.announcement_message.is_some())
+ .map(|c| hex_utils::hex_str(&c.1.announcement_message.clone().unwrap().contents.short_channel_id.to_be_bytes()))
+ .collect::<Vec<String>>()
+ };
+
+ println!("Obtaining corresponding database entries");
+ // get all the channel announcements that are currently in the network graph
+ let announcement_rows = client.query("SELECT short_channel_id, announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", &[&channel_ids]).await.unwrap();
+
+ for current_announcement_row in announcement_rows {
+ let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
+
+ let scid = unsigned_announcement.short_channel_id;
+ let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen");
+ let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+ let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ (*current_channel_delta).announcement = Some(AnnouncementDelta {
+ announcement: unsigned_announcement,
+ seen: current_seen_timestamp,
+ });
+ }
+
+ println!("Obtaining channel announcements whose first channel updates had not been seen yet");
+
+ // here is where the channels whose first update in either direction occurred after
+ // `last_seen_timestamp` are added to the selection
+ let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
+ for current_row in unannounced_rows {
+
+ let blob: Vec<u8> = current_row.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_update.short_channel_id;
+ let current_seen_timestamp_object: SystemTime = current_row.get("seen");
+ let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+ let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ (*current_channel_delta).first_update_seen = Some(current_seen_timestamp);
+ }
+}
+
+pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) {
+ let start = Instant::now();
+ let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
+
+ // get the latest channel update in each direction prior to last_sync_timestamp, provided
+ // there was an update in either direction that happened after the last sync (to avoid
+ // collecting too many reference updates)
+ let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, short_channel_id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
+
+ println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed());
+
+ let mut last_seen_update_ids: Vec<i32> = Vec::with_capacity(reference_rows.len());
+ let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
+
+ for current_reference in reference_rows {
+ let update_id: i32 = current_reference.get("id");
+ last_seen_update_ids.push(update_id);
+ non_intermediate_ids.insert(update_id);
+
+ let direction: i32 = current_reference.get("direction");
+ let blob: Vec<u8> = current_reference.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+ let scid = unsigned_channel_update.short_channel_id;
+
+ let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+ let mut update_delta = if direction == 0 {
+ (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+ } else if direction == 1 {
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+ } else {
+ panic!("Channel direction must be binary!")
+ };
+ update_delta.last_update_before_seen = Some(unsigned_channel_update);
+
+
+ }
+
+ println!("Processed reference rows (delta size: {}): {:?}", delta_set.len(), start.elapsed());
+
+ // get all the intermediate channel updates
+ // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
+ // have been omitted)
+
+ let mut intermediate_update_prefix = "";
+ if !consider_intermediate_updates {
+ intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)";
+ }
+
+ let query_string = format!("SELECT {} id, short_channel_id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
+ let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
+ println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
+
+ let mut previous_scid = u64::MAX;
+ let mut previously_seen_directions = (false, false);
+
+ // let mut previously_seen_directions = (false, false);
+ let mut intermediate_update_count = 0;
+ for intermediate_update in intermediate_updates {
+ let update_id: i32 = intermediate_update.get("id");
+ if non_intermediate_ids.contains(&update_id) {
+ continue;
+ }
+ intermediate_update_count += 1;
+
+ let direction: i32 = intermediate_update.get("direction");
+ let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen");
+ let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+ let blob: Vec<u8> = intermediate_update.get("blob_signed");
+ let mut readable = Cursor::new(blob);
+ let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+
+ let scid = unsigned_channel_update.short_channel_id;
+ if scid != previous_scid {
+ previous_scid = scid.clone();
+ previously_seen_directions = (false, false);
+ }
+
+ // get the write configuration for this particular channel's directional details
+ let current_channel_delta = delta_set.entry(scid.clone()).or_insert(ChannelDelta::default());
+ let update_delta = if direction == 0 {
+ (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+ } else if direction == 1 {
+ (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+ } else {
+ panic!("Channel direction must be binary!")
+ };
+
+ {
+ // handle the latest deltas
+ if direction == 0 && !previously_seen_directions.0 {
+ previously_seen_directions.0 = true;
+ update_delta.latest_update_after_seen = Some(UpdateDelta {
+ seen: current_seen_timestamp,
+ update: unsigned_channel_update.clone(),
+ });
+ } else if direction == 1 && !previously_seen_directions.1 {
+ previously_seen_directions.1 = true;
+ update_delta.latest_update_after_seen = Some(UpdateDelta {
+ seen: current_seen_timestamp,
+ update: unsigned_channel_update.clone(),
+ });
+ }
+ }
+
+ // determine mutations
+ if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref(){
+ if unsigned_channel_update.flags != last_seen_update.flags {
+ update_delta.mutated_properties.flags = true;
+ }
+ if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta {
+ update_delta.mutated_properties.cltv_expiry_delta = true;
+ }
+ if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat {
+ update_delta.mutated_properties.htlc_minimum_msat = true;
+ }
+ if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat {
+ update_delta.mutated_properties.fee_base_msat = true;
+ }
+ if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths {
+ update_delta.mutated_properties.fee_proportional_millionths = true;
+ }
+ if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat {
+ update_delta.mutated_properties.htlc_maximum_msat = true;
+ }
+ }
+
+ }
+ println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+}
+
+pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
+ let original_length = delta_set.len();
+ let keys: Vec<u64> = delta_set.keys().cloned().collect();
+ for k in keys {
+ let v = delta_set.get(&k).unwrap();
+ if v.announcement.is_none() {
+ // this channel is not currently in the network graph
+ delta_set.remove(&k);
+ continue;
+ }
+
+ let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
+ if update.is_none() {
+ return false;
+ };
+ let update_reference = update.as_ref().unwrap();
+ // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty()
+ // if there has been an update after the channel was first seen
+ update_reference.latest_update_after_seen.is_some()
+ };
+
+ let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
+ let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
+
+ if !direction_a_meets_criteria && !direction_b_meets_criteria {
+ delta_set.remove(&k);
+ }
+ }
+
+ let new_length = delta_set.len();
+ if original_length != new_length {
+ println!("length modified!");
+ }
+}
--- /dev/null
+use rapid_gossip_sync_server::RapidSyncProcessor;
+
+#[tokio::main]
+async fn main() {
+ let processor = RapidSyncProcessor::new();
+ processor.start_sync().await;
+}
--- /dev/null
+use std::fs::OpenOptions;
+use std::io::BufWriter;
+use std::sync::Arc;
+use std::time::Instant;
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::Writeable;
+use tokio::sync::mpsc;
+use tokio_postgres::NoTls;
+
+use crate::{config, hex_utils, TestLogger};
+use crate::types::{DetectedGossipMessage, GossipMessage};
+
+pub(crate) struct GossipPersister {
+ pub(crate) gossip_persistence_sender: mpsc::Sender<DetectedGossipMessage>,
+ gossip_persistence_receiver: mpsc::Receiver<DetectedGossipMessage>,
+ server_sync_completion_sender: mpsc::Sender<()>,
+ network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+}
+
+impl GossipPersister {
+ pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+ let (gossip_persistence_sender, gossip_persistence_receiver) =
+ mpsc::channel::<DetectedGossipMessage>(10000);
+ GossipPersister {
+ gossip_persistence_sender,
+ gossip_persistence_receiver,
+ server_sync_completion_sender,
+ network_graph
+ }
+ }
+
+ pub(crate) async fn persist_gossip(&mut self) {
+ let connection_config = config::db_connection_config();
+ let (client, connection) =
+ connection_config.connect(NoTls).await.unwrap();
+
+ tokio::spawn(async move {
+ if let Err(e) = connection.await {
+ panic!("connection error: {}", e);
+ }
+ });
+
+ {
+ // initialize the database
+ let initialization = client
+ .execute(config::db_config_table_creation_query(), &[])
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
+
+ let initialization = client
+ .execute(
+ // TODO: figure out a way to fix the id value without Postgres complaining about
+ // its value not being default
+ "INSERT INTO config (id, db_schema) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING",
+ &[&1, &config::SCHEMA_VERSION]
+ ).await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
+
+ let initialization = client
+ .execute(config::db_announcement_table_creation_query(), &[])
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
+
+ let initialization = client
+ .execute(
+ config::db_channel_update_table_creation_query(),
+ &[],
+ )
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
+
+ let initialization = client
+ .batch_execute(config::db_index_creation_query())
+ .await;
+ if let Err(initialization_error) = initialization {
+ panic!("db init error: {}", initialization_error);
+ }
+ }
+
+ // print log statement every 10,000 messages
+ let mut persistence_log_threshold = 10000;
+ let mut i = 0u32;
+ let mut server_sync_completion_sent = false;
+ let mut latest_graph_cache_time: Option<Instant> = None;
+ // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
+ // inactivity, some sort of message could be broadcast signaling the activation of request
+ // processing
+ while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await {
+ i += 1; // count the persisted gossip messages
+
+ if i == 1 || i % persistence_log_threshold == 0 {
+ println!("Persisting gossip message #{}", i);
+ }
+
+ if let Some(last_cache_time) = latest_graph_cache_time {
+ // has it been ten minutes? Just cache it
+ if last_cache_time.elapsed().as_secs() >= 600 {
+ self.persist_network_graph();
+ latest_graph_cache_time = Some(Instant::now());
+ }
+ } else {
+ // initialize graph cache timer
+ latest_graph_cache_time = Some(Instant::now());
+ }
+
+ match &detected_gossip_message.message {
+ GossipMessage::InitialSyncComplete => {
+ // signal to the server that it may now serve dynamic responses and calculate
+ // snapshots
+ // we take this detour through the persister to ensure that all previous
+ // messages have already been persisted to the database
+ println!("Persister caught up with gossip!");
+ i -= 1; // this wasn't an actual gossip message that needed persisting
+ persistence_log_threshold = 50;
+ if !server_sync_completion_sent {
+ server_sync_completion_sent = true;
+ self.server_sync_completion_sender.send(()).await.unwrap();
+ println!("Server has been notified of persistence completion.");
+ }
+
+ // now, cache the persisted network graph
+ // also persist the network graph here
+ let mut too_soon = false;
+ if let Some(latest_graph_cache_time) = latest_graph_cache_time {
+ let time_since_last_cached = latest_graph_cache_time.elapsed().as_secs();
+ // don't cache more frequently than every 2 minutes
+ too_soon = time_since_last_cached < 120;
+ }
+ if too_soon {
+ println!("Network graph has been cached too recently.");
+ }else {
+ latest_graph_cache_time = Some(Instant::now());
+ self.persist_network_graph();
+ }
+ }
+ GossipMessage::ChannelAnnouncement(announcement) => {
+
+ let scid = announcement.contents.short_channel_id;
+ let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
+ // scid is 8 bytes
+ // block height is the first three bytes
+ // to obtain block height, shift scid right by 5 bytes (40 bits)
+ let block_height = (scid >> 5 * 8) as i32;
+ let chain_hash = announcement.contents.chain_hash.as_ref();
+ let chain_hash_hex = hex_utils::hex_str(chain_hash);
+
+ // start with the type prefix, which is already known a priori
+ let mut announcement_signed = Vec::new(); // vec![1, 0];
+ announcement.write(&mut announcement_signed).unwrap();
+
+ let result = client
+ .execute("INSERT INTO channel_announcements (\
+ short_channel_id, \
+ block_height, \
+ chain_hash, \
+ announcement_signed \
+ ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[
+ &scid_hex,
+ &block_height,
+ &chain_hash_hex,
+ &announcement_signed
+ ]).await;
+ if result.is_err() {
+ panic!("error: {}", result.err().unwrap());
+ }
+ }
+ GossipMessage::ChannelUpdate(update) => {
+ let scid = update.contents.short_channel_id;
+ let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
+
+ let chain_hash = update.contents.chain_hash.as_ref();
+ let chain_hash_hex = hex_utils::hex_str(chain_hash);
+
+ let timestamp = update.contents.timestamp as i64;
+
+ let channel_flags = update.contents.flags as i32;
+ let direction = channel_flags & 1;
+ let disable = (channel_flags & 2) > 0;
+
+ let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction);
+
+ let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
+ let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
+ let fee_base_msat = update.contents.fee_base_msat as i32;
+ let fee_proportional_millionths =
+ update.contents.fee_proportional_millionths as i32;
+ let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64;
+
+ // start with the type prefix, which is already known a priori
+ let mut update_signed = Vec::new(); // vec![1, 2];
+ update.write(&mut update_signed).unwrap();
+
+ let result = client
+ .execute("INSERT INTO channel_updates (\
+ composite_index, \
+ chain_hash, \
+ short_channel_id, \
+ timestamp, \
+ channel_flags, \
+ direction, \
+ disable, \
+ cltv_expiry_delta, \
+ htlc_minimum_msat, \
+ fee_base_msat, \
+ fee_proportional_millionths, \
+ htlc_maximum_msat, \
+ blob_signed \
+ ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[
+ &composite_index,
+ &chain_hash_hex,
+ &scid_hex,
+ ×tamp,
+ &channel_flags,
+ &direction,
+ &disable,
+ &cltv_expiry_delta,
+ &htlc_minimum_msat,
+ &fee_base_msat,
+ &fee_proportional_millionths,
+ &htlc_maximum_msat,
+ &update_signed
+ ]).await;
+ if result.is_err() {
+ panic!("error: {}", result.err().unwrap());
+ }
+ }
+ }
+ }
+ }
+
+ fn persist_network_graph(&self) {
+ println!("Caching network graph…");
+ let cache_path = config::network_graph_cache_path();
+ let file = OpenOptions::new()
+ .create(true)
+ .write(true)
+ .truncate(true)
+ .open(&cache_path)
+ .unwrap();
+ self.network_graph.remove_stale_channels();
+ let mut writer = BufWriter::new(file);
+ self.network_graph.write(&mut writer).unwrap();
+ println!("Cached network graph!");
+ }
+}
--- /dev/null
+use std::cmp::max;
+use std::collections::HashMap;
+
+use bitcoin::BlockHash;
+use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::util::ser::{BigSize, Writeable};
+
+use crate::lookup::{DeltaSet, DirectedUpdateDelta};
+
+pub(super) struct SerializationSet {
+ pub(super) announcements: Vec<UnsignedChannelAnnouncement>,
+ pub(super) updates: Vec<UpdateSerialization>,
+ pub(super) full_update_defaults: DefaultUpdateValues,
+ pub(super) latest_seen: u32,
+ pub(super) chain_hash: BlockHash,
+}
+
+pub(super) struct DefaultUpdateValues {
+ pub(super) cltv_expiry_delta: u16,
+ pub(super) htlc_minimum_msat: u64,
+ pub(super) fee_base_msat: u32,
+ pub(super) fee_proportional_millionths: u32,
+ pub(super) htlc_maximum_msat: u64,
+}
+
+impl Default for DefaultUpdateValues {
+ fn default() -> Self {
+ Self {
+ cltv_expiry_delta: 0,
+ htlc_minimum_msat: 0,
+ fee_base_msat: 0,
+ fee_proportional_millionths: 0,
+ htlc_maximum_msat: 0,
+ }
+ }
+}
+
+pub(super) struct MutatedProperties {
+ pub(super) flags: bool,
+ pub(super) cltv_expiry_delta: bool,
+ pub(super) htlc_minimum_msat: bool,
+ pub(super) fee_base_msat: bool,
+ pub(super) fee_proportional_millionths: bool,
+ pub(super) htlc_maximum_msat: bool,
+}
+
+impl Default for MutatedProperties {
+ fn default() -> Self {
+ Self {
+ flags: false,
+ cltv_expiry_delta: false,
+ htlc_minimum_msat: false,
+ fee_base_msat: false,
+ fee_proportional_millionths: false,
+ htlc_maximum_msat: false,
+ }
+ }
+}
+
+pub(super) struct UpdateSerialization {
+ pub(super) update: UnsignedChannelUpdate,
+ pub(super) mechanism: UpdateSerializationMechanism,
+}
+
+impl MutatedProperties {
+ /// Does not include flags because the flag byte is always sent in full
+ fn len(&self) -> u8 {
+ let mut mutations = 0;
+ if self.cltv_expiry_delta { mutations += 1; };
+ if self.htlc_minimum_msat { mutations += 1; };
+ if self.fee_base_msat { mutations += 1; };
+ if self.fee_proportional_millionths { mutations += 1; };
+ if self.htlc_maximum_msat { mutations += 1; };
+ mutations
+ }
+}
+
+pub(super) enum UpdateSerializationMechanism {
+ Full,
+ Incremental(MutatedProperties),
+}
+
+struct FullUpdateValueHistograms {
+ cltv_expiry_delta: HashMap<u16, usize>,
+ htlc_minimum_msat: HashMap<u64, usize>,
+ fee_base_msat: HashMap<u32, usize>,
+ fee_proportional_millionths: HashMap<u32, usize>,
+ htlc_maximum_msat: HashMap<u64, usize>,
+}
+
+pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
+ let mut serialization_set = SerializationSet {
+ announcements: vec![],
+ updates: vec![],
+ full_update_defaults: Default::default(),
+ chain_hash: Default::default(),
+ latest_seen: 0,
+ };
+
+ let mut chain_hash_set = false;
+
+ let mut full_update_histograms = FullUpdateValueHistograms {
+ cltv_expiry_delta: Default::default(),
+ htlc_minimum_msat: Default::default(),
+ fee_base_msat: Default::default(),
+ fee_proportional_millionths: Default::default(),
+ htlc_maximum_msat: Default::default(),
+ };
+
+ let mut record_full_update_in_histograms = |full_update: &UnsignedChannelUpdate| {
+ *full_update_histograms.cltv_expiry_delta.entry(full_update.cltv_expiry_delta).or_insert(0) += 1;
+ *full_update_histograms.htlc_minimum_msat.entry(full_update.htlc_minimum_msat).or_insert(0) += 1;
+ *full_update_histograms.fee_base_msat.entry(full_update.fee_base_msat).or_insert(0) += 1;
+ *full_update_histograms.fee_proportional_millionths.entry(full_update.fee_proportional_millionths).or_insert(0) += 1;
+ *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
+ };
+
+ for (_scid, channel_delta) in delta_set.into_iter() {
+
+ // any announcement chain hash is gonna be the same value. Just set it from the first one.
+ let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
+ if !chain_hash_set {
+ chain_hash_set = true;
+ serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone();
+ }
+
+ let current_announcement_seen = channel_announcement_delta.seen;
+ let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
+ let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_update_seen {
+ first_update_seen >= last_sync_timestamp
+ } else {
+ false
+ };
+ let send_announcement = is_new_announcement || is_newly_updated_announcement;
+ if send_announcement {
+ serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
+ serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement);
+ }
+
+ let direction_a_updates = channel_delta.updates.0;
+ let direction_b_updates = channel_delta.updates.1;
+
+ let mut categorize_directed_update_serialization = |directed_updates: Option<DirectedUpdateDelta>| {
+ if let Some(updates) = directed_updates {
+ if let Some(latest_update_delta) = updates.latest_update_after_seen {
+ let latest_update = latest_update_delta.update;
+
+ // the returned seen timestamp should be the latest of all the returned
+ // announcements and latest updates
+ serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
+
+ if updates.last_update_before_seen.is_some() {
+ let mutated_properties = updates.mutated_properties;
+ if mutated_properties.len() == 5 {
+ // all five values have changed, it makes more sense to just
+ // serialize the update as a full update instead of as a change
+ // this way, the default values can be computed more efficiently
+ record_full_update_in_histograms(&latest_update);
+ serialization_set.updates.push(UpdateSerialization {
+ update: latest_update,
+ mechanism: UpdateSerializationMechanism::Full,
+ });
+ } else if mutated_properties.len() > 0 || mutated_properties.flags {
+ // we don't count flags as mutated properties
+ serialization_set.updates.push(UpdateSerialization {
+ update: latest_update,
+ mechanism: UpdateSerializationMechanism::Incremental(mutated_properties),
+ });
+ }
+ } else {
+ // serialize the full update
+ record_full_update_in_histograms(&latest_update);
+ serialization_set.updates.push(UpdateSerialization {
+ update: latest_update,
+ mechanism: UpdateSerializationMechanism::Full,
+ });
+ }
+ }
+ };
+ };
+
+ categorize_directed_update_serialization(direction_a_updates);
+ categorize_directed_update_serialization(direction_b_updates);
+ }
+
+ let default_update_values = DefaultUpdateValues {
+ cltv_expiry_delta: find_most_common_histogram_entry_with_default(full_update_histograms.cltv_expiry_delta, 0),
+ htlc_minimum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_minimum_msat, 0),
+ fee_base_msat: find_most_common_histogram_entry_with_default(full_update_histograms.fee_base_msat, 0),
+ fee_proportional_millionths: find_most_common_histogram_entry_with_default(full_update_histograms.fee_proportional_millionths, 0),
+ htlc_maximum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_maximum_msat, 0),
+ };
+
+ serialization_set.full_update_defaults = default_update_values;
+ serialization_set
+}
+
+pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnnouncement, node_id_a_index: usize, node_id_b_index: usize, previous_scid: u64) -> Vec<u8> {
+ let mut stripped_announcement = vec![];
+
+ announcement.features.write(&mut stripped_announcement).unwrap();
+
+ if previous_scid > announcement.short_channel_id {
+ panic!("unsorted scids!");
+ }
+ let scid_delta = BigSize(announcement.short_channel_id - previous_scid);
+ scid_delta.write(&mut stripped_announcement).unwrap();
+
+ // write indices of node ids rather than the node IDs themselves
+ BigSize(node_id_a_index as u64).write(&mut stripped_announcement).unwrap();
+ BigSize(node_id_b_index as u64).write(&mut stripped_announcement).unwrap();
+
+ // println!("serialized CA: {}, \n{:?}\n{:?}\n", announcement.short_channel_id, announcement.node_id_1, announcement.node_id_2);
+ stripped_announcement
+}
+
+pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
+ let latest_update = &update.update;
+ let mut serialized_flags = latest_update.flags;
+
+ if previous_scid > latest_update.short_channel_id {
+ panic!("unsorted scids!");
+ }
+
+ let mut delta_serialization = Vec::new();
+ let mut prefixed_serialization = Vec::new();
+
+ match &update.mechanism {
+ UpdateSerializationMechanism::Full => {
+ if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
+ serialized_flags |= 0b_0100_0000;
+ latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
+ }
+
+ if latest_update.htlc_minimum_msat != default_values.htlc_minimum_msat {
+ serialized_flags |= 0b_0010_0000;
+ latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
+ }
+
+ if latest_update.fee_base_msat != default_values.fee_base_msat {
+ serialized_flags |= 0b_0001_0000;
+ latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
+ }
+
+ if latest_update.fee_proportional_millionths != default_values.fee_proportional_millionths {
+ serialized_flags |= 0b_0000_1000;
+ latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
+ }
+
+ if latest_update.htlc_maximum_msat != default_values.htlc_maximum_msat {
+ serialized_flags |= 0b_0000_0100;
+ latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
+ }
+ }
+
+ UpdateSerializationMechanism::Incremental(mutated_properties) => {
+ // indicate that this update is incremental
+ serialized_flags |= 0b_1000_0000;
+
+ if mutated_properties.cltv_expiry_delta {
+ serialized_flags |= 0b_0100_0000;
+ latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
+ }
+
+ if mutated_properties.htlc_minimum_msat {
+ serialized_flags |= 0b_0010_0000;
+ latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
+ }
+
+ if mutated_properties.fee_base_msat {
+ serialized_flags |= 0b_0001_0000;
+ latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
+ }
+
+ if mutated_properties.fee_proportional_millionths {
+ serialized_flags |= 0b_0000_1000;
+ latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
+ }
+
+ if mutated_properties.htlc_maximum_msat {
+ serialized_flags |= 0b_0000_0100;
+ latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
+ }
+ }
+ }
+ let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);
+ scid_delta.write(&mut prefixed_serialization).unwrap();
+
+ serialized_flags.write(&mut prefixed_serialization).unwrap();
+ prefixed_serialization.append(&mut delta_serialization);
+
+ prefixed_serialization
+}
+
+pub(super) fn find_most_common_histogram_entry_with_default<T: Copy>(histogram: HashMap<T, usize>, default: T) -> T {
+ let most_frequent_entry = histogram.iter().max_by(|a, b| a.1.cmp(&b.1));
+ if let Some(entry_details) = most_frequent_entry {
+ // .0 is the value
+ // .1 is the frequency
+ return entry_details.0.to_owned();
+ }
+ // the default should pretty much always be a 0 as T
+ // though for htlc maximum msat it could be a u64::max
+ default
+}
--- /dev/null
+use std::collections::HashMap;
+use std::fs;
+use std::os::unix::fs::symlink;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use lightning::routing::gossip::NetworkGraph;
+
+use crate::{config, TestLogger};
+
+pub(crate) struct Snapshotter {
+ network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+}
+
+impl Snapshotter {
+ pub fn new(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+ Self { network_graph }
+ }
+
+ pub(crate) async fn snapshot_gossip(&self) {
+ println!("Initiating snapshotting service");
+
+ let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
+ let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+
+ let pending_snapshot_directory = "./res/snapshots_pending";
+ let pending_symlink_directory = "./res/symlinks_pending";
+ let finalized_snapshot_directory = "./res/snapshots";
+ let finalized_symlink_directory = "./res/symlinks";
+ let relative_symlink_to_snapshot_path = "../snapshots";
+
+ // this is gonna be a never-ending background job
+ loop {
+ // 1. get the current timestamp
+ let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+ let filename_timestamp = Self::round_down_to_nearest_multiple(timestamp_seen, round_day_seconds);
+ println!("Capturing snapshots at {} for: {}", timestamp_seen, filename_timestamp);
+
+ // 2. sleep until the next round 24 hours
+ // 3. refresh all snapshots
+
+ // the stored snapshots should adhere to the following format
+ // from one day ago
+ // from two days ago
+ // …
+ // from a week ago
+ // from two weeks ago
+ // from three weeks ago
+ // full
+ // That means that at any given moment, there should only ever be
+ // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
+ // The snapshots, unlike dynamic updates, should account for all intermediate
+ // channel updates
+ //
+
+
+ // purge and recreate the pending directories
+ if fs::metadata(&pending_snapshot_directory).is_ok(){
+ fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
+ }
+ if fs::metadata(&pending_symlink_directory).is_ok(){
+ fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
+ }
+ fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
+ fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
+
+ let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
+ for factor in &snapshot_sync_day_factors {
+ // basically timestamp - day_seconds * factor
+ let timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(factor.clone()));
+ snapshot_sync_timestamps.push((factor.clone(), timestamp));
+ };
+
+ let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
+
+ for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
+ let network_graph_clone = self.network_graph.clone();
+ {
+ println!("Calculating {}-day snapshot", day_range);
+ // calculate the snapshot
+ let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, true).await;
+
+ // persist the snapshot and update the symlink
+ let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", filename_timestamp, day_range, current_last_sync_timestamp);
+ let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
+ println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+ fs::write(&snapshot_path, snapshot.data).unwrap();
+ snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename);
+ }
+ }
+
+ for i in 1..10_001u64 {
+ // let's create symlinks
+
+ // first, determine which snapshot range should be referenced
+ // find min(x) in snapshot_sync_day_factors where x >= i
+ let referenced_day_range = snapshot_sync_day_factors.iter().find(|x| {
+ x >= &&i
+ }).unwrap().clone();
+
+ let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
+ let simulated_last_sync_timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(i));
+ let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
+ let canonical_last_sync_timestamp = Self::round_down_to_nearest_multiple(simulated_last_sync_timestamp, round_day_seconds);
+ let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
+
+ println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path);
+ symlink(&relative_snapshot_path, &symlink_path).unwrap();
+ }
+
+ if fs::metadata(&finalized_snapshot_directory).is_ok(){
+ fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
+ }
+ if fs::metadata(&finalized_symlink_directory).is_ok(){
+ fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory.");
+ }
+ fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
+ fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
+
+
+ let remainder = timestamp_seen % round_day_seconds;
+ let time_until_next_day = round_day_seconds - remainder;
+
+ println!("Sleeping until next snapshot capture: {}s", time_until_next_day);
+ // add in an extra five seconds to assure the rounding down works correctly
+ let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
+ sleep.await;
+ }
+ }
+
+ fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
+ let round_multiple_delta = number % multiple;
+ number - round_multiple_delta
+ }
+}
--- /dev/null
+use std::net::SocketAddr;
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+use bitcoin::hashes::hex::ToHex;
+use bitcoin::secp256k1::{PublicKey, SecretKey};
+use futures::executor;
+use lightning;
+use lightning::ln::peer_handler::{
+ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
+};
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use rand::{Rng, thread_rng};
+use tokio::sync::mpsc;
+
+use crate::{config, TestLogger};
+use crate::downloader::{GossipCounter, GossipRouter};
+use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
+use crate::verifier::ChainVerifier;
+
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
+ let mut key = [0; 32];
+ let mut random_data = [0; 32];
+ thread_rng().fill_bytes(&mut key);
+ thread_rng().fill_bytes(&mut random_data);
+ let our_node_secret = SecretKey::from_slice(&key).unwrap();
+
+ let _arc_chain_access = None::<GossipChainAccess>;
+ let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
+ let ignorer = IgnoringMessageHandler {};
+ let arc_ignorer = Arc::new(ignorer);
+
+ let errorer = ErroringMessageHandler::new();
+ let arc_errorer = Arc::new(errorer);
+
+ let logger = TestLogger::new();
+ let arc_logger = Arc::new(logger);
+
+ let router = P2PGossipSync::new(
+ network_graph.clone(),
+ arc_chain_access,
+ Arc::clone(&arc_logger),
+ );
+ let arc_router = Arc::new(router);
+ let wrapped_router = GossipRouter {
+ native_router: arc_router,
+ counter: RwLock::new(GossipCounter::new()),
+ sender: persistence_sender.clone(),
+ };
+ let arc_wrapped_router = Arc::new(wrapped_router);
+
+ let message_handler = MessageHandler {
+ chan_handler: arc_errorer,
+ route_handler: arc_wrapped_router.clone(),
+ };
+ let peer_handler = PeerManager::new(
+ message_handler,
+ our_node_secret,
+ &random_data,
+ Arc::clone(&arc_logger),
+ arc_ignorer,
+ );
+ let arc_peer_handler = Arc::new(peer_handler);
+
+ println!("Connecting to Lightning peers…");
+ let peers = config::ln_peers();
+ let mut connected_peer_count = 0;
+
+ for current_peer in peers {
+ let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler));
+ if initial_connection_succeeded {
+ connected_peer_count += 1;
+ }
+ }
+
+ if connected_peer_count < 1 {
+ panic!("Failed to connect to any peer.");
+ }
+
+ println!("Connected to {} Lightning peers!", connected_peer_count);
+
+ let local_router = arc_wrapped_router.clone();
+ let local_persistence_sender = persistence_sender.clone();
+ tokio::spawn(async move {
+ let mut previous_announcement_count = 0u64;
+ let mut previous_update_count = 0u64;
+ let mut is_caught_up_with_gossip = false;
+
+ let mut i = 0u32;
+ let mut latest_new_gossip_time = Instant::now();
+ let mut needs_to_notify_persister = false;
+
+ loop {
+ i += 1; // count the background activity
+ let sleep = tokio::time::sleep(Duration::from_secs(5));
+ sleep.await;
+
+ let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+ let router_clone = Arc::clone(&local_router);
+
+ {
+ let counter = router_clone.counter.read().unwrap();
+ let total_message_count = counter.channel_announcements + counter.channel_updates;
+ let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
+
+ let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+ // TODO: make new message threshold (20) adjust based on connected peer count
+ is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+ if new_message_count > 0 {
+ latest_new_gossip_time = Instant::now();
+ }
+
+ // if we either aren't caught up, or just stopped/started being caught up
+ if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+ println!(
+ "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+ i,
+ total_message_count,
+ new_message_count,
+ counter.channel_announcements,
+ counter.channel_announcements_with_mismatched_scripts,
+ counter.channel_updates,
+ counter.channel_updates_without_htlc_max_msats
+ );
+ } else {
+ println!("Monitoring for gossip…")
+ }
+
+ if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+ println!("caught up with gossip!");
+ needs_to_notify_persister = true;
+ } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+ println!("Received new messages since catching up with gossip!");
+ }
+
+ let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+ if continuous_caught_up_duration.as_secs() > 600 {
+ eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
+ }
+
+ previous_announcement_count = counter.channel_announcements;
+ previous_update_count = counter.channel_updates;
+ }
+
+ if needs_to_notify_persister {
+ needs_to_notify_persister = false;
+ let sender = local_persistence_sender.clone();
+ tokio::spawn(async move {
+ let _ = sender.send(DetectedGossipMessage {
+ timestamp_seen: current_timestamp as u32,
+ message: GossipMessage::InitialSyncComplete,
+ }).await;
+ });
+ }
+ }
+ });
+}
+
+fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
+ let peer_manager_clone = Arc::clone(&peer_manager);
+ eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1.to_string());
+ let connection = executor::block_on(async move {
+ lightning_net_tokio::connect_outbound(
+ peer_manager_clone,
+ current_peer.0,
+ current_peer.1,
+ ).await
+ });
+ let mut initial_connection_succeeded = false;
+ if let Some(disconnection_future) = connection {
+ eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
+ initial_connection_succeeded = true;
+ let peer_manager_clone = Arc::clone(&peer_manager);
+ tokio::spawn(async move {
+ disconnection_future.await;
+ eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
+ monitor_peer_connection(current_peer.clone(), peer_manager_clone);
+ });
+ } else {
+ eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string())
+ };
+ initial_connection_succeeded
+}
--- /dev/null
+use std::sync::Arc;
+
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
+use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
+use lightning::util::logger::{Logger, Record};
+
+use crate::downloader::GossipRouter;
+use crate::verifier::ChainVerifier;
+
+pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, Arc<ErroringMessageHandler>, Arc<GossipRouter>, Arc<TestLogger>, Arc<IgnoringMessageHandler>>>;
+
+pub(crate) enum GossipMessage {
+ ChannelAnnouncement(ChannelAnnouncement),
+ ChannelUpdate(ChannelUpdate),
+ InitialSyncComplete,
+}
+
+pub(crate) struct DetectedGossipMessage {
+ pub(crate) timestamp_seen: u32,
+ pub(crate) message: GossipMessage,
+}
+
+pub(crate) struct TestLogger {}
+
+impl TestLogger {
+ pub(crate) fn new() -> TestLogger {
+ Self {}
+ }
+}
+
+impl Logger for TestLogger {
+ fn log(&self, record: &Record) {
+ // TODO: allow log level threshold to be set
+ println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+ }
+}
--- /dev/null
+use std::convert::TryInto;
+use std::sync::Arc;
+
+use bitcoin::{BlockHash, TxOut};
+use bitcoin::blockdata::block::Block;
+use bitcoin::hashes::Hash;
+use futures::executor;
+use lightning::chain;
+use lightning::chain::AccessError;
+use lightning_block_sync::BlockSource;
+use lightning_block_sync::http::BinaryResponse;
+use lightning_block_sync::rest::RestClient;
+
+use crate::config;
+
+pub(crate) struct ChainVerifier {
+ rest_client: Arc<RestClient>,
+}
+
+struct RestBinaryResponse(Vec<u8>);
+
+impl ChainVerifier {
+ pub(crate) fn new() -> Self {
+ let rest_client = RestClient::new(config::bitcoin_rest_endpoint()).unwrap();
+ ChainVerifier {
+ rest_client: Arc::new(rest_client),
+ }
+ }
+
+ fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
+ let rest_client = self.rest_client.clone();
+ executor::block_on(async move {
+ let block_hash_result = rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&format!("blockhashbyheight/{}.bin", block_height)).await;
+ let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+ eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+ AccessError::UnknownChain
+ })?.0;
+ let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+ let block_result = rest_client.get_block(&block_hash).await;
+ let block = block_result.map_err(|error| {
+ eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+ AccessError::UnknownChain
+ })?;
+ Ok(block)
+ })
+ }
+}
+
+impl chain::Access for ChainVerifier {
+ fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
+ let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
+ let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
+ let output_index = (short_channel_id & 0xffff) as u16;
+
+ let block = self.retrieve_block(block_height)?;
+ let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
+ eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
+ AccessError::UnknownTx
+ })?;
+ let output = transaction.output.get(output_index as usize).ok_or_else(|| {
+ eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
+ AccessError::UnknownTx
+ })?;
+ Ok(output.clone())
+ }
+}
+
+impl TryInto<RestBinaryResponse> for BinaryResponse {
+ type Error = std::io::Error;
+
+ fn try_into(self) -> Result<RestBinaryResponse, Self::Error> {
+ Ok(RestBinaryResponse(self.0))
+ }
+}