Merge pull request #21 from jurvis/jurvis/2022-11-rgs-docker-compose
authorMatt Corallo <649246+TheBlueMatt@users.noreply.github.com>
Mon, 3 Apr 2023 23:12:32 +0000 (23:12 +0000)
committerGitHub <noreply@github.com>
Mon, 3 Apr 2023 23:12:32 +0000 (23:12 +0000)
Create a docker-compose for running an RGS server

Cargo.toml
README.md
src/config.rs
src/downloader.rs
src/hex_utils.rs
src/lib.rs
src/lookup.rs
src/snapshot.rs
src/tracking.rs
src/types.rs
src/verifier.rs

index 852c7c2c2ac62adffbc1ce381408cf96ff38619a..c2c689b9407439088e8cd818edfdd0937c53e92c 100644 (file)
@@ -5,9 +5,9 @@ edition = "2018"
 
 [dependencies]
 bitcoin = "0.29"
-lightning = { version = "0.0.112" }
-lightning-block-sync = { version = "0.0.112", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.112" }
+lightning = { version = "0.0.114" }
+lightning-block-sync = { version = "0.0.114", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.114" }
 tokio = { version = "1.14.1", features = ["full"] }
 tokio-postgres = { version="0.7.5" }
 futures = "0.3"
index d14ab2f09793edbb22466a593e65e760e4c4f117..13f46c6a561dbad062c9f0496ebeedaa8c45899f 100644 (file)
--- a/README.md
+++ b/README.md
@@ -12,18 +12,17 @@ These are the components it's comprised of.
 A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
 can be made by setting environment variables, whose usage is as follows:
 
-| Name                                 | Default       | Description                                                                                                |
-|:-------------------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------|
-| RAPID_GOSSIP_SYNC_SERVER_DB_HOST     | localhost     | Domain of the Postgres database                                                                            |
-| RAPID_GOSSIP_SYNC_SERVER_DB_USER     | alice         | Username to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_        | Password to access Postgres                                                                                |
-| RAPID_GOSSIP_SYNC_SERVER_DB_NAME     | ln_graph_sync | Name of the database to be used for gossip storage                                                         |
-| BITCOIN_REST_DOMAIN                  | 127.0.0.1     | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
-| BITCOIN_REST_PORT                    | 8332          | HTTP port of the bitcoind REST server                                                                      |
-| BITCOIN_REST_PATH                    | /rest/        | Path infix to access the bitcoind REST endpoints                                                           |
-
-Notably, one property needs to be modified in code, namely the `ln_peers()` method. It specifies how
-many and which peers to use for retrieving gossip.
+| Name                                 | Default             | Description                                                                                                |
+|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
+| RAPID_GOSSIP_SYNC_SERVER_DB_HOST     | localhost           | Domain of the Postgres database                                                                            |
+| RAPID_GOSSIP_SYNC_SERVER_DB_USER     | alice               | Username to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_              | Password to access Postgres                                                                                |
+| RAPID_GOSSIP_SYNC_SERVER_DB_NAME     | ln_graph_sync       | Name of the database to be used for gossip storage                                                         |
+| RAPID_GOSSIP_SYNC_SERVER_NETWORK     | mainnet             | Network to operate in. Possible values are mainnet, testnet, signet, regtest                               |
+| BITCOIN_REST_DOMAIN                  | 127.0.0.1           | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT                    | 8332                | HTTP port of the bitcoind REST server                                                                      |
+| BITCOIN_REST_PATH                    | /rest/              | Path infix to access the bitcoind REST endpoints                                                           |
+| LN_PEERS                             | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip                                              |
 
 ### downloader
 
index 97417532b4a5e0d5297435b7f16e8205d692750b..1f968d6418760b880cd1db6862ecf892b9446dc7 100644 (file)
@@ -1,20 +1,35 @@
+use crate::hex_utils;
+
 use std::convert::TryInto;
 use std::env;
-use std::net::SocketAddr;
 use std::io::Cursor;
+use std::net::{SocketAddr, ToSocketAddrs};
+
+use bitcoin::Network;
+use bitcoin::hashes::hex::FromHex;
 use bitcoin::secp256k1::PublicKey;
+use futures::stream::{FuturesUnordered, StreamExt};
 use lightning::ln::msgs::ChannelAnnouncement;
 use lightning::util::ser::Readable;
 use lightning_block_sync::http::HttpEndpoint;
 use tokio_postgres::Config;
-use crate::hex_utils;
-
-use futures::stream::{FuturesUnordered, StreamExt};
 
 pub(crate) const SCHEMA_VERSION: i32 = 8;
 pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
 pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
 
+pub(crate) fn network() -> Network {
+       let network = env::var("RAPID_GOSSIP_SYNC_SERVER_NETWORK").unwrap_or("bitcoin".to_string()).to_lowercase();
+       match network.as_str() {
+               "mainnet" => Network::Bitcoin,
+               "bitcoin" => Network::Bitcoin,
+               "testnet" => Network::Testnet,
+               "signet" => Network::Signet,
+               "regtest" => Network::Regtest,
+               _ => panic!("Invalid network"),
+       }
+}
+
 pub(crate) fn network_graph_cache_path() -> &'static str {
        "./res/network_graph.bin"
 }
@@ -209,19 +224,54 @@ pub(crate) async fn upgrade_db(schema: i32, client: &mut tokio_postgres::Client)
        let _ = client.execute("ALTER TABLE channel_announcements SET ( autovacuum_vacuum_insert_scale_factor = 0.005 );", &[]).await;
 }
 
