Initial commit.
author Arik Sosman <git@arik.io>
Mon, 22 Aug 2022 01:18:35 +0000 (18:18 -0700)
committer Arik Sosman <git@arik.io>
Mon, 22 Aug 2022 02:04:35 +0000 (19:04 -0700)
20 files changed:
.editorconfig [new file with mode: 0644]
.github/workflows/build.yml [new file with mode: 0644]
Cargo.toml [new file with mode: 0644]
LICENSE [deleted file]
LICENSE-APACHE.md [new file with mode: 0644]
LICENSE-MIT.md [new file with mode: 0644]
LICENSE.md [new file with mode: 0644]
README.md [new file with mode: 0644]
src/config.rs [new file with mode: 0644]
src/downloader.rs [new file with mode: 0644]
src/hex_utils.rs [new file with mode: 0644]
src/lib.rs [new file with mode: 0644]
src/lookup.rs [new file with mode: 0644]
src/main.rs [new file with mode: 0644]
src/persistence.rs [new file with mode: 0644]
src/serialization.rs [new file with mode: 0644]
src/snapshot.rs [new file with mode: 0644]
src/tracking.rs [new file with mode: 0644]
src/types.rs [new file with mode: 0644]
src/verifier.rs [new file with mode: 0644]

diff --git a/.editorconfig b/.editorconfig
new file mode 100644 (file)
index 0000000..33d2d0c
--- /dev/null
@@ -0,0 +1,13 @@
+root = true
+
+[*]
+insert_final_newline = true
+trim_trailing_whitespace = true
+
+[*.rs]
+indent_style = tab
+indent_size = 4
+
+[*.yml]
+indent_style = space
+indent_size = 2
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
new file mode 100644 (file)
index 0000000..9ac4c21
--- /dev/null
@@ -0,0 +1,39 @@
+name: Cross-platform build verification
+on:
+  push:
+    branches:
+      - main
+  pull_request:
+    branches:
+      - "*"
+
+jobs:
+  build:
+    strategy:
+      fail-fast: false
+      matrix:
+        toolchain:
+          - stable
+          - 1.48.0
+          - beta
+        pinning: [true, false]
+    runs-on: ubuntu-latest
+    steps:
+      - name: Checkout source code
+        uses: actions/checkout@v3
+      - name: Install Rust ${{ matrix.toolchain }} toolchain
+        uses: actions-rs/toolchain@v1
+        with:
+          toolchain: ${{ matrix.toolchain }}
+          override: true
+          profile: minimal
+      - name: Pin dependencies
+        if: ${{ matrix.pinning }}
+        run: |
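+          # pin transitive dependencies to versions that still build on the oldest (1.48.0) toolchain in the matrix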
+          cargo update -p tokio-postgres --precise "0.7.5" --verbose
+          cargo update -p postgres-types --precise "0.2.3" --verbose
+          cargo update -p tokio --precise "1.14.1" --verbose
+          cargo update -p cpufeatures --precise "0.2.2" --verbose # https://github.com/RustCrypto/utils/issues/795
+      - name: Build on Rust ${{ matrix.toolchain }}
+        run: |
+          cargo build --verbose --color always
diff --git a/Cargo.toml b/Cargo.toml
new file mode 100644 (file)
index 0000000..003f373
--- /dev/null
@@ -0,0 +1,24 @@
+[package]
+name = "rapid-gossip-sync-server"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+base64 = "0.13.0"
+bech32 = "0.8"
+bitcoin = "0.28.1"
+bitcoin-bech32 = "0.12"
+lightning = { version = "0.0.110" }
+lightning-block-sync = { version = "0.0.110", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.110" }
+chrono = "0.4"
+futures = "0.3"
+hex = "0.3"
+rand = "0.4"
+tokio = { version = "1.14.1", features = ["full"] }
+tokio-postgres = { version="0.7.5" }
+
+[profile.release]
+opt-level = 3
+lto = true
+panic = "abort"
diff --git a/LICENSE b/LICENSE
deleted file mode 100644 (file)
index 265ffe7..0000000
--- a/LICENSE
+++ /dev/null
@@ -1,21 +0,0 @@
-MIT License
-
-Copyright (c) 2022 Lightning Dev Kit
-
-Permission is hereby granted, free of charge, to any person obtaining a copy
-of this software and associated documentation files (the "Software"), to deal
-in the Software without restriction, including without limitation the rights
-to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-copies of the Software, and to permit persons to whom the Software is
-furnished to do so, subject to the following conditions:
-
-The above copyright notice and this permission notice shall be included in all
-copies or substantial portions of the Software.
-
-THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-SOFTWARE.
diff --git a/LICENSE-APACHE.md b/LICENSE-APACHE.md
new file mode 100644 (file)
index 0000000..261eeb9
--- /dev/null
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/LICENSE-MIT.md b/LICENSE-MIT.md
new file mode 100644 (file)
index 0000000..9d982a4
--- /dev/null
@@ -0,0 +1,16 @@
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/LICENSE.md b/LICENSE.md
new file mode 100644 (file)
index 0000000..c3f44ca
--- /dev/null
@@ -0,0 +1,14 @@
+This software is licensed under [Apache 2.0](LICENSE-APACHE) or
+[MIT](LICENSE-MIT), at your option.
+
+Some files retain their own copyright notice, however, for full authorship
+information, see version control history.
+
+Except as otherwise noted in individual files, all files in this repository are
+licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
+http://opensource.org/licenses/MIT>, at your option.
+
+You may not use, copy, modify, merge, publish, distribute, sublicense, and/or
+sell copies of this software or any files in this repository except in
+accordance with one or both of these licenses.
diff --git a/README.md b/README.md
new file mode 100644 (file)
index 0000000..1df2902
--- /dev/null
+++ b/README.md
@@ -0,0 +1,88 @@
+# rust-ln-sync
+
+This is a server that connects to peers on the Lightning network and calculates compact rapid sync
+gossip data.
+
+It comprises the following components.
+
+## config
+
+A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
+can be made by setting environment variables, whose usage is as follows:
+
+| Name                     | Default       | Description                                                                                                |
+|:-------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------|
+| RUST_LN_SYNC_DB_HOST     | localhost     | Domain of the Postgres database                                                                            |
+| RUST_LN_SYNC_DB_USER     | alice         | Username to access Postgres                                                                                |
+| RUST_LN_SYNC_DB_PASSWORD | _None_        | Password to access Postgres                                                                                |
+| RUST_LN_SYNC_DB_NAME     | ln_graph_sync | Name of the database to be used for gossip storage                                                         |
+| BITCOIN_REST_DOMAIN      | 127.0.0.1     | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT        | 80            | HTTP port of the bitcoind REST server                                                                      |
+| BITCOIN_REST_PATH        | /rest/        | Path infix to access the bitcoind REST endpoints                                                           |
+
+Notably, one setting must be modified in code: the `ln_peers()` function in `src/config.rs`,
+which specifies how many peers, and which ones, to use for retrieving gossip.
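+
+For reference, the default `ln_peers()` implementation from `src/config.rs` (reproduced below)
+connects to a single peer; add or remove `(PublicKey, SocketAddr)` tuples to change the peer set:
+
+```rust
+pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> {
+    vec![
+        // Wallet of Satoshi
+        (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap())
+    ]
+}
+```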
+
+## download
+
+The module responsible for initiating the scrape of the network graph from the configured
+Lightning peers.
+
+## persistence
+
+The module responsible for persisting all the downloaded graph data to Postgres.
+
+## server
+
+The server is responsible for returning dynamic and snapshotted rapid sync data.
+
+Dynamic sync takes the timestamp of the client's last sync and calculates a delta on the fly,
+returning a minimal change set relative to what the client is assumed to have already seen
+(any intermediate changes are ignored). Dynamic sync is only available after the first
+full graph sync completes on startup.
+
+Snapshot sync is likewise based on a timestamp, but unlike dynamic sync, its responses are
+precalculated in a way that accounts for the possibility that the client may have
+intermittently seen later updates.
+
+### snapshot
+
+The snapshotting module is responsible for calculating and storing snapshots. It starts as
+soon as the first full graph sync completes, and then regenerates the snapshots at a
+24-hour interval.
+
+### lookup
+
+The lookup module is responsible for fetching the latest data from the network graph and Postgres,
+and reconciling it into an actionable delta set that the server can return in a serialized format.
+
+It works by collecting all the channels that are currently in the network graph, and gathering
+announcements as well as updates for each of them. For the updates specifically, the last update
+seen prior to the given timestamp, the latest known updates, and, if necessary, all intermediate
+updates are collected.
+
+Then, any channel that has only had an announcement but never an update is dropped. Additionally,
+every channel whose first update was seen after the given timestamp is collected alongside its
+announcement.
+
+Finally, all channel update transitions are evaluated and collected into either a full or an
+incremental update.
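+
+A minimal sketch of the filtering step described above, using simplified stand-in types (the
+real `ChannelDelta` in `src/lookup.rs` carries the full announcement and update data):
+
+```rust
+use std::collections::BTreeMap;
+
+// Simplified stand-ins for the types in src/lookup.rs
+struct ChannelDelta {
+    announcement_seen: Option<u32>,  // when the channel announcement was first seen
+    first_update_seen: Option<u32>,  // when the channel's first update was first seen
+}
+
+// Drop channels lacking an announcement or any update; return the scids whose
+// announcements must be included because their first update postdates the sync.
+fn filter_deltas(deltas: &mut BTreeMap<u64, ChannelDelta>, last_sync_timestamp: u32) -> Vec<u64> {
+    deltas.retain(|_scid, delta| delta.announcement_seen.is_some() && delta.first_update_seen.is_some());
+    deltas.iter()
+        .filter(|(_, delta)| delta.first_update_seen.unwrap() >= last_sync_timestamp)
+        .map(|(scid, _)| *scid)
+        .collect()
+}
+```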
+
+## Making a call
+
+### Dynamic
+
+Make a call to
+
+`http://localhost:3030/dynamic/1652644698`
+
+Where `1652644698` is the last sync timestamp.
+
+### Snapshotted
+
+Same as above, but substitute `snapshot` for `dynamic`:
+
+`http://localhost:3030/snapshot/1652644698`
+
+## License
+
+Either [Apache 2.0](LICENSE-APACHE.md) or [MIT](LICENSE-MIT.md), at your option.
diff --git a/src/config.rs b/src/config.rs
new file mode 100644 (file)
index 0000000..4d44efe
--- /dev/null
@@ -0,0 +1,102 @@
+use std::env;
+use std::net::SocketAddr;
+use bitcoin::secp256k1::PublicKey;
+use lightning_block_sync::http::HttpEndpoint;
+use tokio_postgres::Config;
+use crate::hex_utils;
+
+pub(crate) const SCHEMA_VERSION: i32 = 1;
+pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
+pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
+
+pub(crate) fn network_graph_cache_path() -> &'static str {
+       "./res/network_graph.bin"
+}
+
+pub(crate) fn db_connection_config() -> Config {
+       let mut config = Config::new();
+       let host = env::var("RUST_LN_SYNC_DB_HOST").unwrap_or("localhost".to_string());
+       let user = env::var("RUST_LN_SYNC_DB_USER").unwrap_or("alice".to_string());
+       let db = env::var("RUST_LN_SYNC_DB_NAME").unwrap_or("ln_graph_sync".to_string());
+       config.host(&host);
+       config.user(&user);
+       config.dbname(&db);
+       if let Ok(password) = env::var("RUST_LN_SYNC_DB_PASSWORD") {
+               config.password(&password);
+       }
+       config
+}
+
+pub(crate) fn bitcoin_rest_endpoint() -> HttpEndpoint {
+       let host = env::var("BITCOIN_REST_DOMAIN").unwrap_or("127.0.0.1".to_string());
+       let port = env::var("BITCOIN_REST_PORT")
+               .unwrap_or("80".to_string())
+               .parse::<u16>()
+               .expect("BITCOIN_REST_PORT env variable must be a u16.");
+       let path = env::var("BITCOIN_REST_PATH").unwrap_or("/rest/".to_string());
+       HttpEndpoint::for_host(host).with_port(port).with_path(path)
+}
+
+pub(crate) fn db_config_table_creation_query() -> &'static str {
+       "CREATE TABLE IF NOT EXISTS config (
+               id SERIAL PRIMARY KEY,
+               db_schema integer
+       )"
+}
+
+pub(crate) fn db_announcement_table_creation_query() -> &'static str {
+       "CREATE TABLE IF NOT EXISTS channel_announcements (
+               id SERIAL PRIMARY KEY,
+               short_channel_id character varying(255) NOT NULL UNIQUE,
+               block_height integer,
+               chain_hash character varying(255),
+               announcement_signed BYTEA,
+               seen timestamp NOT NULL DEFAULT NOW()
+       )"
+}
+
+pub(crate) fn db_channel_update_table_creation_query() -> &'static str {
+       "CREATE TABLE IF NOT EXISTS channel_updates (
+               id SERIAL PRIMARY KEY,
+               composite_index character varying(255) UNIQUE,
+               chain_hash character varying(255),
+               short_channel_id character varying(255),
+               timestamp bigint,
+               channel_flags integer,
+               direction integer,
+               disable boolean,
+               cltv_expiry_delta integer,
+               htlc_minimum_msat bigint,
+               fee_base_msat integer,
+               fee_proportional_millionths integer,
+               htlc_maximum_msat bigint,
+               blob_signed BYTEA,
+               seen timestamp NOT NULL DEFAULT NOW()
+       )"
+}
+
+pub(crate) fn db_index_creation_query() -> &'static str {
+       "
+       CREATE INDEX IF NOT EXISTS channels_seen ON channel_announcements(seen);
+       CREATE INDEX IF NOT EXISTS channel_updates_scid ON channel_updates(short_channel_id);
+       CREATE INDEX IF NOT EXISTS channel_updates_direction ON channel_updates(direction);
+       CREATE INDEX IF NOT EXISTS channel_updates_seen ON channel_updates(seen);
+       "
+}
+
+/// EDIT ME
+pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> {
+       vec![
+               // Bitfinex
+               // (hex_utils::to_compressed_pubkey("033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025").unwrap(), "34.65.85.39:9735".parse().unwrap()),
+
+               // Matt Corallo
+               // (hex_utils::to_compressed_pubkey("03db10aa09ff04d3568b0621750794063df401e6853c79a21a83e1a3f3b5bfb0c8").unwrap(), "69.59.18.80:9735".parse().unwrap())
+
+               // River Financial
+               // (hex_utils::to_compressed_pubkey("03037dc08e9ac63b82581f79b662a4d0ceca8a8ca162b1af3551595b8f2d97b70a").unwrap(), "104.196.249.140:9735".parse().unwrap())
+
+               // Wallet of Satoshi | 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735
+               (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap())
+       ]
+}
diff --git a/src/downloader.rs b/src/downloader.rs
new file mode 100644 (file)
index 0000000..ccb75c8
--- /dev/null
@@ -0,0 +1,122 @@
+use std::sync::{Arc, RwLock};
+use std::time::{SystemTime, UNIX_EPOCH};
+
+use bitcoin::secp256k1::PublicKey;
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
+use tokio::sync::mpsc;
+
+use crate::{GossipChainAccess, TestLogger};
+use crate::types::{DetectedGossipMessage, GossipMessage};
+
+pub(crate) struct GossipCounter {
+       pub(crate) channel_announcements: u64,
+       pub(crate) channel_updates: u64,
+       pub(crate) channel_updates_without_htlc_max_msats: u64,
+       pub(crate) channel_announcements_with_mismatched_scripts: u64
+}
+
+impl GossipCounter {
+       pub(crate) fn new() -> Self {
+               Self {
+                       channel_announcements: 0,
+                       channel_updates: 0,
+                       channel_updates_without_htlc_max_msats: 0,
+                       channel_announcements_with_mismatched_scripts: 0,
+               }
+       }
+}
+
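+/// Wraps LDK's `P2PGossipSync`: messages are validated by the wrapped ("native")
+/// router first, and accepted channel announcements and updates are additionally
+/// forwarded to the persistence task through `sender`.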
+pub(crate) struct GossipRouter {
+       pub(crate) native_router: Arc<P2PGossipSync<Arc<NetworkGraph<Arc<TestLogger>>>, GossipChainAccess, Arc<TestLogger>>>,
+       pub(crate) counter: RwLock<GossipCounter>,
+       pub(crate) sender: mpsc::Sender<DetectedGossipMessage>,
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+               self.native_router.get_and_clear_pending_msg_events()
+       }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+               self.native_router.handle_node_announcement(msg)
+       }
+
+       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+
+               let mut counter = self.counter.write().unwrap();
+
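+               // any announcement the native router rejects for a reason other than an
+               // unknown chain is counted as having a mismatched funding script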
+               let output_value = self.native_router.handle_channel_announcement(msg).map_err(|error| {
+                       let error_string = format!("{:?}", error);
+                       if error_string.contains("announced on an unknown chain") {
+                               return error;
+                       }
+                       counter.channel_announcements_with_mismatched_scripts += 1;
+                       error
+               })?;
+
+               counter.channel_announcements += 1;
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+               let detected_gossip_message = DetectedGossipMessage {
+                       message: gossip_message,
+                       timestamp_seen: timestamp_seen as u32,
+               };
+               let sender = self.sender.clone();
+               tokio::spawn(async move {
+                       let _ = sender.send(detected_gossip_message).await;
+               });
+
+               Ok(output_value)
+       }
+
+       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+               let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+               let output_value = self.native_router.handle_channel_update(msg)?;
+
+               let mut counter = self.counter.write().unwrap();
+               counter.channel_updates += 1;
+               let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+               let detected_gossip_message = DetectedGossipMessage {
+                       message: gossip_message,
+                       timestamp_seen: timestamp_seen as u32,
+               };
+               let sender = self.sender.clone();
+               tokio::spawn(async move {
+                       let _ = sender.send(detected_gossip_message).await;
+               });
+
+               Ok(output_value)
+       }
+
+       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
+               self.native_router.get_next_channel_announcements(starting_point, batch_amount)
+       }
+
+       fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement> {
+               self.native_router.get_next_node_announcements(starting_point, batch_amount)
+       }
+
+       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) {
+               self.native_router.peer_connected(their_node_id, init)
+       }
+
+       fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
+               self.native_router.handle_reply_channel_range(their_node_id, msg)
+       }
+
+       fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+               self.native_router.handle_reply_short_channel_ids_end(their_node_id, msg)
+       }
+
+       fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError> {
+               self.native_router.handle_query_channel_range(their_node_id, msg)
+       }
+
+       fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError> {
+               self.native_router.handle_query_short_channel_ids(their_node_id, msg)
+       }
+}
diff --git a/src/hex_utils.rs b/src/hex_utils.rs
new file mode 100644 (file)
index 0000000..cab5a15
--- /dev/null
@@ -0,0 +1,42 @@
+use bitcoin::secp256k1::PublicKey;
+
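+// Parses a hex string into bytes, returning None on any non-hex character;
+// a trailing odd nibble is silently dropped.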
+pub fn to_vec(hex: &str) -> Option<Vec<u8>> {
+    let mut out = Vec::with_capacity(hex.len() / 2);
+
+    let mut b = 0;
+    for (idx, c) in hex.as_bytes().iter().enumerate() {
+        b <<= 4;
+        match *c {
+            b'A'..=b'F' => b |= c - b'A' + 10,
+            b'a'..=b'f' => b |= c - b'a' + 10,
+            b'0'..=b'9' => b |= c - b'0',
+            _ => return None,
+        }
+        if (idx & 1) == 1 {
+            out.push(b);
+            b = 0;
+        }
+    }
+
+    Some(out)
+}
+
+#[inline]
+pub fn hex_str(value: &[u8]) -> String {
+    let mut res = String::with_capacity(64);
+    for v in value {
+        res += &format!("{:02x}", v);
+    }
+    res
+}
+
+pub fn to_compressed_pubkey(hex: &str) -> Option<PublicKey> {
+    if hex.len() < 33 * 2 {
+        return None;
+    }
+    let data = match to_vec(&hex[0..33 * 2]) {
+        Some(bytes) => bytes,
+        None => return None,
+    };
+    match PublicKey::from_slice(&data) {
+        Ok(pk) => Some(pk),
+        Err(_) => None,
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
new file mode 100644 (file)
index 0000000..fe82489
--- /dev/null
@@ -0,0 +1,254 @@
+#![deny(unsafe_code)]
+#![deny(broken_intra_doc_links)]
+#![deny(private_intra_doc_links)]
+#![deny(non_upper_case_globals)]
+#![deny(non_camel_case_types)]
+#![deny(non_snake_case)]
+#![deny(unused_mut)]
+#![deny(unused_variables)]
+#![deny(unused_imports)]
+
+extern crate core;
+
+use std::collections::{HashMap, HashSet};
+use std::fs::File;
+use std::io::BufReader;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+
+use bitcoin::blockdata::constants::genesis_block;
+use bitcoin::Network;
+use bitcoin::secp256k1::PublicKey;
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::{ReadableArgs, Writeable};
+use tokio::sync::mpsc;
+use crate::lookup::DeltaSet;
+
+use crate::persistence::GossipPersister;
+use crate::serialization::UpdateSerializationMechanism;
+use crate::snapshot::Snapshotter;
+use crate::types::{GossipChainAccess, TestLogger};
+
+mod downloader;
+mod types;
+mod tracking;
+mod lookup;
+mod persistence;
+mod serialization;
+mod snapshot;
+mod config;
+mod hex_utils;
+mod verifier;
+
+pub struct RapidSyncProcessor {
+       network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+       pub initial_sync_complete: Arc<AtomicBool>,
+}
+
+pub struct SerializedResponse {
+       pub data: Vec<u8>,
+       pub message_count: u32,
+       pub announcement_count: u32,
+       pub update_count: u32,
+       pub update_count_full: u32,
+       pub update_count_incremental: u32,
+}
+
+impl RapidSyncProcessor {
+       pub fn new() -> Self {
+               let logger = TestLogger::new();
+               let mut initial_sync_complete = false;
+               let arc_logger = Arc::new(logger);
+               let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
+                       println!("Initializing from cached network graph…");
+                       let mut buffered_reader = BufReader::new(file);
+                       let network_graph_result = NetworkGraph::read(&mut buffered_reader, Arc::clone(&arc_logger));
+                       if let Ok(network_graph) = network_graph_result {
+                               initial_sync_complete = true;
+                               network_graph.remove_stale_channels();
+                               println!("Initialized from cached network graph!");
+                               network_graph
+                       } else {
+                               println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
+                               NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+                       }
+               } else {
+                       NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), arc_logger)
+               };
+               let arc_network_graph = Arc::new(network_graph);
+               let (_sync_termination_sender, _sync_termination_receiver) = mpsc::channel::<()>(1);
+               Self {
+                       network_graph: arc_network_graph,
+                       initial_sync_complete: Arc::new(AtomicBool::new(initial_sync_complete)),
+               }
+       }
+
+       pub async fn start_sync(&self) {
+               // means to indicate sync completion status within this module
+               let (sync_completion_sender, mut sync_completion_receiver) = mpsc::channel::<()>(1);
+               let initial_sync_complete = self.initial_sync_complete.clone();
+
+               let network_graph = self.network_graph.clone();
+               let snapshotter = Snapshotter::new(network_graph.clone());
+
+               if config::DOWNLOAD_NEW_GOSSIP {
+
+                       let mut persister = GossipPersister::new(sync_completion_sender, self.network_graph.clone());
+
+                       let persistence_sender = persister.gossip_persistence_sender.clone();
+                       println!("Starting gossip download");
+                       let download_future = tracking::download_gossip(persistence_sender, network_graph.clone());
+                       tokio::spawn(async move {
+                               // initiate the whole download stuff in the background
+                               // run the gossip download in the background
+                       });
+                       println!("Starting gossip db persistence listener");
+                       tokio::spawn(async move {
+                               // initiate persistence of the gossip data
+                               let persistence_future = persister.persist_gossip();
+                               persistence_future.await;
+                       });
+
+               } else {
+                       sync_completion_sender.send(()).await.unwrap();
+               }
+
+               {
+                       let sync_completion = sync_completion_receiver.recv().await;
+                       if sync_completion.is_none() {
+                               panic!("Sync failed!");
+                       }
+                       initial_sync_complete.store(true, Ordering::Release);
+                       println!("Initial sync complete!");
+
+                       // start the gossip snapshotting service
+                       snapshotter.snapshot_gossip().await;
+               }
+       }
+
+       pub async fn serialize_delta(&self, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+               crate::serialize_delta(self.network_graph.clone(), last_sync_timestamp, consider_intermediate_updates).await
+       }
+}
+
+async fn serialize_delta(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, last_sync_timestamp: u32, consider_intermediate_updates: bool) -> SerializedResponse {
+       let (client, connection) = lookup::connect_to_db().await;
+
+       tokio::spawn(async move {
+               if let Err(e) = connection.await {
+                       panic!("connection error: {}", e);
+               }
+       });
+
+       let mut output: Vec<u8> = vec![];
+
+       // set a flag if the chain hash is prepended
+       // chain hash only necessary if either channel announcements or non-incremental updates are present
+       // for announcement-free incremental-only updates, chain hash can be skipped
+
+       let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
+       let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
+       let mut node_ids: Vec<PublicKey> = Vec::new();
+       let mut duplicate_node_ids: i32 = 0;
+
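+       // node ids are deduplicated: each public key is written once into the output
+       // prefix, and announcements reference it by index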
+       let mut get_node_id_index = |node_id: PublicKey| {
+               let serialized_node_id = node_id.serialize();
+               if node_id_set.insert(serialized_node_id) {
+                       node_ids.push(node_id);
+                       let index = node_ids.len() - 1;
+                       node_id_indices.insert(serialized_node_id, index);
+                       return index;
+               }
+               duplicate_node_ids += 1;
+               node_id_indices[&serialized_node_id]
+       };
+
+       let mut delta_set = DeltaSet::new();
+       lookup::fetch_channel_announcements(&mut delta_set, network_graph, &client, last_sync_timestamp).await;
+       println!("announcement channel count: {}", delta_set.len());
+       lookup::fetch_channel_updates(&mut delta_set, &client, last_sync_timestamp, consider_intermediate_updates).await;
+       println!("update-fetched channel count: {}", delta_set.len());
+       lookup::filter_delta_set(&mut delta_set);
+       println!("update-filtered channel count: {}", delta_set.len());
+       let serialization_details = serialization::serialize_delta_set(delta_set, last_sync_timestamp);
+
+       // process announcements
+       // write the number of channel announcements to the output
+       let announcement_count = serialization_details.announcements.len() as u32;
+       announcement_count.write(&mut output).unwrap();
+       let mut previous_announcement_scid = 0;
+       for current_announcement in serialization_details.announcements {
+               let id_index_1 = get_node_id_index(current_announcement.node_id_1);
+               let id_index_2 = get_node_id_index(current_announcement.node_id_2);
+               let mut stripped_announcement = serialization::serialize_stripped_channel_announcement(&current_announcement, id_index_1, id_index_2, previous_announcement_scid);
+               output.append(&mut stripped_announcement);
+
+               previous_announcement_scid = current_announcement.short_channel_id;
+       }
+
+       // process updates
+       let mut previous_update_scid = 0;
+       let update_count = serialization_details.updates.len() as u32;
+       update_count.write(&mut output).unwrap();
+
+       let default_update_values = serialization_details.full_update_defaults;
+       if update_count > 0 {
+               default_update_values.cltv_expiry_delta.write(&mut output).unwrap();
+               default_update_values.htlc_minimum_msat.write(&mut output).unwrap();
+               default_update_values.fee_base_msat.write(&mut output).unwrap();
+               default_update_values.fee_proportional_millionths.write(&mut output).unwrap();
+               default_update_values.htlc_maximum_msat.write(&mut output).unwrap();
+       }
+
+       let mut update_count_full = 0;
+       let mut update_count_incremental = 0;
+       for current_update in serialization_details.updates {
+               match &current_update.mechanism {
+                       UpdateSerializationMechanism::Full => {
+                               update_count_full += 1;
+                       }
+                       UpdateSerializationMechanism::Incremental(_) => {
+                               update_count_incremental += 1;
+                       }
+               };
+
+               let mut stripped_update = serialization::serialize_stripped_channel_update(&current_update, &default_update_values, previous_update_scid);
+               output.append(&mut stripped_update);
+
+               previous_update_scid = current_update.update.short_channel_id;
+       }
+
+       // some stats
+       let message_count = announcement_count + update_count;
+
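+       // prefix: the ASCII bytes "LDK" (76, 68, 75) followed by the serialization format version (1)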
+       let mut prefixed_output = vec![76, 68, 75, 1];
+
+       // always write the chain hash
+       serialization_details.chain_hash.write(&mut prefixed_output).unwrap();
+       // always write the latest seen timestamp
+       let latest_seen_timestamp = serialization_details.latest_seen;
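+       // round the latest-seen timestamp down to the start of the current snapshot
+       // interval (e.g. 1652644698 with the 24-hour interval becomes 1652572800)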
+       let overflow_seconds = latest_seen_timestamp % config::SNAPSHOT_CALCULATION_INTERVAL;
+       let serialized_seen_timestamp = latest_seen_timestamp.saturating_sub(overflow_seconds);
+       serialized_seen_timestamp.write(&mut prefixed_output).unwrap();
+
+       let node_id_count = node_ids.len() as u32;
+       node_id_count.write(&mut prefixed_output).unwrap();
+
+       for current_node_id in node_ids {
+               current_node_id.write(&mut prefixed_output).unwrap();
+       }
+
+       prefixed_output.append(&mut output);
+
+       println!("duplicated node ids: {}", duplicate_node_ids);
+       println!("latest seen timestamp: {:?}", serialization_details.latest_seen);
+
+       SerializedResponse {
+               data: prefixed_output,
+               message_count,
+               announcement_count,
+               update_count,
+               update_count_full,
+               update_count_incremental,
+       }
+}
diff --git a/src/lookup.rs b/src/lookup.rs
new file mode 100644 (file)
index 0000000..b6e46a3
--- /dev/null
@@ -0,0 +1,283 @@
+use std::collections::{BTreeMap, HashSet};
+use std::io::Cursor;
+use std::ops::Add;
+use std::sync::Arc;
+use std::time::{Duration, Instant, SystemTime};
+
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::Readable;
+use tokio_postgres::{Client, Connection, NoTls, Socket};
+use tokio_postgres::tls::NoTlsStream;
+
+use crate::{config, hex_utils, TestLogger};
+use crate::serialization::MutatedProperties;
+
+/// The delta set needs to be a BTreeMap so the keys are sorted.
+/// That way, the scids in the response automatically grow monotonically.
+pub(super) type DeltaSet = BTreeMap<u64, ChannelDelta>;
+
+pub(super) struct AnnouncementDelta {
+       pub(super) seen: u32,
+       pub(super) announcement: UnsignedChannelAnnouncement,
+}
+
+pub(super) struct UpdateDelta {
+       pub(super) seen: u32,
+       pub(super) update: UnsignedChannelUpdate,
+}
+
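+/// Delta state for one direction of a channel: the reference update last seen
+/// before the sync timestamp, which properties have mutated since then, and the
+/// latest update seen afterwards.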
+pub(super) struct DirectedUpdateDelta {
+       pub(super) last_update_before_seen: Option<UnsignedChannelUpdate>,
+       pub(super) mutated_properties: MutatedProperties,
+       pub(super) latest_update_after_seen: Option<UpdateDelta>,
+}
+
+pub(super) struct ChannelDelta {
+       pub(super) announcement: Option<AnnouncementDelta>,
+       pub(super) updates: (Option<DirectedUpdateDelta>, Option<DirectedUpdateDelta>),
+       pub(super) first_update_seen: Option<u32>,
+}
+
+impl Default for ChannelDelta {
+       fn default() -> Self {
+               Self { announcement: None, updates: (None, None), first_update_seen: None }
+       }
+}
+
+impl Default for DirectedUpdateDelta {
+       fn default() -> Self {
+               Self {
+                       last_update_before_seen: None,
+                       mutated_properties: MutatedProperties::default(),
+                       latest_update_after_seen: None,
+               }
+       }
+}
+
+pub(super) async fn connect_to_db() -> (Client, Connection<Socket, NoTlsStream>) {
+       let connection_config = config::db_connection_config();
+       connection_config.connect(NoTls).await.unwrap()
+}
+
+/// Fetch all the channel announcements that are presently in the network graph, regardless of
+/// whether they had been seen before.
+/// Also include all announcements for which the first update was seen
+/// after `last_sync_timestamp`.
+pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>, client: &Client, last_sync_timestamp: u32) {
+       let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
+       println!("Obtaining channel ids from network graph");
+       let channel_ids = {
+               let read_only_graph = network_graph.read_only();
+               println!("Retrieved read-only network graph copy");
+               let channel_iterator = read_only_graph.channels().into_iter();
+               channel_iterator
+                       .filter(|c| c.1.announcement_message.is_some())
+                       .map(|c| hex_utils::hex_str(&c.1.announcement_message.clone().unwrap().contents.short_channel_id.to_be_bytes()))
+                       .collect::<Vec<String>>()
+       };
+
+       println!("Obtaining corresponding database entries");
+       // get all the channel announcements that are currently in the network graph
+       let announcement_rows = client.query("SELECT short_channel_id, announcement_signed, seen FROM channel_announcements WHERE short_channel_id = any($1) ORDER BY short_channel_id ASC", &[&channel_ids]).await.unwrap();
+
+       for current_announcement_row in announcement_rows {
+               let blob: Vec<u8> = current_announcement_row.get("announcement_signed");
+               let mut readable = Cursor::new(blob);
+               let unsigned_announcement = ChannelAnnouncement::read(&mut readable).unwrap().contents;
+
+               let scid = unsigned_announcement.short_channel_id;
+               let current_seen_timestamp_object: SystemTime = current_announcement_row.get("seen");
+               let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+               let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+               (*current_channel_delta).announcement = Some(AnnouncementDelta {
+                       announcement: unsigned_announcement,
+                       seen: current_seen_timestamp,
+               });
+       }
+
+       println!("Obtaining channel announcements whose first channel updates had not been seen yet");
+
+       // here is where the channels whose first update in either direction occurred after
+       // `last_sync_timestamp` are added to the selection
+       let unannounced_rows = client.query("SELECT short_channel_id, blob_signed, seen FROM (SELECT DISTINCT ON (short_channel_id) short_channel_id, blob_signed, seen FROM channel_updates ORDER BY short_channel_id ASC, seen ASC) AS first_seens WHERE first_seens.seen >= $1", &[&last_sync_timestamp_object]).await.unwrap();
+       for current_row in unannounced_rows {
+
+               let blob: Vec<u8> = current_row.get("blob_signed");
+               let mut readable = Cursor::new(blob);
+               let unsigned_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+               let scid = unsigned_update.short_channel_id;
+               let current_seen_timestamp_object: SystemTime = current_row.get("seen");
+               let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+
+               let mut current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+               (*current_channel_delta).first_update_seen = Some(current_seen_timestamp);
+       }
+}
+
+pub(super) async fn fetch_channel_updates(delta_set: &mut DeltaSet, client: &Client, last_sync_timestamp: u32, consider_intermediate_updates: bool) {
+       let start = Instant::now();
+       let last_sync_timestamp_object = SystemTime::UNIX_EPOCH.add(Duration::from_secs(last_sync_timestamp as u64));
+
+       // get the latest channel update in each direction prior to last_sync_timestamp, provided
+       // there was an update in either direction that happened after the last sync (to avoid
+       // collecting too many reference updates)
+       let reference_rows = client.query("SELECT DISTINCT ON (short_channel_id, direction) id, short_channel_id, direction, blob_signed FROM channel_updates WHERE seen < $1 AND short_channel_id IN (SELECT short_channel_id FROM channel_updates WHERE seen >= $1 GROUP BY short_channel_id) ORDER BY short_channel_id ASC, direction ASC, seen DESC", &[&last_sync_timestamp_object]).await.unwrap();
+
+       println!("Fetched reference rows ({}): {:?}", reference_rows.len(), start.elapsed());
+
+       let mut last_seen_update_ids: Vec<i32> = Vec::with_capacity(reference_rows.len());
+       let mut non_intermediate_ids: HashSet<i32> = HashSet::new();
+
+       for current_reference in reference_rows {
+               let update_id: i32 = current_reference.get("id");
+               last_seen_update_ids.push(update_id);
+               non_intermediate_ids.insert(update_id);
+
+               let direction: i32 = current_reference.get("direction");
+               let blob: Vec<u8> = current_reference.get("blob_signed");
+               let mut readable = Cursor::new(blob);
+               let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+               let scid = unsigned_channel_update.short_channel_id;
+
+               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+               let mut update_delta = if direction == 0 {
+                       (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+               } else if direction == 1 {
+                       (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+               } else {
+                       panic!("Channel direction must be binary!")
+               };
+               update_delta.last_update_before_seen = Some(unsigned_channel_update);
+       }
+
+       println!("Processed reference rows (delta size: {}): {:?}", delta_set.len(), start.elapsed());
+
+       // get all the intermediate channel updates
+       // (to calculate the set of mutated fields for snapshotting, where intermediate updates may
+       // have been omitted)
+
+       let intermediate_update_prefix = if consider_intermediate_updates {
+               ""
+       } else {
+               "DISTINCT ON (short_channel_id, direction)"
+       };
+
+       let query_string = format!("SELECT {} id, short_channel_id, direction, blob_signed, seen FROM channel_updates WHERE seen >= $1 ORDER BY short_channel_id ASC, direction ASC, seen DESC", intermediate_update_prefix);
+       let intermediate_updates = client.query(&query_string, &[&last_sync_timestamp_object]).await.unwrap();
+       println!("Fetched intermediate rows ({}): {:?}", intermediate_updates.len(), start.elapsed());
+
+       let mut previous_scid = u64::MAX;
+       let mut previously_seen_directions = (false, false);
+
+       let mut intermediate_update_count = 0;
+       for intermediate_update in intermediate_updates {
+               let update_id: i32 = intermediate_update.get("id");
+               if non_intermediate_ids.contains(&update_id) {
+                       continue;
+               }
+               intermediate_update_count += 1;
+
+               let direction: i32 = intermediate_update.get("direction");
+               let current_seen_timestamp_object: SystemTime = intermediate_update.get("seen");
+               let current_seen_timestamp: u32 = current_seen_timestamp_object.duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as u32;
+               let blob: Vec<u8> = intermediate_update.get("blob_signed");
+               let mut readable = Cursor::new(blob);
+               let unsigned_channel_update = ChannelUpdate::read(&mut readable).unwrap().contents;
+
+               let scid = unsigned_channel_update.short_channel_id;
+               if scid != previous_scid {
+                       previous_scid = scid;
+                       previously_seen_directions = (false, false);
+               }
+
+               // get the write configuration for this particular channel's directional details
+               let current_channel_delta = delta_set.entry(scid).or_insert(ChannelDelta::default());
+               let update_delta = if direction == 0 {
+                       (*current_channel_delta).updates.0.get_or_insert(DirectedUpdateDelta::default())
+               } else if direction == 1 {
+                       (*current_channel_delta).updates.1.get_or_insert(DirectedUpdateDelta::default())
+               } else {
+                       panic!("Channel direction must be binary!")
+               };
+
+               {
+                       // handle the latest deltas
+                       if direction == 0 && !previously_seen_directions.0 {
+                               previously_seen_directions.0 = true;
+                               update_delta.latest_update_after_seen = Some(UpdateDelta {
+                                       seen: current_seen_timestamp,
+                                       update: unsigned_channel_update.clone(),
+                               });
+                       } else if direction == 1 && !previously_seen_directions.1 {
+                               previously_seen_directions.1 = true;
+                               update_delta.latest_update_after_seen = Some(UpdateDelta {
+                                       seen: current_seen_timestamp,
+                                       update: unsigned_channel_update.clone(),
+                               });
+                       }
+               }
+
+               // determine mutations
+               if let Some(last_seen_update) = update_delta.last_update_before_seen.as_ref() {
+                       if unsigned_channel_update.flags != last_seen_update.flags {
+                               update_delta.mutated_properties.flags = true;
+                       }
+                       if unsigned_channel_update.cltv_expiry_delta != last_seen_update.cltv_expiry_delta {
+                               update_delta.mutated_properties.cltv_expiry_delta = true;
+                       }
+                       if unsigned_channel_update.htlc_minimum_msat != last_seen_update.htlc_minimum_msat {
+                               update_delta.mutated_properties.htlc_minimum_msat = true;
+                       }
+                       if unsigned_channel_update.fee_base_msat != last_seen_update.fee_base_msat {
+                               update_delta.mutated_properties.fee_base_msat = true;
+                       }
+                       if unsigned_channel_update.fee_proportional_millionths != last_seen_update.fee_proportional_millionths {
+                               update_delta.mutated_properties.fee_proportional_millionths = true;
+                       }
+                       if unsigned_channel_update.htlc_maximum_msat != last_seen_update.htlc_maximum_msat {
+                               update_delta.mutated_properties.htlc_maximum_msat = true;
+                       }
+               }
+
+       }
+       println!("Processed intermediate rows ({}) (delta size: {}): {:?}", intermediate_update_count, delta_set.len(), start.elapsed());
+}
+
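+/// Drops channels that cannot contribute to a serialized delta: channels without an
+/// announcement (i.e. not currently in the network graph), and channels where neither
+/// direction has seen an update since the last sync.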
+pub(super) fn filter_delta_set(delta_set: &mut DeltaSet) {
+       let original_length = delta_set.len();
+       let keys: Vec<u64> = delta_set.keys().cloned().collect();
+       for k in keys {
+               let v = delta_set.get(&k).unwrap();
+               if v.announcement.is_none() {
+                       // this channel is not currently in the network graph
+                       delta_set.remove(&k);
+                       continue;
+               }
+
+               let update_meets_criteria = |update: &Option<DirectedUpdateDelta>| {
+                       if update.is_none() {
+                               return false;
+                       };
+                       let update_reference = update.as_ref().unwrap();
+                       // if there has been an update after the channel was first seen
+                       update_reference.latest_update_after_seen.is_some()
+               };
+
+               let direction_a_meets_criteria = update_meets_criteria(&v.updates.0);
+               let direction_b_meets_criteria = update_meets_criteria(&v.updates.1);
+
+               if !direction_a_meets_criteria && !direction_b_meets_criteria {
+                       delta_set.remove(&k);
+               }
+       }
+
+       let new_length = delta_set.len();
+       if original_length != new_length {
+               println!("Filtered delta set from {} to {} channels.", original_length, new_length);
+       }
+}
diff --git a/src/main.rs b/src/main.rs
new file mode 100644 (file)
index 0000000..3e36edf
--- /dev/null
@@ -0,0 +1,7 @@
+use rapid_gossip_sync_server::RapidSyncProcessor;
+
+#[tokio::main]
+async fn main() {
+       let processor = RapidSyncProcessor::new();
+       processor.start_sync().await;
+}
diff --git a/src/persistence.rs b/src/persistence.rs
new file mode 100644 (file)
index 0000000..ded79b5
--- /dev/null
@@ -0,0 +1,253 @@
+use std::fs::OpenOptions;
+use std::io::BufWriter;
+use std::sync::Arc;
+use std::time::Instant;
+use lightning::routing::gossip::NetworkGraph;
+use lightning::util::ser::Writeable;
+use tokio::sync::mpsc;
+use tokio_postgres::NoTls;
+
+use crate::{config, hex_utils, TestLogger};
+use crate::types::{DetectedGossipMessage, GossipMessage};
+
+pub(crate) struct GossipPersister {
+       pub(crate) gossip_persistence_sender: mpsc::Sender<DetectedGossipMessage>,
+       gossip_persistence_receiver: mpsc::Receiver<DetectedGossipMessage>,
+       server_sync_completion_sender: mpsc::Sender<()>,
+       network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+}
+
+impl GossipPersister {
+       pub fn new(server_sync_completion_sender: mpsc::Sender<()>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+               let (gossip_persistence_sender, gossip_persistence_receiver) =
+                       mpsc::channel::<DetectedGossipMessage>(10000);
+               GossipPersister {
+                       gossip_persistence_sender,
+                       gossip_persistence_receiver,
+                       server_sync_completion_sender,
+                       network_graph
+               }
+       }
+
+       pub(crate) async fn persist_gossip(&mut self) {
+               let connection_config = config::db_connection_config();
+               let (client, connection) =
+                       connection_config.connect(NoTls).await.unwrap();
+
+               tokio::spawn(async move {
+                       if let Err(e) = connection.await {
+                               panic!("connection error: {}", e);
+                       }
+               });
+
+               {
+                       // initialize the database
+                       let initialization = client
+                               .execute(config::db_config_table_creation_query(), &[])
+                               .await;
+                       if let Err(initialization_error) = initialization {
+                               panic!("db init error: {}", initialization_error);
+                       }
+
+                       let initialization = client
+                               .execute(
+                                       // TODO: figure out a way to fix the id value without Postgres complaining about
+                                       // its value not being default
+                                       "INSERT INTO config (id, db_schema) VALUES ($1, $2) ON CONFLICT (id) DO NOTHING",
+                                       &[&1, &config::SCHEMA_VERSION]
+                               ).await;
+                       if let Err(initialization_error) = initialization {
+                               panic!("db init error: {}", initialization_error);
+                       }
+
+                       let initialization = client
+                               .execute(config::db_announcement_table_creation_query(), &[])
+                               .await;
+                       if let Err(initialization_error) = initialization {
+                               panic!("db init error: {}", initialization_error);
+                       }
+
+                       let initialization = client
+                               .execute(
+                                       config::db_channel_update_table_creation_query(),
+                                       &[],
+                               )
+                               .await;
+                       if let Err(initialization_error) = initialization {
+                               panic!("db init error: {}", initialization_error);
+                       }
+
+                       let initialization = client
+                               .batch_execute(config::db_index_creation_query())
+                               .await;
+                       if let Err(initialization_error) = initialization {
+                               panic!("db init error: {}", initialization_error);
+                       }
+               }
+
+               // print log statement every 10,000 messages
+               let mut persistence_log_threshold = 10000;
+               let mut i = 0u32;
+               let mut server_sync_completion_sent = false;
+               let mut latest_graph_cache_time: Option<Instant> = None;
+               // TODO: it would be nice to have some sort of timeout here so after 10 seconds of
+               // inactivity, some sort of message could be broadcast signaling the activation of request
+               // processing
+               while let Some(detected_gossip_message) = self.gossip_persistence_receiver.recv().await {
+                       i += 1; // count the persisted gossip messages
+
+                       if i == 1 || i % persistence_log_threshold == 0 {
+                               println!("Persisting gossip message #{}", i);
+                       }
+
+                       if let Some(last_cache_time) = latest_graph_cache_time {
+                               // if ten minutes have passed since the last graph cache, cache it again
+                               if last_cache_time.elapsed().as_secs() >= 600 {
+                                       self.persist_network_graph();
+                                       latest_graph_cache_time = Some(Instant::now());
+                               }
+                       } else {
+                               // initialize graph cache timer
+                               latest_graph_cache_time = Some(Instant::now());
+                       }
+
+                       match &detected_gossip_message.message {
+                               GossipMessage::InitialSyncComplete => {
+                                       // signal to the server that it may now serve dynamic responses and calculate
+                                       // snapshots
+                                       // we take this detour through the persister to ensure that all previous
+                                       // messages have already been persisted to the database
+                                       println!("Persister caught up with gossip!");
+                                       i -= 1; // this wasn't an actual gossip message that needed persisting
+                                       persistence_log_threshold = 50;
+                                       if !server_sync_completion_sent {
+                                               server_sync_completion_sent = true;
+                                               self.server_sync_completion_sender.send(()).await.unwrap();
+                                               println!("Server has been notified of persistence completion.");
+                                       }
+
+                                       // now, cache the network graph to disk
+                                       let mut too_soon = false;
+                                       if let Some(latest_graph_cache_time) = latest_graph_cache_time {
+                                               let time_since_last_cached = latest_graph_cache_time.elapsed().as_secs();
+                                               // don't cache more frequently than every 2 minutes
+                                               too_soon = time_since_last_cached < 120;
+                                       }
+                                       if too_soon {
+                                               println!("Network graph has been cached too recently.");
+                                       } else {
+                                               latest_graph_cache_time = Some(Instant::now());
+                                               self.persist_network_graph();
+                                       }
+                               }
+                               GossipMessage::ChannelAnnouncement(announcement) => {
+
+                                       let scid = announcement.contents.short_channel_id;
+                                       let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
+                                       // scid is 8 bytes
+                                       // block height is the first three bytes
+                                       // to obtain block height, shift scid right by 5 bytes (40 bits)
+                                       let block_height = (scid >> (5 * 8)) as i32;
+                                       let chain_hash = announcement.contents.chain_hash.as_ref();
+                                       let chain_hash_hex = hex_utils::hex_str(chain_hash);
+
+                                       // the message type prefix is omitted here, as it is known a priori
+                                       let mut announcement_signed = Vec::new();
+                                       announcement.write(&mut announcement_signed).unwrap();
+
+                                       let result = client
+                                               .execute("INSERT INTO channel_announcements (\
+                                                       short_channel_id, \
+                                                       block_height, \
+                                                       chain_hash, \
+                                                       announcement_signed \
+                                               ) VALUES ($1, $2, $3, $4) ON CONFLICT (short_channel_id) DO NOTHING", &[
+                                                       &scid_hex,
+                                                       &block_height,
+                                                       &chain_hash_hex,
+                                                       &announcement_signed
+                                               ]).await;
+                                       if result.is_err() {
+                                               panic!("error: {}", result.err().unwrap());
+                                       }
+                               }
+                               GossipMessage::ChannelUpdate(update) => {
+                                       let scid = update.contents.short_channel_id;
+                                       let scid_hex = hex_utils::hex_str(&scid.to_be_bytes());
+
+                                       let chain_hash = update.contents.chain_hash.as_ref();
+                                       let chain_hash_hex = hex_utils::hex_str(chain_hash);
+
+                                       let timestamp = update.contents.timestamp as i64;
+
+                                       let channel_flags = update.contents.flags as i32;
+                                       let direction = channel_flags & 1;
+                                       let disable = (channel_flags & 2) > 0;
+
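+                                       // the composite index (scid:timestamp:direction) lets the
+                                       // ON CONFLICT clause below deduplicate repeated updates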
+                                       let composite_index = format!("{}:{}:{}", scid_hex, timestamp, direction);
+
+                                       let cltv_expiry_delta = update.contents.cltv_expiry_delta as i32;
+                                       let htlc_minimum_msat = update.contents.htlc_minimum_msat as i64;
+                                       let fee_base_msat = update.contents.fee_base_msat as i32;
+                                       let fee_proportional_millionths =
+                                               update.contents.fee_proportional_millionths as i32;
+                                       let htlc_maximum_msat = update.contents.htlc_maximum_msat as i64;
+
+                                       // the message type prefix is omitted here, as it is known a priori
+                                       let mut update_signed = Vec::new();
+                                       update.write(&mut update_signed).unwrap();
+
+                                       let result = client
+                                               .execute("INSERT INTO channel_updates (\
+                                                       composite_index, \
+                                                       chain_hash, \
+                                                       short_channel_id, \
+                                                       timestamp, \
+                                                       channel_flags, \
+                                                       direction, \
+                                                       disable, \
+                                                       cltv_expiry_delta, \
+                                                       htlc_minimum_msat, \
+                                                       fee_base_msat, \
+                                                       fee_proportional_millionths, \
+                                                       htlc_maximum_msat, \
+                                                       blob_signed \
+                                               ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13) ON CONFLICT (composite_index) DO NOTHING", &[
+                                                       &composite_index,
+                                                       &chain_hash_hex,
+                                                       &scid_hex,
+                                                       &timestamp,
+                                                       &channel_flags,
+                                                       &direction,
+                                                       &disable,
+                                                       &cltv_expiry_delta,
+                                                       &htlc_minimum_msat,
+                                                       &fee_base_msat,
+                                                       &fee_proportional_millionths,
+                                                       &htlc_maximum_msat,
+                                                       &update_signed
+                                               ]).await;
+                                       if result.is_err() {
+                                               panic!("error: {}", result.err().unwrap());
+                                       }
+                               }
+                       }
+               }
+       }
+
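+       /// Prunes stale channels from the network graph and serializes the graph to the
+       /// configured cache path, truncating any previously cached file.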
+       fn persist_network_graph(&self) {
+               println!("Caching network graph…");
+               let cache_path = config::network_graph_cache_path();
+               let file = OpenOptions::new()
+                       .create(true)
+                       .write(true)
+                       .truncate(true)
+                       .open(&cache_path)
+                       .unwrap();
+               self.network_graph.remove_stale_channels();
+               let mut writer = BufWriter::new(file);
+               self.network_graph.write(&mut writer).unwrap();
+               println!("Cached network graph!");
+       }
+}
diff --git a/src/serialization.rs b/src/serialization.rs
new file mode 100644 (file)
index 0000000..847c754
--- /dev/null
@@ -0,0 +1,305 @@
+use std::cmp::max;
+use std::collections::HashMap;
+
+use bitcoin::BlockHash;
+use lightning::ln::msgs::{UnsignedChannelAnnouncement, UnsignedChannelUpdate};
+use lightning::util::ser::{BigSize, Writeable};
+
+use crate::lookup::{DeltaSet, DirectedUpdateDelta};
+
+pub(super) struct SerializationSet {
+       pub(super) announcements: Vec<UnsignedChannelAnnouncement>,
+       pub(super) updates: Vec<UpdateSerialization>,
+       pub(super) full_update_defaults: DefaultUpdateValues,
+       pub(super) latest_seen: u32,
+       pub(super) chain_hash: BlockHash,
+}
+
+#[derive(Default)]
+pub(super) struct DefaultUpdateValues {
+       pub(super) cltv_expiry_delta: u16,
+       pub(super) htlc_minimum_msat: u64,
+       pub(super) fee_base_msat: u32,
+       pub(super) fee_proportional_millionths: u32,
+       pub(super) htlc_maximum_msat: u64,
+}
+
+#[derive(Default)]
+pub(super) struct MutatedProperties {
+       pub(super) flags: bool,
+       pub(super) cltv_expiry_delta: bool,
+       pub(super) htlc_minimum_msat: bool,
+       pub(super) fee_base_msat: bool,
+       pub(super) fee_proportional_millionths: bool,
+       pub(super) htlc_maximum_msat: bool,
+}
+
+pub(super) struct UpdateSerialization {
+       pub(super) update: UnsignedChannelUpdate,
+       pub(super) mechanism: UpdateSerializationMechanism,
+}
+
+impl MutatedProperties {
+       /// Does not include flags because the flag byte is always sent in full
+       fn len(&self) -> u8 {
+               let mut mutations = 0;
+               if self.cltv_expiry_delta { mutations += 1; };
+               if self.htlc_minimum_msat { mutations += 1; };
+               if self.fee_base_msat { mutations += 1; };
+               if self.fee_proportional_millionths { mutations += 1; };
+               if self.htlc_maximum_msat { mutations += 1; };
+               mutations
+       }
+}
+
+pub(super) enum UpdateSerializationMechanism {
+       Full,
+       Incremental(MutatedProperties),
+}
+
+struct FullUpdateValueHistograms {
+       cltv_expiry_delta: HashMap<u16, usize>,
+       htlc_minimum_msat: HashMap<u64, usize>,
+       fee_base_msat: HashMap<u32, usize>,
+       fee_proportional_millionths: HashMap<u32, usize>,
+       htlc_maximum_msat: HashMap<u64, usize>,
+}
+
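+/// Transforms a delta set into the announcements and updates to be serialized. Full updates
+/// are recorded in per-field histograms so the most common value of each field can serve as
+/// its default, allowing full updates to omit any field that matches the default.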
+pub(super) fn serialize_delta_set(delta_set: DeltaSet, last_sync_timestamp: u32) -> SerializationSet {
+       let mut serialization_set = SerializationSet {
+               announcements: vec![],
+               updates: vec![],
+               full_update_defaults: Default::default(),
+               chain_hash: Default::default(),
+               latest_seen: 0,
+       };
+
+       let mut chain_hash_set = false;
+
+       let mut full_update_histograms = FullUpdateValueHistograms {
+               cltv_expiry_delta: Default::default(),
+               htlc_minimum_msat: Default::default(),
+               fee_base_msat: Default::default(),
+               fee_proportional_millionths: Default::default(),
+               htlc_maximum_msat: Default::default(),
+       };
+
+       let mut record_full_update_in_histograms = |full_update: &UnsignedChannelUpdate| {
+               *full_update_histograms.cltv_expiry_delta.entry(full_update.cltv_expiry_delta).or_insert(0) += 1;
+               *full_update_histograms.htlc_minimum_msat.entry(full_update.htlc_minimum_msat).or_insert(0) += 1;
+               *full_update_histograms.fee_base_msat.entry(full_update.fee_base_msat).or_insert(0) += 1;
+               *full_update_histograms.fee_proportional_millionths.entry(full_update.fee_proportional_millionths).or_insert(0) += 1;
+               *full_update_histograms.htlc_maximum_msat.entry(full_update.htlc_maximum_msat).or_insert(0) += 1;
+       };
+
+       for (_scid, channel_delta) in delta_set.into_iter() {
+
+               // any announcement chain hash is gonna be the same value. Just set it from the first one.
+               let channel_announcement_delta = channel_delta.announcement.as_ref().unwrap();
+               if !chain_hash_set {
+                       chain_hash_set = true;
+                       serialization_set.chain_hash = channel_announcement_delta.announcement.chain_hash.clone();
+               }
+
+               let current_announcement_seen = channel_announcement_delta.seen;
+               let is_new_announcement = current_announcement_seen >= last_sync_timestamp;
+               let is_newly_updated_announcement = if let Some(first_update_seen) = channel_delta.first_update_seen {
+                       first_update_seen >= last_sync_timestamp
+               } else {
+                       false
+               };
+               let send_announcement = is_new_announcement || is_newly_updated_announcement;
+               if send_announcement {
+                       serialization_set.latest_seen = max(serialization_set.latest_seen, current_announcement_seen);
+                       serialization_set.announcements.push(channel_delta.announcement.unwrap().announcement);
+               }
+
+               let direction_a_updates = channel_delta.updates.0;
+               let direction_b_updates = channel_delta.updates.1;
+
+               let mut categorize_directed_update_serialization = |directed_updates: Option<DirectedUpdateDelta>| {
+                       if let Some(updates) = directed_updates {
+                               if let Some(latest_update_delta) = updates.latest_update_after_seen {
+                                       let latest_update = latest_update_delta.update;
+
+                                       // the returned seen timestamp should be the latest of all the returned
+                                       // announcements and latest updates
+                                       serialization_set.latest_seen = max(serialization_set.latest_seen, latest_update_delta.seen);
+
+                                       if updates.last_update_before_seen.is_some() {
+                                               let mutated_properties = updates.mutated_properties;
+                                               if mutated_properties.len() == 5 {
+                                                       // all five values have changed, it makes more sense to just
+                                                       // serialize the update as a full update instead of as a change
+                                                       // this way, the default values can be computed more efficiently
+                                                       record_full_update_in_histograms(&latest_update);
+                                                       serialization_set.updates.push(UpdateSerialization {
+                                                               update: latest_update,
+                                                               mechanism: UpdateSerializationMechanism::Full,
+                                                       });
+                                               } else if mutated_properties.len() > 0 || mutated_properties.flags {
+                                                       // `len()` does not count flag changes, so the flag byte is checked separately
+                                                       serialization_set.updates.push(UpdateSerialization {
+                                                               update: latest_update,
+                                                               mechanism: UpdateSerializationMechanism::Incremental(mutated_properties),
+                                                       });
+                                               }
+                                       } else {
+                                               // serialize the full update
+                                               record_full_update_in_histograms(&latest_update);
+                                               serialization_set.updates.push(UpdateSerialization {
+                                                       update: latest_update,
+                                                       mechanism: UpdateSerializationMechanism::Full,
+                                               });
+                                       }
+                               }
+                       }
+               };
+
+               categorize_directed_update_serialization(direction_a_updates);
+               categorize_directed_update_serialization(direction_b_updates);
+       }
+
+       let default_update_values = DefaultUpdateValues {
+               cltv_expiry_delta: find_most_common_histogram_entry_with_default(full_update_histograms.cltv_expiry_delta, 0),
+               htlc_minimum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_minimum_msat, 0),
+               fee_base_msat: find_most_common_histogram_entry_with_default(full_update_histograms.fee_base_msat, 0),
+               fee_proportional_millionths: find_most_common_histogram_entry_with_default(full_update_histograms.fee_proportional_millionths, 0),
+               htlc_maximum_msat: find_most_common_histogram_entry_with_default(full_update_histograms.htlc_maximum_msat, 0),
+       };
+
+       serialization_set.full_update_defaults = default_update_values;
+       serialization_set
+}
+
+pub fn serialize_stripped_channel_announcement(announcement: &UnsignedChannelAnnouncement, node_id_a_index: usize, node_id_b_index: usize, previous_scid: u64) -> Vec<u8> {
+       let mut stripped_announcement = vec![];
+
+       announcement.features.write(&mut stripped_announcement).unwrap();
+
+       if previous_scid > announcement.short_channel_id {
+               panic!("unsorted scids!");
+       }
+       let scid_delta = BigSize(announcement.short_channel_id - previous_scid);
+       scid_delta.write(&mut stripped_announcement).unwrap();
+
+       // write indices of node ids rather than the node IDs themselves
+       BigSize(node_id_a_index as u64).write(&mut stripped_announcement).unwrap();
+       BigSize(node_id_b_index as u64).write(&mut stripped_announcement).unwrap();
+
+       stripped_announcement
+}
+
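+/// Bit layout of the flag byte written below: `0b1000_0000` marks an incremental update;
+/// `0b0100_0000`, `0b0010_0000`, `0b0001_0000`, `0b0000_1000`, and `0b0000_0100` signal
+/// that `cltv_expiry_delta`, `htlc_minimum_msat`, `fee_base_msat`,
+/// `fee_proportional_millionths`, and `htlc_maximum_msat`, respectively, follow it.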
+pub(super) fn serialize_stripped_channel_update(update: &UpdateSerialization, default_values: &DefaultUpdateValues, previous_scid: u64) -> Vec<u8> {
+       let latest_update = &update.update;
+       let mut serialized_flags = latest_update.flags;
+
+       if previous_scid > latest_update.short_channel_id {
+               panic!("unsorted scids!");
+       }
+
+       let mut delta_serialization = Vec::new();
+       let mut prefixed_serialization = Vec::new();
+
+       match &update.mechanism {
+               UpdateSerializationMechanism::Full => {
+                       if latest_update.cltv_expiry_delta != default_values.cltv_expiry_delta {
+                               serialized_flags |= 0b_0100_0000;
+                               latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if latest_update.htlc_minimum_msat != default_values.htlc_minimum_msat {
+                               serialized_flags |= 0b_0010_0000;
+                               latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if latest_update.fee_base_msat != default_values.fee_base_msat {
+                               serialized_flags |= 0b_0001_0000;
+                               latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if latest_update.fee_proportional_millionths != default_values.fee_proportional_millionths {
+                               serialized_flags |= 0b_0000_1000;
+                               latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if latest_update.htlc_maximum_msat != default_values.htlc_maximum_msat {
+                               serialized_flags |= 0b_0000_0100;
+                               latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
+                       }
+               }
+
+               UpdateSerializationMechanism::Incremental(mutated_properties) => {
+                       // indicate that this update is incremental
+                       serialized_flags |= 0b_1000_0000;
+
+                       if mutated_properties.cltv_expiry_delta {
+                               serialized_flags |= 0b_0100_0000;
+                               latest_update.cltv_expiry_delta.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if mutated_properties.htlc_minimum_msat {
+                               serialized_flags |= 0b_0010_0000;
+                               latest_update.htlc_minimum_msat.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if mutated_properties.fee_base_msat {
+                               serialized_flags |= 0b_0001_0000;
+                               latest_update.fee_base_msat.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if mutated_properties.fee_proportional_millionths {
+                               serialized_flags |= 0b_0000_1000;
+                               latest_update.fee_proportional_millionths.write(&mut delta_serialization).unwrap();
+                       }
+
+                       if mutated_properties.htlc_maximum_msat {
+                               serialized_flags |= 0b_0000_0100;
+                               latest_update.htlc_maximum_msat.write(&mut delta_serialization).unwrap();
+                       }
+               }
+       }
+       let scid_delta = BigSize(latest_update.short_channel_id - previous_scid);
+       scid_delta.write(&mut prefixed_serialization).unwrap();
+
+       serialized_flags.write(&mut prefixed_serialization).unwrap();
+       prefixed_serialization.append(&mut delta_serialization);
+
+       prefixed_serialization
+}
+
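+/// For a histogram such as {1000: 3, 40: 17}, this returns 40; for an empty histogram,
+/// the provided default is returned.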
+pub(super) fn find_most_common_histogram_entry_with_default<T: Copy>(histogram: HashMap<T, usize>, default: T) -> T {
+       let most_frequent_entry = histogram.iter().max_by(|a, b| a.1.cmp(&b.1));
+       if let Some((value, _frequency)) = most_frequent_entry {
+               return *value;
+       }
+       // the default is generally 0, though for htlc_maximum_msat it could be u64::MAX
+       default
+}
diff --git a/src/snapshot.rs b/src/snapshot.rs
new file mode 100644 (file)
index 0000000..93be840
--- /dev/null
@@ -0,0 +1,135 @@
+use std::collections::HashMap;
+use std::fs;
+use std::os::unix::fs::symlink;
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use lightning::routing::gossip::NetworkGraph;
+
+use crate::{config, TestLogger};
+
+pub(crate) struct Snapshotter {
+       network_graph: Arc<NetworkGraph<Arc<TestLogger>>>,
+}
+
+impl Snapshotter {
+       pub fn new(network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) -> Self {
+               Self { network_graph }
+       }
+
+       pub(crate) async fn snapshot_gossip(&self) {
+               println!("Initiating snapshotting service");
+
+               let snapshot_sync_day_factors = [1, 2, 3, 4, 5, 6, 7, 14, 21, u64::MAX];
+               let round_day_seconds = config::SNAPSHOT_CALCULATION_INTERVAL as u64;
+
+               let pending_snapshot_directory = "./res/snapshots_pending";
+               let pending_symlink_directory = "./res/symlinks_pending";
+               let finalized_snapshot_directory = "./res/snapshots";
+               let finalized_symlink_directory = "./res/symlinks";
+               let relative_symlink_to_snapshot_path = "../snapshots";
+
+               // this is gonna be a never-ending background job
+               loop {
+                       // 1. get the current timestamp
+                       let timestamp_seen = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+                       let filename_timestamp = Self::round_down_to_nearest_multiple(timestamp_seen, round_day_seconds);
+                       println!("Capturing snapshots at {} for: {}", timestamp_seen, filename_timestamp);
+
+                       // 2. sleep until the next round 24 hours
+                       // 3. refresh all snapshots
+
+                       // the stored snapshots should adhere to the following format
+                       // from one day ago
+                       // from two days ago
+                       // …
+                       // from a week ago
+                       // from two weeks ago
+                       // from three weeks ago
+                       // full
+                       // That means that at any given moment, there should only ever be
+                       // 6 (daily) + 3 (weekly) + 1 (total) = 10 cached snapshots
+                       // The snapshots, unlike dynamic updates, should account for all intermediate
+                       // channel updates
+
+                       // purge and recreate the pending directories
+                       if fs::metadata(&pending_snapshot_directory).is_ok() {
+                               fs::remove_dir_all(&pending_snapshot_directory).expect("Failed to remove pending snapshot directory.");
+                       }
+                       if fs::metadata(&pending_symlink_directory).is_ok() {
+                               fs::remove_dir_all(&pending_symlink_directory).expect("Failed to remove pending symlink directory.");
+                       }
+                       fs::create_dir_all(&pending_snapshot_directory).expect("Failed to create pending snapshot directory");
+                       fs::create_dir_all(&pending_symlink_directory).expect("Failed to create pending symlink directory");
+
+                       let mut snapshot_sync_timestamps: Vec<(u64, u64)> = Vec::new();
+                       for factor in &snapshot_sync_day_factors {
+                               // basically timestamp - day_seconds * factor
+                               let timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(*factor));
+                               snapshot_sync_timestamps.push((*factor, timestamp));
+                       }
+
+                       let mut snapshot_filenames_by_day_range: HashMap<u64, String> = HashMap::with_capacity(10);
+
+                       for (day_range, current_last_sync_timestamp) in &snapshot_sync_timestamps {
+                               let network_graph_clone = self.network_graph.clone();
+                               {
+                                       println!("Calculating {}-day snapshot", day_range);
+                                       // calculate the snapshot
+                                       let snapshot = super::serialize_delta(network_graph_clone, *current_last_sync_timestamp as u32, true).await;
+
+                                       // persist the snapshot and update the symlink
+                                       let snapshot_filename = format!("snapshot__calculated-at:{}__range:{}-days__previous-sync:{}.lngossip", filename_timestamp, day_range, current_last_sync_timestamp);
+                                       let snapshot_path = format!("{}/{}", pending_snapshot_directory, snapshot_filename);
+                                       println!("Persisting {}-day snapshot: {} ({} messages, {} announcements, {} updates ({} full, {} incremental))", day_range, snapshot_filename, snapshot.message_count, snapshot.announcement_count, snapshot.update_count, snapshot.update_count_full, snapshot.update_count_incremental);
+                                       fs::write(&snapshot_path, snapshot.data).unwrap();
+                                       snapshot_filenames_by_day_range.insert(*day_range, snapshot_filename);
+                               }
+                       }
+
+                       for i in 1..10_001u64 {
+                               // let's create symlinks
+
+                               // first, determine which snapshot range should be referenced
+                               // find min(x) in snapshot_sync_day_factors where x >= i
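+                               // e.g. a client that last synced 5 days ago is served the 5-day snapshot,
+                               // while one that last synced 10 days ago is served the 14-day snapshot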
+                               let referenced_day_range = *snapshot_sync_day_factors.iter().find(|&&factor| factor >= i).unwrap();
+
+                               let snapshot_filename = snapshot_filenames_by_day_range.get(&referenced_day_range).unwrap();
+                               let simulated_last_sync_timestamp = timestamp_seen.saturating_sub(round_day_seconds.saturating_mul(i));
+                               let relative_snapshot_path = format!("{}/{}", relative_symlink_to_snapshot_path, snapshot_filename);
+                               let canonical_last_sync_timestamp = Self::round_down_to_nearest_multiple(simulated_last_sync_timestamp, round_day_seconds);
+                               let symlink_path = format!("{}/{}.bin", pending_symlink_directory, canonical_last_sync_timestamp);
+
+                               println!("Symlinking: {} -> {} ({} -> {})", i, referenced_day_range, symlink_path, relative_snapshot_path);
+                               symlink(&relative_snapshot_path, &symlink_path).unwrap();
+                       }
+
+                       if fs::metadata(&finalized_snapshot_directory).is_ok() {
+                               fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
+                       }
+                       if fs::metadata(&finalized_symlink_directory).is_ok() {
+                               fs::remove_dir_all(&finalized_symlink_directory).expect("Failed to remove finalized symlink directory.");
+                       }
+                       fs::rename(&pending_snapshot_directory, &finalized_snapshot_directory).expect("Failed to finalize snapshot directory.");
+                       fs::rename(&pending_symlink_directory, &finalized_symlink_directory).expect("Failed to finalize symlink directory.");
+
+                       let remainder = timestamp_seen % round_day_seconds;
+                       let time_until_next_day = round_day_seconds - remainder;
+
+                       println!("Sleeping until next snapshot capture: {}s", time_until_next_day);
+                       // add in an extra five seconds to assure the rounding down works correctly
+                       let sleep = tokio::time::sleep(Duration::from_secs(time_until_next_day + 5));
+                       sleep.await;
+               }
+       }
+
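+       /// e.g. round_down_to_nearest_multiple(86500, 86400) == 86400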
+       fn round_down_to_nearest_multiple(number: u64, multiple: u64) -> u64 {
+               let round_multiple_delta = number % multiple;
+               number - round_multiple_delta
+       }
+}
diff --git a/src/tracking.rs b/src/tracking.rs
new file mode 100644 (file)
index 0000000..9c24923
--- /dev/null
@@ -0,0 +1,183 @@
+use std::net::SocketAddr;
+use std::sync::{Arc, RwLock};
+use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
+
+use bitcoin::hashes::hex::ToHex;
+use bitcoin::secp256k1::{PublicKey, SecretKey};
+use futures::executor;
+use lightning::ln::peer_handler::{
+       ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
+};
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use rand::{Rng, thread_rng};
+use tokio::sync::mpsc;
+
+use crate::{config, TestLogger};
+use crate::downloader::{GossipCounter, GossipRouter};
+use crate::types::{DetectedGossipMessage, GossipChainAccess, GossipMessage, GossipPeerManager};
+use crate::verifier::ChainVerifier;
+
+pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGossipMessage>, network_graph: Arc<NetworkGraph<Arc<TestLogger>>>) {
+       let mut key = [0; 32];
+       let mut random_data = [0; 32];
+       thread_rng().fill_bytes(&mut key);
+       thread_rng().fill_bytes(&mut random_data);
+       let our_node_secret = SecretKey::from_slice(&key).unwrap();
+
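+       // `_arc_chain_access` is an unused `None` alternative: substituting it for
+       // `arc_chain_access` below would disable on-chain UTXO verification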
+       let _arc_chain_access = None::<GossipChainAccess>;
+       let arc_chain_access = Some(Arc::new(ChainVerifier::new()));
+       let ignorer = IgnoringMessageHandler {};
+       let arc_ignorer = Arc::new(ignorer);
+
+       let errorer = ErroringMessageHandler::new();
+       let arc_errorer = Arc::new(errorer);
+
+       let logger = TestLogger::new();
+       let arc_logger = Arc::new(logger);
+
+       let router = P2PGossipSync::new(
+               network_graph.clone(),
+               arc_chain_access,
+               Arc::clone(&arc_logger),
+       );
+       let arc_router = Arc::new(router);
+       let wrapped_router = GossipRouter {
+               native_router: arc_router,
+               counter: RwLock::new(GossipCounter::new()),
+               sender: persistence_sender.clone(),
+       };
+       let arc_wrapped_router = Arc::new(wrapped_router);
+
+       let message_handler = MessageHandler {
+               chan_handler: arc_errorer,
+               route_handler: arc_wrapped_router.clone(),
+       };
+       let peer_handler = PeerManager::new(
+               message_handler,
+               our_node_secret,
+               &random_data,
+               Arc::clone(&arc_logger),
+               arc_ignorer,
+       );
+       let arc_peer_handler = Arc::new(peer_handler);
+
+       println!("Connecting to Lightning peers…");
+       let peers = config::ln_peers();
+       let mut connected_peer_count = 0;
+
+       for current_peer in peers {
+               let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler));
+               if initial_connection_succeeded {
+                       connected_peer_count += 1;
+               }
+       }
+
+       if connected_peer_count < 1 {
+               panic!("Failed to connect to any peer.");
+       }
+
+       println!("Connected to {} Lightning peers!", connected_peer_count);
+
+       let local_router = arc_wrapped_router.clone();
+       let local_persistence_sender = persistence_sender.clone();
+       tokio::spawn(async move {
+               let mut previous_announcement_count = 0u64;
+               let mut previous_update_count = 0u64;
+               let mut is_caught_up_with_gossip = false;
+
+               let mut i = 0u32;
+               let mut latest_new_gossip_time = Instant::now();
+               let mut needs_to_notify_persister = false;
+
+               loop {
+                       i += 1; // count the background activity
+                       let sleep = tokio::time::sleep(Duration::from_secs(5));
+                       sleep.await;
+
+                       let current_timestamp = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+                       let router_clone = Arc::clone(&local_router);
+
+                       {
+                               let counter = router_clone.counter.read().unwrap();
+                               let total_message_count = counter.channel_announcements + counter.channel_updates;
+                               let new_message_count = total_message_count - previous_announcement_count - previous_update_count;
+
+                               let was_previously_caught_up_with_gossip = is_caught_up_with_gossip;
+                               // TODO: make new message threshold (20) adjust based on connected peer count
+                               is_caught_up_with_gossip = new_message_count < 20 && previous_announcement_count > 0 && previous_update_count > 0;
+                               if new_message_count > 0 {
+                                       latest_new_gossip_time = Instant::now();
+                               }
+
+                               // if we either aren't caught up, or just stopped/started being caught up
+                               if !is_caught_up_with_gossip || (is_caught_up_with_gossip != was_previously_caught_up_with_gossip) {
+                                       println!(
+                                               "gossip count (iteration {}): {} (delta: {}):\n\tannouncements: {}\n\t\tmismatched scripts: {}\n\tupdates: {}\n\t\tno HTLC max: {}\n",
+                                               i,
+                                               total_message_count,
+                                               new_message_count,
+                                               counter.channel_announcements,
+                                               counter.channel_announcements_with_mismatched_scripts,
+                                               counter.channel_updates,
+                                               counter.channel_updates_without_htlc_max_msats
+                                       );
+                               } else {
+                                       println!("Monitoring for gossip…")
+                               }
+
+                               if is_caught_up_with_gossip && !was_previously_caught_up_with_gossip {
+                                       println!("caught up with gossip!");
+                                       needs_to_notify_persister = true;
+                               } else if !is_caught_up_with_gossip && was_previously_caught_up_with_gossip {
+                                       println!("Received new messages since catching up with gossip!");
+                               }
+
+                               let continuous_caught_up_duration = latest_new_gossip_time.elapsed();
+                               if continuous_caught_up_duration.as_secs() > 600 {
+                                       eprintln!("No new gossip messages in 10 minutes! Something's amiss!");
+                               }
+
+                               previous_announcement_count = counter.channel_announcements;
+                               previous_update_count = counter.channel_updates;
+                       }
+
+                       if needs_to_notify_persister {
+                               needs_to_notify_persister = false;
+                               let sender = local_persistence_sender.clone();
+                               tokio::spawn(async move {
+                                       let _ = sender.send(DetectedGossipMessage {
+                                               timestamp_seen: current_timestamp as u32,
+                                               message: GossipMessage::InitialSyncComplete,
+                                       }).await;
+                               });
+                       }
+               }
+       });
+}
+
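+/// Attempts an outbound connection to the given peer; on disconnection, spawns a task that
+/// keeps reconnecting. Returns whether the initial connection attempt succeeded.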
+fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
+       let peer_manager_clone = Arc::clone(&peer_manager);
+       eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1);
+       let connection = executor::block_on(async move {
+               lightning_net_tokio::connect_outbound(
+                       peer_manager_clone,
+                       current_peer.0,
+                       current_peer.1,
+               ).await
+       });
+       let mut initial_connection_succeeded = false;
+       if let Some(disconnection_future) = connection {
+               eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1);
+               initial_connection_succeeded = true;
+               let peer_manager_clone = Arc::clone(&peer_manager);
+               tokio::spawn(async move {
+                       disconnection_future.await;
+                       eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1);
+                       monitor_peer_connection(current_peer, peer_manager_clone);
+               });
+       } else {
+               eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1);
+       }
+       initial_connection_succeeded
+}
diff --git a/src/types.rs b/src/types.rs
new file mode 100644 (file)
index 0000000..63a2adb
--- /dev/null
@@ -0,0 +1,37 @@
+use std::sync::Arc;
+
+use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
+use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
+use lightning::util::logger::{Logger, Record};
+
+use crate::downloader::GossipRouter;
+use crate::verifier::ChainVerifier;
+
+pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
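+// A peer manager wired to reject channel-related messages with errors (this server only
+// processes gossip), routing gossip messages through the wrapping GossipRouter.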
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, Arc<ErroringMessageHandler>, Arc<GossipRouter>, Arc<TestLogger>, Arc<IgnoringMessageHandler>>>;
+
+pub(crate) enum GossipMessage {
+       ChannelAnnouncement(ChannelAnnouncement),
+       ChannelUpdate(ChannelUpdate),
+       InitialSyncComplete,
+}
+
+pub(crate) struct DetectedGossipMessage {
+       pub(crate) timestamp_seen: u32,
+       pub(crate) message: GossipMessage,
+}
+
+pub(crate) struct TestLogger {}
+
+impl TestLogger {
+       pub(crate) fn new() -> TestLogger {
+               Self {}
+       }
+}
+
+impl Logger for TestLogger {
+       fn log(&self, record: &Record) {
+               // TODO: allow log level threshold to be set
+               println!("{:<5} [{} : {}, {}] {}", record.level.to_string(), record.module_path, record.file, record.line, record.args);
+       }
+}
diff --git a/src/verifier.rs b/src/verifier.rs
new file mode 100644 (file)
index 0000000..2446e1a
--- /dev/null
@@ -0,0 +1,75 @@
+use std::convert::TryInto;
+use std::sync::Arc;
+
+use bitcoin::{BlockHash, TxOut};
+use bitcoin::blockdata::block::Block;
+use bitcoin::hashes::Hash;
+use futures::executor;
+use lightning::chain;
+use lightning::chain::AccessError;
+use lightning_block_sync::BlockSource;
+use lightning_block_sync::http::BinaryResponse;
+use lightning_block_sync::rest::RestClient;
+
+use crate::config;
+
+pub(crate) struct ChainVerifier {
+       rest_client: Arc<RestClient>,
+}
+
+struct RestBinaryResponse(Vec<u8>);
+
+impl ChainVerifier {
+       pub(crate) fn new() -> Self {
+               let rest_client = RestClient::new(config::bitcoin_rest_endpoint()).unwrap();
+               ChainVerifier {
+                       rest_client: Arc::new(rest_client),
+               }
+       }
+
+       fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
+               let rest_client = self.rest_client.clone();
+               executor::block_on(async move {
+                       let block_hash_result = rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&format!("blockhashbyheight/{}.bin", block_height)).await;
+                       let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+                               eprintln!("Couldn't find block hash at height {}: {}", block_height, error.to_string());
+                               AccessError::UnknownChain
+                       })?.0;
+                       let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+                       let block_result = rest_client.get_block(&block_hash).await;
+                       let block = block_result.map_err(|error| {
+                               eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+                               AccessError::UnknownChain
+                       })?;
+                       Ok(block)
+               })
+       }
+}
+
+impl chain::Access for ChainVerifier {
+       fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
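+               // per BOLT 7, a short channel id packs the block height (3 bytes), transaction
+               // index (3 bytes), and output index (2 bytes) into a single u64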
+               let block_height = (short_channel_id >> (5 * 8)) as u32; // most significant three bytes
+               let transaction_index = ((short_channel_id >> (2 * 8)) & 0xffffff) as u32;
+               let output_index = (short_channel_id & 0xffff) as u16;
+
+               let block = self.retrieve_block(block_height)?;
+               let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
+                       eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
+                       AccessError::UnknownTx
+               })?;
+               let output = transaction.output.get(output_index as usize).ok_or_else(|| {
+                       eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
+                       AccessError::UnknownTx
+               })?;
+               Ok(output.clone())
+       }
+}
+
+impl TryInto<RestBinaryResponse> for BinaryResponse {
+       type Error = std::io::Error;
+
+       fn try_into(self) -> Result<RestBinaryResponse, Self::Error> {
+               Ok(RestBinaryResponse(self.0))
+       }
+}