[dependencies]
bitcoin = "0.29"
-lightning = { version = "0.0.112" }
-lightning-block-sync = { version = "0.0.112", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.112" }
+lightning = { version = "0.0.114" }
+lightning-block-sync = { version = "0.0.114", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.114" }
tokio = { version = "1.14.1", features = ["full"] }
tokio-postgres = { version="0.7.5" }
futures = "0.3"
A config file where the Postgres credentials and Lightning peers can be adjusted. Most adjustments
can be made by setting environment variables, whose usage is as follows:
-| Name | Default | Description |
-|:-------------------------------------|:--------------|:-----------------------------------------------------------------------------------------------------------|
-| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database |
-| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres |
-| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres |
-| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage |
-| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
-| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server |
-| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints |
-
-Notably, one property needs to be modified in code, namely the `ln_peers()` method. It specifies how
-many and which peers to use for retrieving gossip.
+| Name | Default | Description |
+|:-------------------------------------|:--------------------|:-----------------------------------------------------------------------------------------------------------|
+| RAPID_GOSSIP_SYNC_SERVER_DB_HOST | localhost | Domain of the Postgres database |
+| RAPID_GOSSIP_SYNC_SERVER_DB_USER | alice | Username to access Postgres |
+| RAPID_GOSSIP_SYNC_SERVER_DB_PASSWORD | _None_ | Password to access Postgres |
+| RAPID_GOSSIP_SYNC_SERVER_DB_NAME | ln_graph_sync | Name of the database to be used for gossip storage |
+| RAPID_GOSSIP_SYNC_SERVER_NETWORK | mainnet | Network to operate in. Possible values are mainnet, testnet, signet, regtest |
+| BITCOIN_REST_DOMAIN | 127.0.0.1 | Domain of the [bitcoind REST server](https://github.com/bitcoin/bitcoin/blob/master/doc/REST-interface.md) |
+| BITCOIN_REST_PORT | 8332 | HTTP port of the bitcoind REST server |
+| BITCOIN_REST_PATH | /rest/ | Path infix to access the bitcoind REST endpoints |
+| LN_PEERS | _Wallet of Satoshi_ | Comma separated list of LN peers to use for retrieving gossip |
### downloader
+use crate::hex_utils;
+
use std::convert::TryInto;
use std::env;
-use std::net::SocketAddr;
use std::io::Cursor;
+use std::net::{SocketAddr, ToSocketAddrs};
+
+use bitcoin::Network;
+use bitcoin::hashes::hex::FromHex;
use bitcoin::secp256k1::PublicKey;
+use futures::stream::{FuturesUnordered, StreamExt};
use lightning::ln::msgs::ChannelAnnouncement;
use lightning::util::ser::Readable;
use lightning_block_sync::http::HttpEndpoint;
use tokio_postgres::Config;
-use crate::hex_utils;
-
-use futures::stream::{FuturesUnordered, StreamExt};
pub(crate) const SCHEMA_VERSION: i32 = 8;
pub(crate) const SNAPSHOT_CALCULATION_INTERVAL: u32 = 3600 * 24; // every 24 hours, in seconds
pub(crate) const DOWNLOAD_NEW_GOSSIP: bool = true;
+pub(crate) fn network() -> Network {
+ let network = env::var("RAPID_GOSSIP_SYNC_SERVER_NETWORK").unwrap_or("bitcoin".to_string()).to_lowercase();
+ match network.as_str() {
+ "mainnet" => Network::Bitcoin,
+ "bitcoin" => Network::Bitcoin,
+ "testnet" => Network::Testnet,
+ "signet" => Network::Signet,
+ "regtest" => Network::Regtest,
+ _ => panic!("Invalid network"),
+ }
+}
+
pub(crate) fn network_graph_cache_path() -> &'static str {
"./res/network_graph.bin"
}
let _ = client.execute("ALTER TABLE channel_announcements SET ( autovacuum_vacuum_insert_scale_factor = 0.005 );", &[]).await;
}
-/// EDIT ME
pub(crate) fn ln_peers() -> Vec<(PublicKey, SocketAddr)> {
- vec![
- // Bitfinex
- // (hex_utils::to_compressed_pubkey("033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025").unwrap(), "34.65.85.39:9735".parse().unwrap()),
+ const WALLET_OF_SATOSHI: &str = "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735";
+ let list = env::var("LN_PEERS").unwrap_or(WALLET_OF_SATOSHI.to_string());
+ let mut peers = Vec::new();
+ for peer_info in list.split(',') {
+ peers.push(resolve_peer_info(peer_info).expect("Invalid peer info in LN_PEERS"));
+ }
+ peers
+}
+
+fn resolve_peer_info(peer_info: &str) -> Result<(PublicKey, SocketAddr), &str> {
+ let mut peer_info = peer_info.splitn(2, '@');
- // Matt Corallo
- // (hex_utils::to_compressed_pubkey("03db10aa09ff04d3568b0621750794063df401e6853c79a21a83e1a3f3b5bfb0c8").unwrap(), "69.59.18.80:9735".parse().unwrap())
+ let pubkey = peer_info.next().ok_or("Invalid peer info. Should be formatted as: `pubkey@host:port`")?;
+ let pubkey = Vec::from_hex(pubkey).map_err(|_| "Invalid node pubkey")?;
+ let pubkey = PublicKey::from_slice(&pubkey).map_err(|_| "Invalid node pubkey")?;
- // River Financial
- // (hex_utils::to_compressed_pubkey("03037dc08e9ac63b82581f79b662a4d0ceca8a8ca162b1af3551595b8f2d97b70a").unwrap(), "104.196.249.140:9735".parse().unwrap())
+ let socket_address = peer_info.next().ok_or("Invalid peer info. Should be formatted as: `pubkey@host:port`")?;
+ let socket_address = socket_address
+ .to_socket_addrs()
+ .map_err(|_| "Cannot resolve node address")?
+ .next()
+ .ok_or("Cannot resolve node address")?;
+
+ Ok((pubkey, socket_address))
+}
- // Wallet of Satoshi | 035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735
- (hex_utils::to_compressed_pubkey("035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226").unwrap(), "170.75.163.209:9735".parse().unwrap())
- ]
+#[cfg(test)]
+mod tests {
+ use super::resolve_peer_info;
+ use bitcoin::hashes::hex::ToHex;
+
+ #[test]
+ fn test_resolve_peer_info() {
+ let wallet_of_satoshi = "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226@170.75.163.209:9735";
+ let (pubkey, socket_address) = resolve_peer_info(wallet_of_satoshi).unwrap();
+ assert_eq!(pubkey.serialize().to_hex(), "035e4ff418fc8b5554c5d9eea66396c227bd429a3251c8cbc711002ba215bfc226");
+ assert_eq!(socket_address.to_string(), "170.75.163.209:9735");
+
+ let ipv6 = "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025@[2001:db8::1]:80";
+ let (pubkey, socket_address) = resolve_peer_info(ipv6).unwrap();
+ assert_eq!(pubkey.serialize().to_hex(), "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025");
+ assert_eq!(socket_address.to_string(), "[2001:db8::1]:80");
+
+ let localhost = "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025@localhost:9735";
+ let (pubkey, socket_address) = resolve_peer_info(localhost).unwrap();
+ assert_eq!(pubkey.serialize().to_hex(), "033d8656219478701227199cbd6f670335c8d408a92ae88b962c49d4dc0e83e025");
+ let socket_address = socket_address.to_string();
+ assert!(socket_address == "127.0.0.1:9735" || socket_address == "[::1]:9735");
+ }
}
use bitcoin::secp256k1::PublicKey;
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use crate::TestLogger;
-use crate::types::{GossipMessage, GossipChainAccess};
+use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;
pub(crate) struct GossipCounter {
native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
+ verifier: Arc<ChainVerifier>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
}
impl GossipRouter {
pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+ let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+ let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
Self {
- native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+ outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
- sender
+ sender,
+ verifier,
}
}
-}
-impl MessageSendEventsProvider for GossipRouter {
- fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
- self.native_router.get_and_clear_pending_msg_events()
+ pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+ self.verifier.set_ph(peer_handler);
}
-}
-impl RoutingMessageHandler for GossipRouter {
- fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
- self.native_router.handle_node_announcement(msg)
- }
-
- fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
- let native_result = self.native_router.handle_channel_announcement(msg);
- let output_value;
+ fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
{
let mut counter = self.counter.write().unwrap();
- output_value = native_result.map_err(|error| {
- if error.err.contains("didn't match on-chain script") {
- counter.channel_announcements_with_mismatched_scripts += 1;
- }
- error
- })?;
counter.channel_announcements += 1;
}
- let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+ let gossip_message = GossipMessage::ChannelAnnouncement(msg);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
self.sender.send(gossip_message).await.unwrap();
})});
}
-
- Ok(output_value)
}
- fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
- let output_value = self.native_router.handle_channel_update(msg)?;
-
+ fn new_channel_update(&self, msg: ChannelUpdate) {
self.counter.write().unwrap().channel_updates += 1;
- let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+ let gossip_message = GossipMessage::ChannelUpdate(msg);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
self.sender.send(gossip_message).await.unwrap();
})});
}
+ }
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
+ for ev in gossip_evs {
+ match ev {
+ MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => {
+ self.new_channel_announcement(msg);
+ },
+ MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+ MessageSendEvent::BroadcastChannelUpdate { msg } => {
+ self.new_channel_update(msg);
+ },
+ _ => { unreachable!() },
+ }
+ }
+ self.native_router.get_and_clear_pending_msg_events()
+ }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+ fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+ self.native_router.handle_node_announcement(msg)
+ }
+
+ fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+ let res = self.native_router.handle_channel_announcement(msg)?;
+ self.new_channel_announcement(msg.clone());
+ Ok(res)
+ }
+
+ fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+ let res = self.native_router.handle_channel_update(msg)?;
+ self.new_channel_update(msg.clone());
+ Ok(res)
+ }
- Ok(output_value)
+ fn processing_queue_high(&self) -> bool {
+ self.native_router.processing_queue_high()
}
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
self.native_router.get_next_channel_announcement(starting_point)
}
- fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
+ fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
self.native_router.get_next_node_announcement(starting_point)
}
- fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> {
- self.native_router.peer_connected(their_node_id, init)
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
+ self.native_router.peer_connected(their_node_id, init, inbound)
}
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
-use bitcoin::secp256k1::PublicKey;
-
pub fn to_vec(hex: &str) -> Option<Vec<u8>> {
let mut out = Vec::with_capacity(hex.len() / 2);
Some(out)
}
-
-pub fn to_compressed_pubkey(hex: &str) -> Option<PublicKey> {
- let data = match to_vec(&hex[0..33 * 2]) {
- Some(bytes) => bytes,
- None => return None,
- };
- match PublicKey::from_slice(&data) {
- Ok(pk) => Some(pk),
- Err(_) => None,
- }
-}
use std::io::BufReader;
use std::sync::Arc;
-use bitcoin::blockdata::constants::genesis_block;
-use bitcoin::Network;
-use bitcoin::secp256k1::PublicKey;
-use lightning::routing::gossip::NetworkGraph;
+use lightning::routing::gossip::{NetworkGraph, NodeId};
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;
impl RapidSyncProcessor {
pub fn new() -> Self {
+ let network = config::network();
let logger = TestLogger::new();
let network_graph = if let Ok(file) = File::open(&config::network_graph_cache_path()) {
println!("Initializing from cached network graph…");
let mut buffered_reader = BufReader::new(file);
let network_graph_result = NetworkGraph::read(&mut buffered_reader, logger);
if let Ok(network_graph) = network_graph_result {
- network_graph.remove_stale_channels_and_tracking();
println!("Initialized from cached network graph!");
network_graph
} else {
println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
- NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
+ NetworkGraph::new(network, logger)
}
} else {
- NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash(), logger)
+ NetworkGraph::new(network, logger)
};
let arc_network_graph = Arc::new(network_graph);
Self {
// chain hash only necessary if either channel announcements or non-incremental updates are present
// for announcement-free incremental-only updates, chain hash can be skipped
- let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
- let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
- let mut node_ids: Vec<PublicKey> = Vec::new();
+ let mut node_id_set: HashSet<NodeId> = HashSet::new();
+ let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new();
+ let mut node_ids: Vec<NodeId> = Vec::new();
let mut duplicate_node_ids: i32 = 0;
- let mut get_node_id_index = |node_id: PublicKey| {
- let serialized_node_id = node_id.serialize();
- if node_id_set.insert(serialized_node_id) {
+ let mut get_node_id_index = |node_id: NodeId| {
+ if node_id_set.insert(node_id) {
node_ids.push(node_id);
let index = node_ids.len() - 1;
- node_id_indices.insert(serialized_node_id, index);
+ node_id_indices.insert(node_id, index);
return index;
}
duplicate_node_ids += 1;
- node_id_indices[&serialized_node_id]
+ node_id_indices[&node_id]
};
let mut delta_set = DeltaSet::new();
let channel_ids = {
let read_only_graph = network_graph.read_only();
println!("Retrieved read-only network graph copy");
- let channel_iterator = read_only_graph.channels().into_iter();
+ let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
symlink(&relative_snapshot_path, &symlink_path).unwrap();
}
+ let update_time_path = format!("{}/update_time.txt", pending_symlink_directory);
+ let update_time = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
+ fs::write(&update_time_path, format!("{}", update_time)).unwrap();
+
if fs::metadata(&finalized_snapshot_directory).is_ok(){
fs::remove_dir_all(&finalized_snapshot_directory).expect("Failed to remove finalized snapshot directory.");
}
use std::time::{Duration, Instant};
use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
+use bitcoin::secp256k1::PublicKey;
use lightning;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
use lightning::routing::gossip::NetworkGraph;
+use lightning::chain::keysinterface::KeysManager;
use tokio::sync::mpsc;
use crate::{config, TestLogger};
rand_hasher.write_u8(2);
random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
- let our_node_secret = SecretKey::from_slice(&key).unwrap();
+ let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
let message_handler = MessageHandler {
};
let peer_handler = Arc::new(PeerManager::new(
message_handler,
- our_node_secret,
0xdeadbeef,
&random_data,
TestLogger::new(),
IgnoringMessageHandler {},
+ keys_manager,
));
+ router.set_pm(Arc::clone(&peer_handler));
+
+ let ph_timer = Arc::clone(&peer_handler);
+ tokio::spawn(async move {
+ let mut intvl = tokio::time::interval(Duration::from_secs(10));
+ loop {
+ intvl.tick().await;
+ ph_timer.timer_tick_occurred();
+ }
+ });
println!("Connecting to Lightning peers...");
let peers = config::ln_peers();
current_peer.1,
).await {
disconnection_future.await;
- } else {
- tokio::time::sleep(Duration::from_secs(10)).await;
}
+ tokio::time::sleep(Duration::from_secs(10)).await;
}
});
true
use std::sync::Arc;
use std::ops::Deref;
+use lightning::chain::keysinterface::KeysManager;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
use lightning::util::logger::{Logger, Record};
use crate::verifier::ChainVerifier;
pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler>>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
#[derive(Debug)]
pub(crate) enum GossipMessage {
use std::convert::TryInto;
+use std::sync::Arc;
+use std::sync::Mutex;
use bitcoin::{BlockHash, TxOut};
use bitcoin::blockdata::block::Block;
use bitcoin::hashes::Hash;
-use lightning::chain;
-use lightning::chain::AccessError;
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
use lightning_block_sync::{BlockData, BlockSource};
use lightning_block_sync::http::BinaryResponse;
use lightning_block_sync::rest::RestClient;
use crate::config;
+use crate::TestLogger;
+use crate::types::GossipPeerManager;
pub(crate) struct ChainVerifier {
- rest_client: RestClient,
+ rest_client: Arc<RestClient>,
+ graph: Arc<NetworkGraph<TestLogger>>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
+ peer_handler: Mutex<Option<GossipPeerManager>>,
}
struct RestBinaryResponse(Vec<u8>);
impl ChainVerifier {
- pub(crate) fn new() -> Self {
+ pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
ChainVerifier {
- rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(),
+ rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
+ outbound_gossiper,
+ graph,
+ peer_handler: Mutex::new(None),
}
}
-
- fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
- tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
- let uri = format!("blockhashbyheight/{}.bin", block_height);
- let block_hash_result =
- self.rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
- let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
- eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
- AccessError::UnknownChain
- })?.0;
- let block_hash = BlockHash::from_slice(&block_hash).unwrap();
-
- let block_result = self.rest_client.get_block(&block_hash).await;
- match block_result {
- Ok(BlockData::FullBlock(block)) => {
- Ok(block)
- },
- Ok(_) => unreachable!(),
- Err(error) => {
- eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
- Err(AccessError::UnknownChain)
- }
- }
- }) })
+ pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+ *self.peer_handler.lock().unwrap() = Some(peer_handler);
}
-}
-impl chain::Access for ChainVerifier {
- fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
+ async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;
- let block = self.retrieve_block(block_height)?;
- let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
- eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
- AccessError::UnknownTx
- })?;
- let output = transaction.output.get(output_index as usize).ok_or_else(|| {
- eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
- AccessError::UnknownTx
- })?;
- Ok(output.clone())
+ let mut block = Self::retrieve_block(client, block_height).await?;
+ if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+ let mut transaction = block.txdata.swap_remove(transaction_index as usize);
+ if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+ Ok(transaction.output.swap_remove(output_index as usize))
+ }
+
+ async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+ let uri = format!("blockhashbyheight/{}.bin", block_height);
+ let block_hash_result =
+ client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
+ let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+ eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+ UtxoLookupError::UnknownChain
+ })?.0;
+ let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+ let block_result = client.get_block(&block_hash).await;
+ match block_result {
+ Ok(BlockData::FullBlock(block)) => {
+ Ok(block)
+ },
+ Ok(_) => unreachable!(),
+ Err(error) => {
+ eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+ Err(UtxoLookupError::UnknownChain)
+ }
+ }
+ }
+}
+
+impl UtxoLookup for ChainVerifier {
+ fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
+ let res = UtxoFuture::new();
+ let fut = res.clone();
+ let graph_ref = Arc::clone(&self.graph);
+ let client_ref = Arc::clone(&self.rest_client);
+ let gossip_ref = Arc::clone(&self.outbound_gossiper);
+ let pm_ref = self.peer_handler.lock().unwrap().clone();
+ tokio::spawn(async move {
+ let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+ fut.resolve(&*graph_ref, &*gossip_ref, res);
+ if let Some(pm) = pm_ref { pm.process_events(); }
+ });
+ UtxoResult::Async(res)
}
}