-/// EDIT ME
 pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> {
-       vec![
-               // Bitfinex
-               // (hex_utils::to_compressed_pubkey("033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025").unwrap(), "34.65.85.39:9735".parse().unwrap()),
+       const WALLET_OF_SATOSHI: &str = "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735";
+       let list = env::var("LN_PEERS").unwrap_or(WALLET_OF_SATOSHI.to_string());
+       let mut peers = Vec::new();
+       for peer_info in list.split(',') {
+               peers.push(resolve_peer_info(peer_info).expect("Invalid peer info in LN_PEERS"));
+       }
+       peers
+}
+
+fn resolve_peer_info(peer_info: &str) -> Result<(PublicKey, SocketAddr), &str> {
+       let mut peer_info = peer_info.splitn(2, '@');
 
-               // Matt Corallo
-               // (hex_utils::to_compressed_pubkey("03db10aa09ff04d3568b0621750794063df401e6853c79a21a83e1a3f3b5bfb0c8").unwrap(), "69.59.18.80:9735".parse().unwrap())
+       let pubkey = peer_info.next().ok_or("Invalid peer info. Should be formatted as: `pubkey@host:port`")?;
+       let pubkey = Vec::from_hex(pubkey).map_err(|_| "Invalid node pubkey")?;
+       let pubkey = PublicKey::from_slice(&pubkey).map_err(|_| "Invalid node pubkey")?;
 
-               // River Financial
-               // (hex_utils::to_compressed_pubkey("03037dc08e9ac63b82581f79b662a4d0ceca8a8ca162b1af3551595b8f2d97b70a").unwrap(), "104.196.249.140:9735".parse().unwrap())
+       let socket_address = peer_info.next().ok_or("Invalid peer info. Should be formatted as: `pubkey@host:port`")?;
+       let socket_address = socket_address
+               .to_socket_addrs()
+               .map_err(|_| "Cannot resolve node address")?
+               .next()
+               .ok_or("Cannot resolve node address")?;
+
+       Ok((pubkey, socket_address))
+}
 
-               // Wallet of Satoshi | 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735
-               (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap())
-       ]
+#[cfg(test)]
+mod tests {
+       use super::resolve_peer_info;
+       use bitcoin::hashes::hex::ToHex;
+
+       #[test]
+       fn test_resolve_peer_info() {
+               let wallet_of_satoshi = "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735";
+               let (pubkey, socket_address) = resolve_peer_info(wallet_of_satoshi).unwrap();
+               assert_eq!(pubkey.serialize().to_hex(), "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226");
+               assert_eq!(socket_address.to_string(), "170.75.163.209:9735");
+
+               let ipv6 = "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025@[2001:db8::1]:80";
+               let (pubkey, socket_address) = resolve_peer_info(ipv6).unwrap();
+               assert_eq!(pubkey.serialize().to_hex(), "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025");
+               assert_eq!(socket_address.to_string(), "[2001:db8::1]:80");
+
+               let localhost = "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025@localhost:9735";
+               let (pubkey, socket_address) = resolve_peer_info(localhost).unwrap();
+               assert_eq!(pubkey.serialize().to_hex(), "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025");
+               let socket_address = socket_address.to_string();
+               assert!(socket_address == "127.0.0.1:9735" || socket_address == "[::1]:9735");
+       }
 }
