From 6f44a3ae6ebcd5856560ebcc3bae5122b46c0426 Mon Sep 17 00:00:00 2001 From: Arik Sosman Date: Sun, 21 Aug 2022 18:18:35 -0700 Subject: [PATCH] Initial commit. --- .editorconfig | 13 ++ .github/workflows/build.yml | 39 +++++ Cargo.toml | 24 +++ LICENSE | 21 --- LICENSE-APACHE.md | 201 ++++++++++++++++++++++++ LICENSE-MIT.md | 16 ++ LICENSE.md | 14 ++ README.md | 88 +++++++++++ src/config.rs | 102 ++++++++++++ src/downloader.rs | 122 +++++++++++++++ src/hex_utils.rs | 42 +++++ src/lib.rs | 254 ++++++++++++++++++++++++++++++ src/lookup.rs | 283 +++++++++++++++++++++++++++++++++ src/main.rs | 7 + src/persistence.rs | 253 ++++++++++++++++++++++++++++++ src/serialization.rs | 305 ++++++++++++++++++++++++++++++++++++ src/snapshot.rs | 135 ++++++++++++++++ src/tracking.rs | 183 ++++++++++++++++++++++ src/types.rs | 37 +++++ src/verifier.rs | 75 +++++++++ 20 files changed, 2193 insertions(+), 21 deletions(-) create mode 100644 .editorconfig create mode 100644 .github/workflows/build.yml create mode 100644 Cargo.toml delete mode 100644 LICENSE create mode 100644 LICENSE-APACHE.md create mode 100644 LICENSE-MIT.md create mode 100644 LICENSE.md create mode 100644 README.md create mode 100644 src/config.rs create mode 100644 src/downloader.rs create mode 100644 src/hex_utils.rs create mode 100644 src/lib.rs create mode 100644 src/lookup.rs create mode 100644 src/main.rs create mode 100644 src/persistence.rs create mode 100644 src/serialization.rs create mode 100644 src/snapshot.rs create mode 100644 src/tracking.rs create mode 100644 src/types.rs create mode 100644 src/verifier.rs diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..33d2d0c --- /dev/null +++ b/.editorconfig @@ -0,0 +1,13 @@ +root = true + +[*] +insert_final_newline = true +trim_trailing_whitespace = true + +[*.rs] +indent_style = tab +indent_size = 4 + +[*.yml] +indent_style = space +indent_size = 2 diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml new file mode 100644 index 0000000..9ac4c21 --- /dev/null +++ b/.github/workflows/build.yml @@ -0,0 +1,39 @@ +name: Cross-platform build verification +on: + push: + branches: + - main + pull_request: + branches: + - "*" + +jobs: + build: + strategy: + fail-fast: false + matrix: + toolchain: + - stable + - 1.48.0 + - beta + pinning: [true, false] + runs-on: ubuntu-latest + steps: + - name: Checkout source code + uses: actions/checkout@v3 + - name: Install Rust ${{ matrix.toolchain }} toolchain + uses: actions-rs/toolchain@v1 + with: + toolchain: ${{ matrix.toolchain }} + override: true + profile: minimal + - name: Pin dependencies + if: ${{ matrix.pinning }} + run: | + cargo update -p tokio-postgres --precise "0.7.5" --verbose + cargo update -p postgres-types --precise "0.2.3" --verbose + cargo update -p tokio --precise "1.14.1" --verbose + cargo update -p cpufeatures --precise "0.2.2" --verbose # https://github.com/RustCrypto/utils/issues/795 + - name: Build on Rust ${{ matrix.toolchain }} + run: | + cargo build --verbose --color always diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..003f373 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "rapid-gossip-sync-server" +version = "0.1.0" +edition = "2018" + +[dependencies] +base64 = "0.13.0" +bech32 = "0.8" +bitcoin = "0.28.1" +bitcoin-bech32 = "0.12" +lightning = { version = "0.0.110" } +lightning-block-sync = { version = "0.0.110", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.110" } +chrono = "0.4" +futures = "0.3" +hex = "0.3" +rand = "0.4" +tokio = { version = "1.14.1", features = ["full"] } +tokio-postgres = { version="0.7.5" } + +[profile.release] +opt-level = 3 +lto = true +panic = "abort" diff --git a/LICENSE b/LICENSE deleted file mode 100644 index 265ffe7..0000000 --- a/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2022 Lightning Dev Kit - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/LICENSE-APACHE.md b/LICENSE-APACHE.md new file mode 100644 index 0000000..261eeb9 --- /dev/null +++ b/LICENSE-APACHE.md @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/LICENSE-MIT.md b/LICENSE-MIT.md new file mode 100644 index 0000000..9d982a4 --- /dev/null +++ b/LICENSE-MIT.md @@ -0,0 +1,16 @@ +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/LICENSE.md b/LICENSE.md new file mode 100644 index 0000000..c3f44ca --- /dev/null +++ b/LICENSE.md @@ -0,0 +1,14 @@ +This software is licensed under [Apache 2.0](LICENSE-APACHE) or +[MIT](LICENSE-MIT), at your option. + +Some files retain their own copyright notice, however, for full authorship +information, see version control history. + +Except as otherwise noted in individual files, all files in this repository are +licensed under the Apache License, Version 2.0 or the MIT license , at your option. + +You may not use, copy, modify, merge, publish, distribute, sublicense, and/or +sell copies of this software or any files in this repository except in +accordance with one or both of these licenses. diff --git a/README.md b/README.md new file mode 100644 index 0000000..1df2902 --- /dev/null +++ b/README.md @@ -0,0 +1,88 @@ +# rust-ln-sync + +This is a server that connects to peers on the Lightning network and calculates compact rapid sync +gossip data. + +These are the components it's comprised of. + +## config + +A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments +can be made by setting environment variables, whose usage is as follows: + +| Name | Default | Description | +|:-------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------| +| RUST_LN_SYNC_DB_HOST | localhost | Domain of the Postgres database | +| RUST_LN_SYNC_DB_USER | alice | Username to access Postgres | +| RUST_LN_SYNC_DB_PASSWORD | _None_ | Password to access Postgres | +| RUST_LN_SYNC_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage | +| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) | +| BITCOIN_REST_PORT | 80 | HTTP port of the bitcoind REST server | +| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints | + +Notably, one property needs to be modified in code, namely the `ln_peers()` method. It specifies how +many and which peers to use for retrieving gossip. + +## download + +The module responsible for initiating the scraping of the network graph from its peers. + +## persistence + +The module responsible for persisting all the downloaded graph data to Postgres. + +## server + +The server is responsible for returning dynamic and snapshotted rapid sync data. + +Dynamic sync data is fed a timestamp of the last sync, and it dynamically calculates a delta +such that a minimal change set is returned based on changes which are assumed to have been seen +by the client (ignoring any intermediate changes). Dynamic sync is only available after the first +full graph sync completes on startup. + +Snapshot sync data is also based on a timestamp, but unlike dynamic sync, its responses are +precalculated, which is done in a way that considers the possibility that the client may have +intermittently seen later updates. + +### snapshot + +The snapshotting module is responsible for calculating and storing snapshots. It's started up +as soon as the first full graph sync completes, and then keeps updating the snapshots at a +24-hour-interval. + +### lookup + +The lookup module is responsible for fetching the latest data from the network graph and Postgres, +and reconciling it into an actionable delta set that the server can return in a serialized format. + +It works by collecting all the channels that are currently in the network graph, and gathering +announcements as well as updates for each of them. For the updates specifically, the last update +seen prior to the given timestamp, the latest known updates, and, if necessary, all intermediate +updates are collected. + +Then, any channel that has only had an announcement but never an update is dropped. Additionally, +every channel whose first update was seen after the given timestamp is collected alongside its +announcement. + +Finally, all channel update transitions are evaluated and collected into either a full or an +incremental update. + +## Making a call + +### Dynamic + +Make a call to + +`http://localhost:3030/dynamic/1652644698` + +Where `1652644698` is the last sync timestamp. + +### Snapshotted + +Same as above, but sub `dynamic` for `snapshot`: + +`http://localhost:3030/snapshot/1652644698` + +## License + +MIT diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..4d44efe --- /dev/null +++ b/src/config.rs @@ -0,0 +1,102 @@ +use std::env; +use std::net::SocketAddr; +use bitcoin::secp256k1::PublicKey; +use lightning_block_sync::http::HttpEndpoint; +use tokio_postgres::Config; +use crate::hex_utils; + +pub(crate) const SCHEMA_VERSION: i32 = 1; +pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds +pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true; + +pub(crate) fn network_graph_cache_path() -> &'static str { + "./res/network_graph.bin" +} + +pub(crate) fn db_connection_config() -> Config { + let mut config = Config::new(); + let host = env::var("RUST_LN_SYNC_DB_HOST").unwrap_or("localhost".to_string()); + let user = env::var("RUST_LN_SYNC_DB_USER").unwrap_or("alice".to_string()); + let db = env::var("RUST_LN_SYNC_DB_NAME").unwrap_or("ln_graph_sync".to_string()); + config.host(&host); + config.user(&user); + config.dbname(&db); + if let Ok(password) = env::var("RUST_LN_SYNC_DB_PASSWORD") { + config.password(&password); + } + config +} + +pub(crate) fn bitcoin_rest_endpoint() -> HttpEndpoint { + let host = env::var("BITCOIN_REST_DOMAIN").unwrap_or("127.0.0.1".to_string()); + let port = env::var("BITCOIN_REST_PORT") + .unwrap_or("80".to_string()) + .parse::() + .expect("BITCOIN_REST_PORT env variable must be a u16."); + let path = env::var("BITCOIN_REST_PATH").unwrap_or("/rest/".to_string()); + HttpEndpoint::for_host(host).with_port(port).with_path(path) +} + +pub(crate) fn db_config_table_creation_query() -> &'static str { + "CREATE TABLE IF NOT EXISTS config ( + id SERIAL PRIMARY KEY, + db_schema integer + )" +} + +pub(crate) fn db_announcement_table_creation_query() -> &'static str { + "CREATE TABLE IF NOT EXISTS channel_announcements ( + id SERIAL PRIMARY KEY, + short_channel_id character varying(255) NOT NULL UNIQUE, + block_height integer, + chain_hash character varying(255), + announcement_signed BYTEA, + seen timestamp NOT NULL DEFAULT NOW() + )" +} + +pub(crate) fn db_channel_update_table_creation_query() -> &'static str { + "CREATE TABLE IF NOT EXISTS channel_updates ( + id SERIAL PRIMARY KEY, + composite_index character varying(255) UNIQUE, + chain_hash character varying(255), + short_channel_id character varying(255), + timestamp bigint, + channel_flags integer, + direction integer, + disable boolean, + cltv_expiry_delta integer, + htlc_minimum_msat bigint, + fee_base_msat integer, + fee_proportional_millionths integer, + htlc_maximum_msat bigint, + blob_signed BYTEA, + seen timestamp NOT NULL DEFAULT NOW() + )" +} + +pub(crate) fn db_index_creation_query() -> &'static str { + " + CREATE INDEX IF NOT EXISTS channels_seen ON channel_announcements(seen); + CREATE INDEX IF NOT EXISTS channel_updates_scid ON channel_updates(short_channel_id); + CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates(direction); + CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen); + " +} + +/// EDIT ME +pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> { + vec![ + // Bitfinex + // (hex_utils::to_compressed_pubkey("033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025").unwrap(), "34.65.85.39:9735".parse().unwrap()), + + // Matt Corallo + // (hex_utils::to_compressed_pubkey("03db10aa09ff04d3568b0621750794063df401e6853c79a21a83e1a3f3b5bfb0c8").unwrap(), "69.59.18.80:9735".parse().unwrap()) + + // River Financial + // (hex_utils::to_compressed_pubkey("03037dc08e9ac63b82581f79b662a4d0ceca8a8ca162b1af3551595b8f2d97b70a").unwrap(), "104.196.249.140:9735".parse().unwrap()) + + // Wallet of Satoshi | 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735 + (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap()) + ] +} diff --git a/src/downloader.rs b/src/downloader.rs new file mode 100644 index 0000000..ccb75c8 --- /dev/null +++ b/src/downloader.rs @@ -0,0 +1,122 @@ +use std::sync::{Arc, RwLock}; +use std::time::{SystemTime, UNIX_EPOCH}; + +use bitcoin::secp256k1::PublicKey; +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; +use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider}; +use tokio::sync::mpsc; + +use crate::{GossipChainAccess, TestLogger}; +use crate::types::{DetectedGossipMessage, GossipMessage}; + +pub(crate) struct GossipCounter { + pub(crate) channel_announcements: u64, + pub(crate) channel_updates: u64, + pub(crate) channel_updates_without_htlc_max_msats: u64, + pub(crate) channel_announcements_with_mismatched_scripts: u64 +} + +impl GossipCounter { + pub(crate) fn new() -> Self { + Self { + channel_announcements: 0, + channel_updates: 0, + channel_updates_without_htlc_max_msats: 0, + channel_announcements_with_mismatched_scripts: 0, + } + } +} + +pub(crate) struct GossipRouter { + pub(crate) native_router: Arc>>, GossipChainAccess, Arc>>, + pub(crate) counter: RwLock, + pub(crate) sender: mpsc::Sender, +} + +impl MessageSendEventsProvider for GossipRouter { + fn get_and_clear_pending_msg_events(&self) -> Vec { + self.native_router.get_and_clear_pending_msg_events() + } +} + +impl RoutingMessageHandler for GossipRouter { + fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { + self.native_router.handle_node_announcement(msg) + } + + fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { + let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + + let mut counter = self.counter.write().unwrap(); + + let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| { + let error_string = format!("{:?}", error); + if error_string.contains("announced on an unknown chain"){ + return error; + } + counter.channel_announcements_with_mismatched_scripts += 1; + error + })?; + + counter.channel_announcements += 1; + let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone()); + let detected_gossip_message = DetectedGossipMessage { + message: gossip_message, + timestamp_seen: timestamp_seen as u32, + }; + let sender = self.sender.clone(); + tokio::spawn(async move { + let _ = sender.send(detected_gossip_message).await; + }); + + Ok(output_value) + } + + fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { + let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let output_value = self.native_router.handle_channel_update(msg)?; + + let mut counter = self.counter.write().unwrap(); + counter.channel_updates += 1; + let gossip_message = GossipMessage::ChannelUpdate(msg.clone()); + let detected_gossip_message = DetectedGossipMessage { + message: gossip_message, + timestamp_seen: timestamp_seen as u32, + }; + let sender = self.sender.clone(); + tokio::spawn(async move { + let _ = sender.send(detected_gossip_message).await; + }); + + Ok(output_value) + } + + fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { + self.native_router.get_next_channel_announcements(starting_point, batch_amount) + } + + fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec { + self.native_router.get_next_node_announcements(starting_point, batch_amount) + } + + fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) { + self.native_router.peer_connected(their_node_id, init) + } + + fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> { + self.native_router.handle_reply_channel_range(their_node_id, msg) + } + + fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { + self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg) + } + + fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> { + self.native_router.handle_query_channel_range(their_node_id, msg) + } + + fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> { + self.native_router.handle_query_short_channel_ids(their_node_id, msg) + } +} diff --git a/src/hex_utils.rs b/src/hex_utils.rs new file mode 100644 index 0000000..cab5a15 --- /dev/null +++ b/src/hex_utils.rs @@ -0,0 +1,42 @@ +use bitcoin::secp256k1::PublicKey; + +pub fn to_vec(hex: &str) -> Option> { + let mut out = Vec::with_capacity(hex.len() / 2); + + let mut b = 0; + for (idx, c) in hex.as_bytes().iter().enumerate() { + b <<= 4; + match *c { + b'A'..=b'F' => b |= c - b'A' + 10, + b'a'..=b'f' => b |= c - b'a' + 10, + b'0'..=b'9' => b |= c - b'0', + _ => return None, + } + if (idx & 1) == 1 { + out.push(b); + b = 0; + } + } + + Some(out) +} + +#[inline] +pub fn hex_str(value: &[u8]) -> String { + let mut res = String::with_capacity(64); + for v in value { + res += &format!("{:02x}", v); + } + res +} + +pub fn to_compressed_pubkey(hex: &str) -> Option { + let data = match to_vec(&hex[0..33 * 2]) { + Some(bytes) => bytes, + None => return None, + }; + match PublicKey::from_slice(&data) { + Ok(pk) => Some(pk), + Err(_) => None, + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..fe82489 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,254 @@ +#![deny(unsafe_code)] +#![deny(broken_intra_doc_links)] +#![deny(private_intra_doc_links)] +#![deny(non_upper_case_globals)] +#![deny(non_camel_case_types)] +#![deny(non_snake_case)] +#![deny(unused_mut)] +#![deny(unused_variables)] +#![deny(unused_imports)] + +extern crate core; + +use std::collections::{HashMap, HashSet}; +use std::fs::File; +use std::io::BufReader; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +use bitcoin::blockdata::constants::genesis_block; +use bitcoin::Network; +use bitcoin::secp256k1::PublicKey; +use lightning::routing::gossip::NetworkGraph; +use lightning::util::ser::{ReadableArgs, Writeable}; +use tokio::sync::mpsc; +use crate::lookup::DeltaSet; + +use crate::persistence::GossipPersister; +use crate::serialization::UpdateSerializationMechanism; +use crate::snapshot::Snapshotter; +use crate::types::{GossipChainAccess, TestLogger}; + +mod downloader; +mod types; +mod tracking; +mod lookup; +mod persistence; +mod serialization; +mod snapshot; +mod config; +mod hex_utils; +mod verifier; + +pub struct RapidSyncProcessor { + network_graph: Arc>>, + pub initial_sync_complete: Arc, +} + +pub struct SerializedResponse { + pub data: Vec, + pub message_count: u32, + pub announcement_count: u32, + pub update_count: u32, + pub update_count_full: u32, + pub update_count_incremental: u32, +} + +impl RapidSyncProcessor { + pub fn new() -> Self { + let logger = TestLogger::new(); + let mut initial_sync_complete = false; + let arc_logger = Arc::new(logger); + let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) { + println!("Initializing from cached network graph…"); + let mut buffered_reader = BufReader::new(file); + let network_graph_result = NetworkGraph::read(&mut buffered_reader, Arc::clone(&arc_logger)); + if let Ok(network_graph) = network_graph_result { + initial_sync_complete = true; + network_graph.remove_stale_channels(); + println!("Initialized from cached network graph!"); + network_graph + } else { + println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); + NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger) + } + } else { + NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger) + }; + let arc_network_graph = Arc::new(network_graph); + let (_sync_termination_sender, _sync_termination_receiver) = mpsc::channel::<()>(1); + Self { + network_graph: arc_network_graph, + initial_sync_complete: Arc::new(AtomicBool::new(initial_sync_complete)), + } + } + + pub async fn start_sync(&self) { + // means to indicate sync completion status within this module + let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1); + let initial_sync_complete = self.initial_sync_complete.clone(); + + let network_graph = self.network_graph.clone(); + let snapshotter = Snapshotter::new(network_graph.clone()); + + if config::DOWNLOAD_NEW_GOSSIP { + + let mut persister = GossipPersister::new(sync_completion_sender, self.network_graph.clone()); + + let persistence_sender = persister.gossip_persistence_sender.clone(); + println!("Starting gossip download"); + let download_future = tracking::download_gossip(persistence_sender, network_graph.clone()); + tokio::spawn(async move { + // initiate the whole download stuff in the background + download_future.await; + }); + println!("Starting gossip db persistence listener"); + tokio::spawn(async move { + // initiate persistence of the gossip data + let persistence_future = persister.persist_gossip(); + persistence_future.await; + }); + + } else { + sync_completion_sender.send(()).await.unwrap(); + } + + { + let sync_completion = sync_completion_receiver.recv().await; + if sync_completion.is_none() { + panic!("Sync failed!"); + } + initial_sync_complete.store(true, Ordering::Release); + println!("Initial sync complete!"); + + // start the gossip snapshotting service + snapshotter.snapshot_gossip().await; + } + } + + pub async fn serialize_delta(&self, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse { + crate::serialize_delta(self.network_graph.clone(), last_sync_timestamp, consider_intermediate_updates).await + } +} + +async fn serialize_delta(network_graph: Arc>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse { + let (client, connection) = lookup::connect_to_db().await; + + tokio::spawn(async move { + if let Err(e) = connection.await { + panic!("connection error: {}", e); + } + }); + + let mut output: Vec = vec![]; + + // set a flag if the chain hash is prepended + // chain hash only necessary if either channel announcements or non-incremental updates are present + // for announcement-free incremental-only updates, chain hash can be skipped + + let mut node_id_set: HashSet<[u8; 33]> = HashSet::new(); + let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new(); + let mut node_ids: Vec = Vec::new(); + let mut duplicate_node_ids: i32 = 0; + + let mut get_node_id_index = |node_id: PublicKey| { + let serialized_node_id = node_id.serialize(); + if node_id_set.insert(serialized_node_id) { + node_ids.push(node_id); + let index = node_ids.len() - 1; + node_id_indices.insert(serialized_node_id, index); + return index; + } + duplicate_node_ids += 1; + node_id_indices[&serialized_node_id] + }; + + let mut delta_set = DeltaSet::new(); + lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await; + println!("announcement channel count: {}", delta_set.len()); + lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, consider_intermediate_updates).await; + println!("update-fetched channel count: {}", delta_set.len()); + lookup::filter_delta_set(&mut delta_set); + println!("update-filtered channel count: {}", delta_set.len()); + let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp); + + // process announcements + // write the number of channel announcements to the output + let announcement_count = serialization_details.announcements.len() as u32; + announcement_count.write(&mut output).unwrap(); + let mut previous_announcement_scid = 0; + for current_announcement in serialization_details.announcements { + let id_index_1 = get_node_id_index(current_announcement.node_id_1); + let id_index_2 = get_node_id_index(current_announcement.node_id_2); + let mut stripped_announcement = serialization::serialize_stripped_channel_announcement(¤t_announcement, id_index_1, id_index_2, previous_announcement_scid); + output.append(&mut stripped_announcement); + + previous_announcement_scid = current_announcement.short_channel_id; + } + + // process updates + let mut previous_update_scid = 0; + let update_count = serialization_details.updates.len() as u32; + update_count.write(&mut output).unwrap(); + + let default_update_values = serialization_details.full_update_defaults; + if update_count > 0 { + default_update_values.cltv_expiry_delta.write(&mut output).unwrap(); + default_update_values.htlc_minimum_msat.write(&mut output).unwrap(); + default_update_values.fee_base_msat.write(&mut output).unwrap(); + default_update_values.fee_proportional_millionths.write(&mut output).unwrap(); + default_update_values.htlc_maximum_msat.write(&mut output).unwrap(); + } + + let mut update_count_full = 0; + let mut update_count_incremental = 0; + for current_update in serialization_details.updates { + match ¤t_update.mechanism { + UpdateSerializationMechanism::Full => { + update_count_full += 1; + } + UpdateSerializationMechanism::Incremental(_) => { + update_count_incremental += 1; + } + }; + + let mut stripped_update = serialization::serialize_stripped_channel_update(¤t_update, &default_update_values, previous_update_scid); + output.append(&mut stripped_update); + + previous_update_scid = current_update.update.short_channel_id; + } + + // some stats + let message_count = announcement_count + update_count; + + let mut prefixed_output = vec![76, 68, 75, 1]; + + // always write the chain hash + serialization_details.chain_hash.write(&mut prefixed_output).unwrap(); + // always write the latest seen timestamp + let latest_seen_timestamp = serialization_details.latest_seen; + let overflow_seconds = latest_seen_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL; + let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds); + serialized_seen_timestamp.write(&mut prefixed_output).unwrap(); + + let node_id_count = node_ids.len() as u32; + node_id_count.write(&mut prefixed_output).unwrap(); + + for current_node_id in node_ids { + current_node_id.write(&mut prefixed_output).unwrap(); + } + + prefixed_output.append(&mut output); + + println!("duplicated node ids: {}", duplicate_node_ids); + println!("latest seen timestamp: {:?}", serialization_details.latest_seen); + + SerializedResponse { + data: prefixed_output, + message_count, + announcement_count, + update_count, + update_count_full, + update_count_incremental, + } +} diff --git a/src/lookup.rs b/src/lookup.rs new file mode 100644 index 0000000..b6e46a3 --- /dev/null +++ b/src/lookup.rs @@ -0,0 +1,283 @@ +use std::collections::{BTreeMap, HashSet}; +use std::io::Cursor; +use std::ops::Add; +use std::sync::Arc; +use std::time::{Duration, Instant, SystemTime}; + +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate}; +use lightning::routing::gossip::NetworkGraph; +use lightning::util::ser::Readable; +use tokio_postgres::{Client, Connection, NoTls, Socket}; +use tokio_postgres::tls::NoTlsStream; + +use crate::{config, hex_utils, TestLogger}; +use crate::serialization::MutatedProperties; + +/// The delta set needs to be a BTreeMap so the keys are sorted. +/// That way, the scids in the response automatically grow monotonically +pub(super) type DeltaSet = BTreeMap; + +pub(super) struct AnnouncementDelta { + pub(super) seen: u32, + pub(super) announcement: UnsignedChannelAnnouncement, +} + +pub(super) struct UpdateDelta { + pub(super) seen: u32, + pub(super) update: UnsignedChannelUpdate, +} + +pub(super) struct DirectedUpdateDelta { + pub(super) last_update_before_seen: Option, + pub(super) mutated_properties: MutatedProperties, + pub(super) latest_update_after_seen: Option, +} + +pub(super) struct ChannelDelta { + pub(super) announcement: Option, + pub(super) updates: (Option, Option), + pub(super) first_update_seen: Option, +} + +impl Default for ChannelDelta { + fn default() -> Self { + Self { announcement: None, updates: (None, None), first_update_seen: None } + } +} + +impl Default for DirectedUpdateDelta { + fn default() -> Self { + Self { + last_update_before_seen: None, + mutated_properties: MutatedProperties::default(), + latest_update_after_seen: None, + } + } +} + +pub(super) async fn connect_to_db() -> (Client, Connection) { + let connection_config = config::db_connection_config(); + connection_config.connect(NoTls).await.unwrap() +} + +/// Fetch all the channel announcements that are presently in the network graph, regardless of +/// whether they had been seen before. +/// Also include all announcements for which the first update was announced +/// after `last_syc_timestamp` +pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc>>, client: &Client, last_sync_timestamp: u32) { + let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); + println!("Obtaining channel ids from network graph"); + let channel_ids = { + let read_only_graph = network_graph.read_only(); + println!("Retrieved read-only network graph copy"); + let channel_iterator = read_only_graph.channels().into_iter(); + channel_iterator + .filter(|c| c.1.announcement_message.is_some()) + .map(|c| hex_utils::hex_str(&c.1.announcement_message.clone().unwrap().contents.short_channel_id.to_be_bytes())) + .collect::>() + }; + + println!("Obtaining corresponding database entries"); + // get all the channel announcements that are currently in the network graph + let announcement_rows = client.query("SELECT short_channel_id, announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", &[&channel_ids]).await.unwrap(); + + for current_announcement_row in announcement_rows { + let blob: Vec = current_announcement_row.get("announcement_signed"); + let mut readable = Cursor::new(blob); + let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents; + + let scid = unsigned_announcement.short_channel_id; + let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen"); + let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; + + let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); + (*current_channel_delta).announcement = Some(AnnouncementDelta { + announcement: unsigned_announcement, + seen: current_seen_timestamp, + }); + } + + println!("Obtaining channel announcements whose first channel updates had not been seen yet"); + + // here is where the channels whose first update in either direction occurred after + // `last_seen_timestamp` are added to the selection + let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap(); + for current_row in unannounced_rows { + + let blob: Vec = current_row.get("blob_signed"); + let mut readable = Cursor::new(blob); + let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents; + let scid = unsigned_update.short_channel_id; + let current_seen_timestamp_object: SystemTime = current_row.get("seen"); + let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; + + let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); + (*current_channel_delta).first_update_seen = Some(current_seen_timestamp); + } +} + +pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) { + let start = Instant::now(); + let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64)); + + // get the latest channel update in each direction prior to last_sync_timestamp, provided + // there was an update in either direction that happened after the last sync (to avoid + // collecting too many reference updates) + let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, short_channel_id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap(); + + println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed()); + + let mut last_seen_update_ids: Vec = Vec::with_capacity(reference_rows.len()); + let mut non_intermediate_ids: HashSet = HashSet::new(); + + for current_reference in reference_rows { + let update_id: i32 = current_reference.get("id"); + last_seen_update_ids.push(update_id); + non_intermediate_ids.insert(update_id); + + let direction: i32 = current_reference.get("direction"); + let blob: Vec = current_reference.get("blob_signed"); + let mut readable = Cursor::new(blob); + let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; + let scid = unsigned_channel_update.short_channel_id; + + let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default()); + let mut update_delta = if direction == 0 { + (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()) + } else if direction == 1 { + (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) + } else { + panic!("Channel direction must be binary!") + }; + update_delta.last_update_before_seen = Some(unsigned_channel_update); + + + } + + println!("Processed reference rows (delta size: {}): {:?}", delta_set.len(), start.elapsed()); + + // get all the intermediate channel updates + // (to calculate the set of mutated fields for snapshotting, where intermediate updates may + // have been omitted) + + let mut intermediate_update_prefix = ""; + if !consider_intermediate_updates { + intermediate_update_prefix = "DISTINCT ON (short_channel_id, direction)"; + } + + let query_string = format!("SELECT {} id, short_channel_id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix); + let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap(); + println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed()); + + let mut previous_scid = u64::MAX; + let mut previously_seen_directions = (false, false); + + // let mut previously_seen_directions = (false, false); + let mut intermediate_update_count = 0; + for intermediate_update in intermediate_updates { + let update_id: i32 = intermediate_update.get("id"); + if non_intermediate_ids.contains(&update_id) { + continue; + } + intermediate_update_count += 1; + + let direction: i32 = intermediate_update.get("direction"); + let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen"); + let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32; + let blob: Vec = intermediate_update.get("blob_signed"); + let mut readable = Cursor::new(blob); + let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents; + + let scid = unsigned_channel_update.short_channel_id; + if scid != previous_scid { + previous_scid = scid.clone(); + previously_seen_directions = (false, false); + } + + // get the write configuration for this particular channel's directional details + let current_channel_delta = delta_set.entry(scid.clone()).or_insert(ChannelDelta::default()); + let update_delta = if direction == 0 { + (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default()) + } else if direction == 1 { + (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default()) + } else { + panic!("Channel direction must be binary!") + }; + + { + // handle the latest deltas + if direction == 0 && !previously_seen_directions.0 { + previously_seen_directions.0 = true; + update_delta.latest_update_after_seen = Some(UpdateDelta { + seen: current_seen_timestamp, + update: unsigned_channel_update.clone(), + }); + } else if direction == 1 && !previously_seen_directions.1 { + previously_seen_directions.1 = true; + update_delta.latest_update_after_seen = Some(UpdateDelta { + seen: current_seen_timestamp, + update: unsigned_channel_update.clone(), + }); + } + } + + // determine mutations + if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref(){ + if unsigned_channel_update.flags != last_seen_update.flags { + update_delta.mutated_properties.flags = true; + } + if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta { + update_delta.mutated_properties.cltv_expiry_delta = true; + } + if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat { + update_delta.mutated_properties.htlc_minimum_msat = true; + } + if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat { + update_delta.mutated_properties.fee_base_msat = true; + } + if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths { + update_delta.mutated_properties.fee_proportional_millionths = true; + } + if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat { + update_delta.mutated_properties.htlc_maximum_msat = true; + } + } + + } + println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed()); +} + +pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) { + let original_length = delta_set.len(); + let keys: Vec = delta_set.keys().cloned().collect(); + for k in keys { + let v = delta_set.get(&k).unwrap(); + if v.announcement.is_none() { + // this channel is not currently in the network graph + delta_set.remove(&k); + continue; + } + + let update_meets_criteria = |update: &Option| { + if update.is_none() { + return false; + }; + let update_reference = update.as_ref().unwrap(); + // update_reference.latest_update_after_seen.is_some() && !update_reference.intermediate_updates.is_empty() + // if there has been an update after the channel was first seen + update_reference.latest_update_after_seen.is_some() + }; + + let direction_a_meets_criteria = update_meets_criteria(&v.updates.0); + let direction_b_meets_criteria = update_meets_criteria(&v.updates.1); + + if !direction_a_meets_criteria && !direction_b_meets_criteria { + delta_set.remove(&k); + } + } + + let new_length = delta_set.len(); + if original_length != new_length { + println!("length modified!"); + } +} diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..3e36edf --- /dev/null +++ b/src/main.rs @@ -0,0 +1,7 @@ +use rapid_gossip_sync_server::RapidSyncProcessor; + +#[tokio::main] +async fn main() { + let processor = RapidSyncProcessor::new(); + processor.start_sync().await; +} diff --git a/src/persistence.rs b/src/persistence.rs new file mode 100644 index 0000000..ded79b5 --- /dev/null +++ b/src/persistence.rs @@ -0,0 +1,253 @@ +use std::fs::OpenOptions; +use std::io::BufWriter; +use std::sync::Arc; +use std::time::Instant; +use lightning::routing::gossip::NetworkGraph; +use lightning::util::ser::Writeable; +use tokio::sync::mpsc; +use tokio_postgres::NoTls; + +use crate::{config, hex_utils, TestLogger}; +use crate::types::{DetectedGossipMessage, GossipMessage}; + +pub(crate) struct GossipPersister { + pub(crate) gossip_persistence_sender: mpsc::Sender, + gossip_persistence_receiver: mpsc::Receiver, + server_sync_completion_sender: mpsc::Sender<()>, + network_graph: Arc>>, +} + +impl GossipPersister { + pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc>>) -> Self { + let (gossip_persistence_sender, gossip_persistence_receiver) = + mpsc::channel::(10000); + GossipPersister { + gossip_persistence_sender, + gossip_persistence_receiver, + server_sync_completion_sender, + network_graph + } + } + + pub(crate) async fn persist_gossip(&mut self) { + let connection_config = config::db_connection_config(); + let (client, connection) = + connection_config.connect(NoTls).await.unwrap(); + + tokio::spawn(async move { + if let Err(e) = connection.await { + panic!("connection error: {}", e); + } + }); + + { + // initialize the database + let initialization = client + .execute(config::db_config_table_creation_query(), &[]) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } + + let initialization = client + .execute( + // TODO: figure out a way to fix the id value without Postgres complaining about + // its value not being default + "INSERT INTO config (id, db_schema) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING", + &[&1, &config::SCHEMA_VERSION] + ).await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } + + let initialization = client + .execute(config::db_announcement_table_creation_query(), &[]) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } + + let initialization = client + .execute( + config::db_channel_update_table_creation_query(), + &[], + ) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } + + let initialization = client + .batch_execute(config::db_index_creation_query()) + .await; + if let Err(initialization_error) = initialization { + panic!("db init error: {}", initialization_error); + } + } + + // print log statement every 10,000 messages + let mut persistence_log_threshold = 10000; + let mut i = 0u32; + let mut server_sync_completion_sent = false; + let mut latest_graph_cache_time: Option = None; + // TODO: it would be nice to have some sort of timeout here so after 10 seconds of + // inactivity, some sort of message could be broadcast signaling the activation of request + // processing + while let Some(detected_gossip_message) = &self.gossip_persistence_receiver.recv().await { + i += 1; // count the persisted gossip messages + + if i == 1 || i % persistence_log_threshold == 0 { + println!("Persisting gossip message #{}", i); + } + + if let Some(last_cache_time) = latest_graph_cache_time { + // has it been ten minutes? Just cache it + if last_cache_time.elapsed().as_secs() >= 600 { + self.persist_network_graph(); + latest_graph_cache_time = Some(Instant::now()); + } + } else { + // initialize graph cache timer + latest_graph_cache_time = Some(Instant::now()); + } + + match &detected_gossip_message.message { + GossipMessage::InitialSyncComplete => { + // signal to the server that it may now serve dynamic responses and calculate + // snapshots + // we take this detour through the persister to ensure that all previous + // messages have already been persisted to the database + println!("Persister caught up with gossip!"); + i -= 1; // this wasn't an actual gossip message that needed persisting + persistence_log_threshold = 50; + if !server_sync_completion_sent { + server_sync_completion_sent = true; + self.server_sync_completion_sender.send(()).await.unwrap(); + println!("Server has been notified of persistence completion."); + } + + // now, cache the persisted network graph + // also persist the network graph here + let mut too_soon = false; + if let Some(latest_graph_cache_time) = latest_graph_cache_time { + let time_since_last_cached = latest_graph_cache_time.elapsed().as_secs(); + // don't cache more frequently than every 2 minutes + too_soon = time_since_last_cached < 120; + } + if too_soon { + println!("Network graph has been cached too recently."); + }else { + latest_graph_cache_time = Some(Instant::now()); + self.persist_network_graph(); + } + } + GossipMessage::ChannelAnnouncement(announcement) => { + + let scid = announcement.contents.short_channel_id; + let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); + // scid is 8 bytes + // block height is the first three bytes + // to obtain block height, shift scid right by 5 bytes (40 bits) + let block_height = (scid >> 5 * 8) as i32; + let chain_hash = announcement.contents.chain_hash.as_ref(); + let chain_hash_hex = hex_utils::hex_str(chain_hash); + + // start with the type prefix, which is already known a priori + let mut announcement_signed = Vec::new(); // vec![1, 0]; + announcement.write(&mut announcement_signed).unwrap(); + + let result = client + .execute("INSERT INTO channel_announcements (\ + short_channel_id, \ + block_height, \ + chain_hash, \ + announcement_signed \ + ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[ + &scid_hex, + &block_height, + &chain_hash_hex, + &announcement_signed + ]).await; + if result.is_err() { + panic!("error: {}", result.err().unwrap()); + } + } + GossipMessage::ChannelUpdate(update) => { + let scid = update.contents.short_channel_id; + let scid_hex = hex_utils::hex_str(&scid.to_be_bytes()); + + let chain_hash = update.contents.chain_hash.as_ref(); + let chain_hash_hex = hex_utils::hex_str(chain_hash); + + let timestamp = update.contents.timestamp as i64; + + let channel_flags = update.contents.flags as i32; + let direction = channel_flags & 1; + let disable = (channel_flags & 2) > 0; + + let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction); + + let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32; + let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64; + let fee_base_msat = update.contents.fee_base_msat as i32; + let fee_proportional_millionths = + update.contents.fee_proportional_millionths as i32; + let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64; + + // start with the type prefix, which is already known a priori + let mut update_signed = Vec::new(); // vec![1, 2]; + update.write(&mut update_signed).unwrap(); + + let result = client + .execute("INSERT INTO channel_updates (\ + composite_index, \ + chain_hash, \ + short_channel_id, \ + timestamp, \ + channel_flags, \ + direction, \ + disable, \ + cltv_expiry_delta, \ + htlc_minimum_msat, \ + fee_base_msat, \ + fee_proportional_millionths, \ + htlc_maximum_msat, \ + blob_signed \ + ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[ + &composite_index, + &chain_hash_hex, + &scid_hex, + ×tamp, + &channel_flags, + &direction, + &disable, + &cltv_expiry_delta, + &htlc_minimum_msat, + &fee_base_msat, + &fee_proportional_millionths, + &htlc_maximum_msat, + &update_signed + ]).await; + if result.is_err() { + panic!("error: {}", result.err().unwrap()); + } + } + } + } + } + + fn persist_network_graph(&self) { + println!("Caching network graph…"); + let cache_path = config::network_graph_cache_path(); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(&cache_path) + .unwrap(); + self.network_graph.remove_stale_channels(); + let mut writer = BufWriter::new(file); + self.network_graph.write(&mut writer).unwrap(); + println!("Cached network graph!"); + } +} diff --git a/src/serialization.rs b/src/serialization.rs new file mode 100644 index 0000000..847c754 --- /dev/null +++ b/src/serialization.rs @@ -0,0 +1,305 @@ +use std::cmp::max; +use std::collections::HashMap; + +use bitcoin::BlockHash; +use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate}; +use lightning::util::ser::{BigSize, Writeable}; + +use crate::lookup::{DeltaSet, DirectedUpdateDelta}; + +pub(super) struct SerializationSet { + pub(super) announcements: Vec, + pub(super) updates: Vec, + pub(super) full_update_defaults: DefaultUpdateValues, + pub(super) latest_seen: u32, + pub(super) chain_hash: BlockHash, +} + +pub(super) struct DefaultUpdateValues { + pub(super) cltv_expiry_delta: u16, + pub(super) htlc_minimum_msat: u64, + pub(super) fee_base_msat: u32, + pub(super) fee_proportional_millionths: u32, + pub(super) htlc_maximum_msat: u64, +} + +impl Default for DefaultUpdateValues { + fn default() -> Self { + Self { + cltv_expiry_delta: 0, + htlc_minimum_msat: 0, + fee_base_msat: 0, + fee_proportional_millionths: 0, + htlc_maximum_msat: 0, + } + } +} + +pub(super) struct MutatedProperties { + pub(super) flags: bool, + pub(super) cltv_expiry_delta: bool, + pub(super) htlc_minimum_msat: bool, + pub(super) fee_base_msat: bool, + pub(super) fee_proportional_millionths: bool, + pub(super) htlc_maximum_msat: bool, +} + +impl Default for MutatedProperties { + fn default() -> Self { + Self { + flags: false, + cltv_expiry_delta: false, + htlc_minimum_msat: false, + fee_base_msat: false, + fee_proportional_millionths: false, + htlc_maximum_msat: false, + } + } +} + +pub(super) struct UpdateSerialization { + pub(super) update: UnsignedChannelUpdate, + pub(super) mechanism: UpdateSerializationMechanism, +} + +impl MutatedProperties { + /// Does not include flags because the flag byte is always sent in full + fn len(&self) -> u8 { + let mut mutations = 0; + if self.cltv_expiry_delta { mutations += 1; }; + if self.htlc_minimum_msat { mutations += 1; }; + if self.fee_base_msat { mutations += 1; }; + if self.fee_proportional_millionths { mutations += 1; }; + if self.htlc_maximum_msat { mutations += 1; }; + mutations + } +} + +pub(super) enum UpdateSerializationMechanism { + Full, + Incremental(MutatedProperties), +} + +struct FullUpdateValueHistograms { + cltv_expiry_delta: HashMap, + htlc_minimum_msat: HashMap, + fee_base_msat: HashMap, + fee_proportional_millionths: HashMap, + htlc_maximum_msat: HashMap, +} + +pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet { + let mut serialization_set = SerializationSet { + announcements: vec![], + updates: vec![], + full_update_defaults: Default::default(), + chain_hash: Default::default(), + latest_seen: 0, + }; + + let mut chain_hash_set = false; + + let mut full_update_histograms = FullUpdateValueHistograms { + cltv_expiry_delta: Default::default(), + htlc_minimum_msat: Default::default(), + fee_base_msat: Default::default(), + fee_proportional_millionths: Default::default(), + htlc_maximum_msat: Default::default(), + }; + + let mut record_full_update_in_histograms = |full_update: &UnsignedChannelUpdate| { + *full_update_histograms.cltv_expiry_delta.entry(full_update.cltv_expiry_delta).or_insert(0) += 1; + *full_update_histograms.htlc_minimum_msat.entry(full_update.htlc_minimum_msat).or_insert(0) += 1; + *full_update_histograms.fee_base_msat.entry(full_update.fee_base_msat).or_insert(0) += 1; + *full_update_histograms.fee_proportional_millionths.entry(full_update.fee_proportional_millionths).or_insert(0) += 1; + *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1; + }; + + for (_scid, channel_delta) in delta_set.into_iter() { + + // any announcement chain hash is gonna be the same value. Just set it from the first one. + let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap(); + if !chain_hash_set { + chain_hash_set = true; + serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone(); + } + + let current_announcement_seen = channel_announcement_delta.seen; + let is_new_announcement = current_announcement_seen >= last_sync_timestamp; + let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_update_seen { + first_update_seen >= last_sync_timestamp + } else { + false + }; + let send_announcement = is_new_announcement || is_newly_updated_announcement; + if send_announcement { + serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen); + serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement); + } + + let direction_a_updates = channel_delta.updates.0; + let direction_b_updates = channel_delta.updates.1; + + let mut categorize_directed_update_serialization = |directed_updates: Option| { + if let Some(updates) = directed_updates { + if let Some(latest_update_delta) = updates.latest_update_after_seen { + let latest_update = latest_update_delta.update; + + // the returned seen timestamp should be the latest of all the returned + // announcements and latest updates + serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen); + + if updates.last_update_before_seen.is_some() { + let mutated_properties = updates.mutated_properties; + if mutated_properties.len() == 5 { + // all five values have changed, it makes more sense to just + // serialize the update as a full update instead of as a change + // this way, the default values can be computed more efficiently + record_full_update_in_histograms(&latest_update); + serialization_set.updates.push(UpdateSerialization { + update: latest_update, + mechanism: UpdateSerializationMechanism::Full, + }); + } else if mutated_properties.len() > 0 || mutated_properties.flags { + // we don't count flags as mutated properties + serialization_set.updates.push(UpdateSerialization { + update: latest_update, + mechanism: UpdateSerializationMechanism::Incremental(mutated_properties), + }); + } + } else { + // serialize the full update + record_full_update_in_histograms(&latest_update); + serialization_set.updates.push(UpdateSerialization { + update: latest_update, + mechanism: UpdateSerializationMechanism::Full, + }); + } + } + }; + }; + + categorize_directed_update_serialization(direction_a_updates); + categorize_directed_update_serialization(direction_b_updates); + } + + let default_update_values = DefaultUpdateValues { + cltv_expiry_delta: find_most_common_histogram_entry_with_default(full_update_histograms.cltv_expiry_delta, 0), + htlc_minimum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_minimum_msat, 0), + fee_base_msat: find_most_common_histogram_entry_with_default(full_update_histograms.fee_base_msat, 0), + fee_proportional_millionths: find_most_common_histogram_entry_with_default(full_update_histograms.fee_proportional_millionths, 0), + htlc_maximum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_maximum_msat, 0), + }; + + serialization_set.full_update_defaults = default_update_values; + serialization_set +} + +pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnnouncement, node_id_a_index: usize, node_id_b_index: usize, previous_scid: u64) -> Vec { + let mut stripped_announcement = vec![]; + + announcement.features.write(&mut stripped_announcement).unwrap(); + + if previous_scid > announcement.short_channel_id { + panic!("unsorted scids!"); + } + let scid_delta = BigSize(announcement.short_channel_id - previous_scid); + scid_delta.write(&mut stripped_announcement).unwrap(); + + // write indices of node ids rather than the node IDs themselves + BigSize(node_id_a_index as u64).write(&mut stripped_announcement).unwrap(); + BigSize(node_id_b_index as u64).write(&mut stripped_announcement).unwrap(); + + // println!("serialized CA: {}, \n{:?}\n{:?}\n", announcement.short_channel_id, announcement.node_id_1, announcement.node_id_2); + stripped_announcement +} + +pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec { + let latest_update = &update.update; + let mut serialized_flags = latest_update.flags; + + if previous_scid > latest_update.short_channel_id { + panic!("unsorted scids!"); + } + + let mut delta_serialization = Vec::new(); + let mut prefixed_serialization = Vec::new(); + + match &update.mechanism { + UpdateSerializationMechanism::Full => { + if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta { + serialized_flags |= 0b_0100_0000; + latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap(); + } + + if latest_update.htlc_minimum_msat != default_values.htlc_minimum_msat { + serialized_flags |= 0b_0010_0000; + latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap(); + } + + if latest_update.fee_base_msat != default_values.fee_base_msat { + serialized_flags |= 0b_0001_0000; + latest_update.fee_base_msat.write(&mut delta_serialization).unwrap(); + } + + if latest_update.fee_proportional_millionths != default_values.fee_proportional_millionths { + serialized_flags |= 0b_0000_1000; + latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap(); + } + + if latest_update.htlc_maximum_msat != default_values.htlc_maximum_msat { + serialized_flags |= 0b_0000_0100; + latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap(); + } + } + + UpdateSerializationMechanism::Incremental(mutated_properties) => { + // indicate that this update is incremental + serialized_flags |= 0b_1000_0000; + + if mutated_properties.cltv_expiry_delta { + serialized_flags |= 0b_0100_0000; + latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap(); + } + + if mutated_properties.htlc_minimum_msat { + serialized_flags |= 0b_0010_0000; + latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap(); + } + + if mutated_properties.fee_base_msat { + serialized_flags |= 0b_0001_0000; + latest_update.fee_base_msat.write(&mut delta_serialization).unwrap(); + } + + if mutated_properties.fee_proportional_millionths { + serialized_flags |= 0b_0000_1000; + latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap(); + } + + if mutated_properties.htlc_maximum_msat { + serialized_flags |= 0b_0000_0100; + latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap(); + } + } + } + let scid_delta = BigSize(latest_update.short_channel_id - previous_scid); + scid_delta.write(&mut prefixed_serialization).unwrap(); + + serialized_flags.write(&mut prefixed_serialization).unwrap(); + prefixed_serialization.append(&mut delta_serialization); + + prefixed_serialization +} + +pub(super) fn find_most_common_histogram_entry_with_default(histogram: HashMap, default: T) -> T { + let most_frequent_entry = histogram.iter().max_by(|a, b| a.1.cmp(&b.1)); + if let Some(entry_details) = most_frequent_entry { + // .0 is the value + // .1 is the frequency + return entry_details.0.to_owned(); + } + // the default should pretty much always be a 0 as T + // though for htlc maximum msat it could be a u64::max + default +} diff --git a/src/snapshot.rs b/src/snapshot.rs new file mode 100644 index 0000000..93be840 --- /dev/null +++ b/src/snapshot.rs @@ -0,0 +1,135 @@ +use std::collections::HashMap; +use std::fs; +use std::os::unix::fs::symlink; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use lightning::routing::gossip::NetworkGraph; + +use crate::{config, TestLogger}; + +pub(crate) struct Snapshotter { + network_graph: Arc>>, +} + +impl Snapshotter { + pub fn new(network_graph: Arc>>) -> Self { + Self { network_graph } + } + + pub(crate) async fn snapshot_gossip(&self) { + println!("Initiating snapshotting service"); + + let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX]; + let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64; + + let pending_snapshot_directory = "./res/snapshots_pending"; + let pending_symlink_directory = "./res/symlinks_pending"; + let finalized_snapshot_directory = "./res/snapshots"; + let finalized_symlink_directory = "./res/symlinks"; + let relative_symlink_to_snapshot_path = "../snapshots"; + + // this is gonna be a never-ending background job + loop { + // 1. get the current timestamp + let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let filename_timestamp = Self::round_down_to_nearest_multiple(timestamp_seen, round_day_seconds); + println!("Capturing snapshots at {} for: {}", timestamp_seen, filename_timestamp); + + // 2. sleep until the next round 24 hours + // 3. refresh all snapshots + + // the stored snapshots should adhere to the following format + // from one day ago + // from two days ago + // … + // from a week ago + // from two weeks ago + // from three weeks ago + // full + // That means that at any given moment, there should only ever be + // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots + // The snapshots, unlike dynamic updates, should account for all intermediate + // channel updates + // + + + // purge and recreate the pending directories + if fs::metadata(&pending_snapshot_directory).is_ok(){ + fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory."); + } + if fs::metadata(&pending_symlink_directory).is_ok(){ + fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory."); + } + fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory"); + fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory"); + + let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new(); + for factor in &snapshot_sync_day_factors { + // basically timestamp - day_seconds * factor + let timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(factor.clone())); + snapshot_sync_timestamps.push((factor.clone(), timestamp)); + }; + + let mut snapshot_filenames_by_day_range: HashMap = HashMap::with_capacity(10); + + for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps { + let network_graph_clone = self.network_graph.clone(); + { + println!("Calculating {}-day snapshot", day_range); + // calculate the snapshot + let snapshot = super::serialize_delta(network_graph_clone, current_last_sync_timestamp.clone() as u32, true).await; + + // persist the snapshot and update the symlink + let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", filename_timestamp, day_range, current_last_sync_timestamp); + let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename); + println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental); + fs::write(&snapshot_path, snapshot.data).unwrap(); + snapshot_filenames_by_day_range.insert(day_range.clone(), snapshot_filename); + } + } + + for i in 1..10_001u64 { + // let's create symlinks + + // first, determine which snapshot range should be referenced + // find min(x) in snapshot_sync_day_factors where x >= i + let referenced_day_range = snapshot_sync_day_factors.iter().find(|x| { + x >= &&i + }).unwrap().clone(); + + let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap(); + let simulated_last_sync_timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(i)); + let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename); + let canonical_last_sync_timestamp = Self::round_down_to_nearest_multiple(simulated_last_sync_timestamp, round_day_seconds); + let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp); + + println!("Symlinking: {} -> {} ({} -> {}", i, referenced_day_range, symlink_path, relative_snapshot_path); + symlink(&relative_snapshot_path, &symlink_path).unwrap(); + } + + if fs::metadata(&finalized_snapshot_directory).is_ok(){ + fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory."); + } + if fs::metadata(&finalized_symlink_directory).is_ok(){ + fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove pending symlink directory."); + } + fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory."); + fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory."); + + + let remainder = timestamp_seen % round_day_seconds; + let time_until_next_day = round_day_seconds - remainder; + + println!("Sleeping until next snapshot capture: {}s", time_until_next_day); + // add in an extra five seconds to assure the rounding down works correctly + let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5)); + sleep.await; + } + } + + fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 { + let round_multiple_delta = number % multiple; + number - round_multiple_delta + } +} diff --git a/src/tracking.rs b/src/tracking.rs new file mode 100644 index 0000000..9c24923 --- /dev/null +++ b/src/tracking.rs @@ -0,0 +1,183 @@ +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; + +use bitcoin::hashes::hex::ToHex; +use bitcoin::secp256k1::{PublicKey, SecretKey}; +use futures::executor; +use lightning; +use lightning::ln::peer_handler::{ + ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, +}; +use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use rand::{Rng, thread_rng}; +use tokio::sync::mpsc; + +use crate::{config, TestLogger}; +use crate::downloader::{GossipCounter, GossipRouter}; +use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager}; +use crate::verifier::ChainVerifier; + +pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender, network_graph: Arc>>) { + let mut key = [0; 32]; + let mut random_data = [0; 32]; + thread_rng().fill_bytes(&mut key); + thread_rng().fill_bytes(&mut random_data); + let our_node_secret = SecretKey::from_slice(&key).unwrap(); + + let _arc_chain_access = None::; + let arc_chain_access = Some(Arc::new(ChainVerifier::new())); + let ignorer = IgnoringMessageHandler {}; + let arc_ignorer = Arc::new(ignorer); + + let errorer = ErroringMessageHandler::new(); + let arc_errorer = Arc::new(errorer); + + let logger = TestLogger::new(); + let arc_logger = Arc::new(logger); + + let router = P2PGossipSync::new( + network_graph.clone(), + arc_chain_access, + Arc::clone(&arc_logger), + ); + let arc_router = Arc::new(router); + let wrapped_router = GossipRouter { + native_router: arc_router, + counter: RwLock::new(GossipCounter::new()), + sender: persistence_sender.clone(), + }; + let arc_wrapped_router = Arc::new(wrapped_router); + + let message_handler = MessageHandler { + chan_handler: arc_errorer, + route_handler: arc_wrapped_router.clone(), + }; + let peer_handler = PeerManager::new( + message_handler, + our_node_secret, + &random_data, + Arc::clone(&arc_logger), + arc_ignorer, + ); + let arc_peer_handler = Arc::new(peer_handler); + + println!("Connecting to Lightning peers…"); + let peers = config::ln_peers(); + let mut connected_peer_count = 0; + + for current_peer in peers { + let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler)); + if initial_connection_succeeded { + connected_peer_count += 1; + } + } + + if connected_peer_count < 1 { + panic!("Failed to connect to any peer."); + } + + println!("Connected to {} Lightning peers!", connected_peer_count); + + let local_router = arc_wrapped_router.clone(); + let local_persistence_sender = persistence_sender.clone(); + tokio::spawn(async move { + let mut previous_announcement_count = 0u64; + let mut previous_update_count = 0u64; + let mut is_caught_up_with_gossip = false; + + let mut i = 0u32; + let mut latest_new_gossip_time = Instant::now(); + let mut needs_to_notify_persister = false; + + loop { + i += 1; // count the background activity + let sleep = tokio::time::sleep(Duration::from_secs(5)); + sleep.await; + + let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs(); + let router_clone = Arc::clone(&local_router); + + { + let counter = router_clone.counter.read().unwrap(); + let total_message_count = counter.channel_announcements + counter.channel_updates; + let new_message_count = total_message_count - previous_announcement_count - previous_update_count; + + let was_previously_caught_up_with_gossip = is_caught_up_with_gossip; + // TODO: make new message threshold (20) adjust based on connected peer count + is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0; + if new_message_count > 0 { + latest_new_gossip_time = Instant::now(); + } + + // if we either aren't caught up, or just stopped/started being caught up + if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) { + println!( + "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n", + i, + total_message_count, + new_message_count, + counter.channel_announcements, + counter.channel_announcements_with_mismatched_scripts, + counter.channel_updates, + counter.channel_updates_without_htlc_max_msats + ); + } else { + println!("Monitoring for gossip…") + } + + if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip { + println!("caught up with gossip!"); + needs_to_notify_persister = true; + } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip { + println!("Received new messages since catching up with gossip!"); + } + + let continuous_caught_up_duration = latest_new_gossip_time.elapsed(); + if continuous_caught_up_duration.as_secs() > 600 { + eprintln!("No new gossip messages in 10 minutes! Something's amiss!"); + } + + previous_announcement_count = counter.channel_announcements; + previous_update_count = counter.channel_updates; + } + + if needs_to_notify_persister { + needs_to_notify_persister = false; + let sender = local_persistence_sender.clone(); + tokio::spawn(async move { + let _ = sender.send(DetectedGossipMessage { + timestamp_seen: current_timestamp as u32, + message: GossipMessage::InitialSyncComplete, + }).await; + }); + } + } + }); +} + +fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool { + let peer_manager_clone = Arc::clone(&peer_manager); + eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1.to_string()); + let connection = executor::block_on(async move { + lightning_net_tokio::connect_outbound( + peer_manager_clone, + current_peer.0, + current_peer.1, + ).await + }); + let mut initial_connection_succeeded = false; + if let Some(disconnection_future) = connection { + eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string()); + initial_connection_succeeded = true; + let peer_manager_clone = Arc::clone(&peer_manager); + tokio::spawn(async move { + disconnection_future.await; + eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()); + monitor_peer_connection(current_peer.clone(), peer_manager_clone); + }); + } else { + eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string()) + }; + initial_connection_succeeded +} diff --git a/src/types.rs b/src/types.rs new file mode 100644 index 0000000..63a2adb --- /dev/null +++ b/src/types.rs @@ -0,0 +1,37 @@ +use std::sync::Arc; + +use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate}; +use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager}; +use lightning::util::logger::{Logger, Record}; + +use crate::downloader::GossipRouter; +use crate::verifier::ChainVerifier; + +pub(crate) type GossipChainAccess = Arc; +pub(crate) type GossipPeerManager = Arc, Arc, Arc, Arc>>; + +pub(crate) enum GossipMessage { + ChannelAnnouncement(ChannelAnnouncement), + ChannelUpdate(ChannelUpdate), + InitialSyncComplete, +} + +pub(crate) struct DetectedGossipMessage { + pub(crate) timestamp_seen: u32, + pub(crate) message: GossipMessage, +} + +pub(crate) struct TestLogger {} + +impl TestLogger { + pub(crate) fn new() -> TestLogger { + Self {} + } +} + +impl Logger for TestLogger { + fn log(&self, record: &Record) { + // TODO: allow log level threshold to be set + println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args); + } +} diff --git a/src/verifier.rs b/src/verifier.rs new file mode 100644 index 0000000..2446e1a --- /dev/null +++ b/src/verifier.rs @@ -0,0 +1,75 @@ +use std::convert::TryInto; +use std::sync::Arc; + +use bitcoin::{BlockHash, TxOut}; +use bitcoin::blockdata::block::Block; +use bitcoin::hashes::Hash; +use futures::executor; +use lightning::chain; +use lightning::chain::AccessError; +use lightning_block_sync::BlockSource; +use lightning_block_sync::http::BinaryResponse; +use lightning_block_sync::rest::RestClient; + +use crate::config; + +pub(crate) struct ChainVerifier { + rest_client: Arc, +} + +struct RestBinaryResponse(Vec); + +impl ChainVerifier { + pub(crate) fn new() -> Self { + let rest_client = RestClient::new(config::bitcoin_rest_endpoint()).unwrap(); + ChainVerifier { + rest_client: Arc::new(rest_client), + } + } + + fn retrieve_block(&self, block_height: u32) -> Result { + let rest_client = self.rest_client.clone(); + executor::block_on(async move { + let block_hash_result = rest_client.request_resource::(&format!("blockhashbyheight/{}.bin", block_height)).await; + let block_hash: Vec = block_hash_result.map_err(|error| { + eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string()); + AccessError::UnknownChain + })?.0; + let block_hash = BlockHash::from_slice(&block_hash).unwrap(); + + let block_result = rest_client.get_block(&block_hash).await; + let block = block_result.map_err(|error| { + eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); + AccessError::UnknownChain + })?; + Ok(block) + }) + } +} + +impl chain::Access for ChainVerifier { + fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result { + let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes + let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; + let output_index = (short_channel_id & 0xffff) as u16; + + let block = self.retrieve_block(block_height)?; + let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| { + eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string()); + AccessError::UnknownTx + })?; + let output = transaction.output.get(output_index as usize).ok_or_else(|| { + eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string()); + AccessError::UnknownTx + })?; + Ok(output.clone()) + } +} + +impl TryInto for BinaryResponse { + type Error = std::io::Error; + + fn try_into(self) -> Result { + Ok(RestBinaryResponse(self.0)) + } +} -- 2.39.5