index 4fc3d51c6c005f610af7680af999068747c54a04..9256100b581748100147d523fbe9f1a9ae0cee0b 100644 (file)
@@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};
 use bitcoin::secp256k1::PublicKey;
 use lightning::ln::features::{InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 
 use crate::TestLogger;
-use crate::types::{GossipMessage, GossipChainAccess};
+use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
 pub(crate) struct GossipCounter {
@@ -34,59 +34,45 @@ pub(crate) struct GossipRouter {
        native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
        pub(crate) counter: RwLock<GossipCounter>,
        sender: mpsc::Sender<GossipMessage>,
+       verifier: Arc<ChainVerifier>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
 }
 
 impl GossipRouter {
        pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+               let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
                Self {
-                       native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
+                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+                       outbound_gossiper,
                        counter: RwLock::new(GossipCounter::new()),
-                       sender
+                       sender,
+                       verifier,
                }
        }
-}
 
-impl MessageSendEventsProvider for GossipRouter {
-       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
-               self.native_router.get_and_clear_pending_msg_events()
+       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+               self.verifier.set_ph(peer_handler);
        }
-}
 
-impl RoutingMessageHandler for GossipRouter {
-       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
-               self.native_router.handle_node_announcement(msg)
-       }
-
-       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
-               let native_result = self.native_router.handle_channel_announcement(msg);
-               let output_value;
+       fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
                {
                        let mut counter = self.counter.write().unwrap();
-                       output_value = native_result.map_err(|error| {
-                               if error.err.contains("didn't match on-chain script") {
-                                       counter.channel_announcements_with_mismatched_scripts += 1;
-                               }
-                               error
-                       })?;
                        counter.channel_announcements += 1;
                }
 
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
-
-               Ok(output_value)
        }
 
-       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
-               let output_value = self.native_router.handle_channel_update(msg)?;
-
+       fn new_channel_update(&self, msg: ChannelUpdate) {
                self.counter.write().unwrap().channel_updates += 1;
-               let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+               let gossip_message = GossipMessage::ChannelUpdate(msg);
 
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
@@ -94,20 +80,59 @@ impl RoutingMessageHandler for GossipRouter {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
+       }
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+               let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
+               for ev in gossip_evs {
+                       match ev {
+                               MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => {
+                                       self.new_channel_announcement(msg);
+                               },
+                               MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+                               MessageSendEvent::BroadcastChannelUpdate { msg } => {
+                                       self.new_channel_update(msg);
+                               },
+                               _ => { unreachable!() },
+                       }
+               }
+               self.native_router.get_and_clear_pending_msg_events()
+       }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+               self.native_router.handle_node_announcement(msg)
+       }
+
+       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_announcement(msg)?;
+               self.new_channel_announcement(msg.clone());
+               Ok(res)
+       }
+
+       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_update(msg)?;
+               self.new_channel_update(msg.clone());
+               Ok(res)
+       }
 
-               Ok(output_value)
+       fn processing_queue_high(&self) -> bool {
+               self.native_router.processing_queue_high()
        }
 
        fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
                self.native_router.get_next_channel_announcement(starting_point)
        }
 
-       fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
+       fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
                self.native_router.get_next_node_announcement(starting_point)
        }
 
-       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> {
-               self.native_router.peer_connected(their_node_id, init)
+       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
+               self.native_router.peer_connected(their_node_id, init, inbound)
        }
 
        fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
index 36f2779b02f0efc7f04ba5077648c6301c9861bb..be55839d8639d0965b6634b0afe94e2c620000b8 100644 (file)
@@ -1,5 +1,3 @@
-use bitcoin::secp256k1::PublicKey;
-
 pub fn to_vec(hex: &str) -> Option<Vec<u8>> {
     let mut out = Vec::with_capacity(hex.len() / 2);
 
@@ -20,14 +18,3 @@ pub fn to_vec(hex: &str) -> Option<Vec<u8>> {
 
     Some(out)
 }
-
-pub fn to_compressed_pubkey(hex: &str) -> Option<PublicKey> {
-    let data = match to_vec(&hex[0..33 * 2]) {
-        Some(bytes) => bytes,
-        None => return None,
-    };
-    match PublicKey::from_slice(&data) {
-        Ok(pk) => Some(pk),
-        Err(_) => None,
-    }
-}
index cc5a350847e2c4c42322d2bf20545d5de900059a..f8a4231dc4dfb7303f525d7e95419c2a041b781e 100644 (file)
@@ -15,10 +15,7 @@ use std::fs::File;
 use std::io::BufReader;
 use std::sync::Arc;
 
-use bitcoin::blockdata::constants::genesis_block;
-use bitcoin::Network;
-use bitcoin::secp256k1::PublicKey;
-use lightning::routing::gossip::NetworkGraph;
+use lightning::routing::gossip::{NetworkGraph, NodeId};
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use crate::lookup::DeltaSet;
@@ -54,21 +51,21 @@ pub struct SerializedResponse {
 
 impl RapidSyncProcessor {
        pub fn new() -> Self {
+               let network = config::network();
                let logger = TestLogger::new();
                let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
                        println!("Initializing from cached network graph…");
                        let mut buffered_reader = BufReader::new(file);
                        let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
                        if let Ok(network_graph) = network_graph_result {
-                               network_graph.remove_stale_channels_and_tracking();
                                println!("Initialized from cached network graph!");
                                network_graph
                        } else {
                                println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
-                               NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
+                               NetworkGraph::new(network, logger)
                        }
                } else {
-                       NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
+                       NetworkGraph::new(network, logger)
                };
                let arc_network_graph = Arc::new(network_graph);
                Self {
@@ -120,21 +117,20 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
        // chain hash only necessary if either channel announcements or non-incremental updates are present
        // for announcement-free incremental-only updates, chain hash can be skipped
 
-       let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
-       let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
-       let mut node_ids: Vec<PublicKey> = Vec::new();
+       let mut node_id_set: HashSet<NodeId> = HashSet::new();
+       let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new();
+       let mut node_ids: Vec<NodeId> = Vec::new();
        let mut duplicate_node_ids: i32 = 0;
 
-       let mut get_node_id_index = |node_id: PublicKey| {
-               let serialized_node_id = node_id.serialize();
-               if node_id_set.insert(serialized_node_id) {
+       let mut get_node_id_index = |node_id: NodeId| {
+               if node_id_set.insert(node_id) {
                        node_ids.push(node_id);
                        let index = node_ids.len() - 1;
-                       node_id_indices.insert(serialized_node_id, index);
+                       node_id_indices.insert(node_id, index);
                        return index;
                }
                duplicate_node_ids += 1;
-               node_id_indices[&serialized_node_id]
+               node_id_indices[&node_id]
        };
 
        let mut delta_set = DeltaSet::new();
index b48b375921ef07df6b40a3a639eeee98e5237ce0..1bac79f94fb1755de8d0128ec5aa8571fc1c7aaf 100644 (file)
@@ -70,7 +70,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        let channel_ids = {
                let read_only_graph = network_graph.read_only();
                println!("Retrieved read-only network graph copy");
-               let channel_iterator = read_only_graph.channels().into_iter();
+               let channel_iterator = read_only_graph.channels().unordered_iter();
                channel_iterator
                        .filter(|c| c.1.announcement_message.is_some())
                        .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
index dbfbc4deb1871e70b7321f91f01a6ae710457f87..ae28233e448aea058d070126644e550f5d55f389 100644 (file)
@@ -116,6 +116,10 @@ impl Snapshotter {
                                symlink(&relative_snapshot_path, &symlink_path).unwrap();
                        }
 
+                       let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
+                       let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+                       fs::write(&update_time_path, format!("{}", update_time)).unwrap();
+
                        if fs::metadata(&finalized_snapshot_directory).is_ok(){
                                fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
                        }
index 810a037828c328e0fc8f6e38dbcf25d4c1272552..243590765f1e829e750d6c6dc0cc68b07687a1c1 100644 (file)
@@ -5,12 +5,13 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
+use bitcoin::secp256k1::PublicKey;
 use lightning;
 use lightning::ln::peer_handler::{
        ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
 };
 use lightning::routing::gossip::NetworkGraph;
+use lightning::chain::keysinterface::KeysManager;
 use tokio::sync::mpsc;
 
 use crate::{config, TestLogger};
@@ -30,7 +31,8 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        rand_hasher.write_u8(2);
        random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
 
-       let our_node_secret = SecretKey::from_slice(&key).unwrap();
+       let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
        let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
 
        let message_handler = MessageHandler {
@@ -40,12 +42,22 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        };
        let peer_handler = Arc::new(PeerManager::new(
                message_handler,
-               our_node_secret,
                0xdeadbeef,
                &random_data,
                TestLogger::new(),
                IgnoringMessageHandler {},
+               keys_manager,
        ));
+       router.set_pm(Arc::clone(&peer_handler));
+
+       let ph_timer = Arc::clone(&peer_handler);
+       tokio::spawn(async move {
+               let mut intvl = tokio::time::interval(Duration::from_secs(10));
+               loop {
+                       intvl.tick().await;
+                       ph_timer.timer_tick_occurred();
+               }
+       });
 
        println!("Connecting to Lightning peers...");
        let peers = config::ln_peers();
@@ -149,9 +161,8 @@ async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: Gossi
                                        current_peer.1,
                                ).await {
                                        disconnection_future.await;
-                               } else {
-                                       tokio::time::sleep(Duration::from_secs(10)).await;
                                }
+                               tokio::time::sleep(Duration::from_secs(10)).await;
                        }
                });
                true
index 5b7434ee0de5b5d2d63055845fc664644539bd57..2d5a281e02024a8ab8c25b74e82684c17dc18d07 100644 (file)
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 use std::ops::Deref;
 
+use lightning::chain::keysinterface::KeysManager;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
 use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
 use lightning::util::logger::{Logger, Record};
@@ -9,7 +10,7 @@ use crate::downloader::GossipRouter;
 use crate::verifier::ChainVerifier;
 
 pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler>>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
index 62c191a3ce2727d9930e06b0ca58425408753dfe..4bda871ad124d85336576f0b2e9b08e3b9c5a92b 100644 (file)
@@ -1,71 +1,92 @@
 use std::convert::TryInto;
+use std::sync::Arc;
+use std::sync::Mutex;
 
 use bitcoin::{BlockHash, TxOut};
 use bitcoin::blockdata::block::Block;
 use bitcoin::hashes::Hash;
-use lightning::chain;
-use lightning::chain::AccessError;
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
 use lightning_block_sync::{BlockData, BlockSource};
 use lightning_block_sync::http::BinaryResponse;
 use lightning_block_sync::rest::RestClient;
 
 use crate::config;
+use crate::TestLogger;
+use crate::types::GossipPeerManager;
 
 pub(crate) struct ChainVerifier {
-       rest_client: RestClient,
+       rest_client: Arc<RestClient>,
+       graph: Arc<NetworkGraph<TestLogger>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
+       peer_handler: Mutex<Option<GossipPeerManager>>,
 }
 
 struct RestBinaryResponse(Vec<u8>);
 
 impl ChainVerifier {
-       pub(crate) fn new() -> Self {
+       pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
                ChainVerifier {
-                       rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(),
+                       rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
+                       outbound_gossiper,
+                       graph,
+                       peer_handler: Mutex::new(None),
                }
        }
-
-       fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
-               tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
-                       let uri = format!("blockhashbyheight/{}.bin", block_height);
-                       let block_hash_result =
-                               self.rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
-                       let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
-                               eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
-                               AccessError::UnknownChain
-                       })?.0;
-                       let block_hash = BlockHash::from_slice(&block_hash).unwrap();
-
-                       let block_result = self.rest_client.get_block(&block_hash).await;
-                       match block_result {
-                               Ok(BlockData::FullBlock(block)) => {
-                                       Ok(block)
-                               },
-                               Ok(_) => unreachable!(),
-                               Err(error) => {
-                                       eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
-                                       Err(AccessError::UnknownChain)
-                               }
-                       }
-               }) })
+       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+               *self.peer_handler.lock().unwrap() = Some(peer_handler);
        }
-}
 
-impl chain::Access for ChainVerifier {
-       fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
+       async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
                let output_index = (short_channel_id & 0xffff) as u16;
 
-               let block = self.retrieve_block(block_height)?;
-               let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
-                       eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
-                       AccessError::UnknownTx
-               })?;
-               let output = transaction.output.get(output_index as usize).ok_or_else(|| {
-                       eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
-                       AccessError::UnknownTx
-               })?;
-               Ok(output.clone())
+               let mut block = Self::retrieve_block(client, block_height).await?;
+               if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+               let mut transaction = block.txdata.swap_remove(transaction_index as usize);
+               if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+               Ok(transaction.output.swap_remove(output_index as usize))
+       }
+
+       async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+               let uri = format!("blockhashbyheight/{}.bin", block_height);
+               let block_hash_result =
+                       client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
+               let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+                       eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+                       UtxoLookupError::UnknownChain
+               })?.0;
+               let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+               let block_result = client.get_block(&block_hash).await;
+               match block_result {
+                       Ok(BlockData::FullBlock(block)) => {
+                               Ok(block)
+                       },
+                       Ok(_) => unreachable!(),
+                       Err(error) => {
+                               eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+                               Err(UtxoLookupError::UnknownChain)
+                       }
+               }
+       }
+}
+
+impl UtxoLookup for ChainVerifier {
+       fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
+               let res = UtxoFuture::new();
+               let fut = res.clone();
+               let graph_ref = Arc::clone(&self.graph);
+               let client_ref = Arc::clone(&self.rest_client);
+               let gossip_ref = Arc::clone(&self.outbound_gossiper);
+               let pm_ref = self.peer_handler.lock().unwrap().clone();
+               tokio::spawn(async move {
+                       let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+                       fut.resolve(&*graph_ref, &*gossip_ref, res);
+                       if let Some(pm) = pm_ref { pm.process_events(); }
+               });
+               UtxoResult::Async(res)
        }
 }