From b1d015229e3e6dc97e9bd24b892e2005e3b90779 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 8 Feb 2023 16:58:06 +0000 Subject: [PATCH] Upgrade to latest lightning crate, switching to async chain resolution --- Cargo.toml | 6 +-- src/downloader.rs | 93 +++++++++++++++++++++++++--------------- src/lib.rs | 23 +++++----- src/lookup.rs | 2 +- src/tracking.rs | 9 ++-- src/types.rs | 3 +- src/verifier.rs | 105 +++++++++++++++++++++++++++------------------- 7 files changed, 144 insertions(+), 97 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 852c7c2..c2c689b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,9 +5,9 @@ edition = "2018" [dependencies] bitcoin = "0.29" -lightning = { version = "0.0.112" } -lightning-block-sync = { version = "0.0.112", features=["rest-client"] } -lightning-net-tokio = { version = "0.0.112" } +lightning = { version = "0.0.114" } +lightning-block-sync = { version = "0.0.114", features=["rest-client"] } +lightning-net-tokio = { version = "0.0.114" } tokio = { version = "1.14.1", features = ["full"] } tokio-postgres = { version="0.7.5" } futures = "0.3" diff --git a/src/downloader.rs b/src/downloader.rs index 4fc3d51..9256100 100644 --- a/src/downloader.rs +++ b/src/downloader.rs @@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock}; use bitcoin::secp256k1::PublicKey; use lightning::ln::features::{InitFeatures, NodeFeatures}; use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler}; -use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync}; use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider}; use tokio::sync::mpsc; use tokio::sync::mpsc::error::TrySendError; use crate::TestLogger; -use crate::types::{GossipMessage, GossipChainAccess}; +use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager}; use crate::verifier::ChainVerifier; pub(crate) struct GossipCounter { @@ -34,59 +34,45 @@ pub(crate) struct GossipRouter { native_router: P2PGossipSync>, GossipChainAccess, TestLogger>, pub(crate) counter: RwLock, sender: mpsc::Sender, + verifier: Arc, + outbound_gossiper: Arc>, GossipChainAccess, TestLogger>>, } impl GossipRouter { pub(crate) fn new(network_graph: Arc>, sender: mpsc::Sender) -> Self { + let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new())); + let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper))); Self { - native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()), + native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()), + outbound_gossiper, counter: RwLock::new(GossipCounter::new()), - sender + sender, + verifier, } } -} -impl MessageSendEventsProvider for GossipRouter { - fn get_and_clear_pending_msg_events(&self) -> Vec { - self.native_router.get_and_clear_pending_msg_events() + pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) { + self.verifier.set_ph(peer_handler); } -} -impl RoutingMessageHandler for GossipRouter { - fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { - self.native_router.handle_node_announcement(msg) - } - - fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { - let native_result = self.native_router.handle_channel_announcement(msg); - let output_value; + fn new_channel_announcement(&self, msg: ChannelAnnouncement) { { let mut counter = self.counter.write().unwrap(); - output_value = native_result.map_err(|error| { - if error.err.contains("didn't match on-chain script") { - counter.channel_announcements_with_mismatched_scripts += 1; - } - error - })?; counter.channel_announcements += 1; } - let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone()); + let gossip_message = GossipMessage::ChannelAnnouncement(msg); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { self.sender.send(gossip_message).await.unwrap(); })}); } - - Ok(output_value) } - fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { - let output_value = self.native_router.handle_channel_update(msg)?; - + fn new_channel_update(&self, msg: ChannelUpdate) { self.counter.write().unwrap().channel_updates += 1; - let gossip_message = GossipMessage::ChannelUpdate(msg.clone()); + let gossip_message = GossipMessage::ChannelUpdate(msg); if let Err(err) = self.sender.try_send(gossip_message) { let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg }; @@ -94,20 +80,59 @@ impl RoutingMessageHandler for GossipRouter { self.sender.send(gossip_message).await.unwrap(); })}); } + } +} + +impl MessageSendEventsProvider for GossipRouter { + fn get_and_clear_pending_msg_events(&self) -> Vec { + let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events(); + for ev in gossip_evs { + match ev { + MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => { + self.new_channel_announcement(msg); + }, + MessageSendEvent::BroadcastNodeAnnouncement { .. } => {}, + MessageSendEvent::BroadcastChannelUpdate { msg } => { + self.new_channel_update(msg); + }, + _ => { unreachable!() }, + } + } + self.native_router.get_and_clear_pending_msg_events() + } +} + +impl RoutingMessageHandler for GossipRouter { + fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result { + self.native_router.handle_node_announcement(msg) + } + + fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result { + let res = self.native_router.handle_channel_announcement(msg)?; + self.new_channel_announcement(msg.clone()); + Ok(res) + } + + fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result { + let res = self.native_router.handle_channel_update(msg)?; + self.new_channel_update(msg.clone()); + Ok(res) + } - Ok(output_value) + fn processing_queue_high(&self) -> bool { + self.native_router.processing_queue_high() } fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option, Option)> { self.native_router.get_next_channel_announcement(starting_point) } - fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option { + fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option { self.native_router.get_next_node_announcement(starting_point) } - fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> { - self.native_router.peer_connected(their_node_id, init) + fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> { + self.native_router.peer_connected(their_node_id, init, inbound) } fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> { diff --git a/src/lib.rs b/src/lib.rs index 026357c..f8a4231 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,9 +15,7 @@ use std::fs::File; use std::io::BufReader; use std::sync::Arc; -use bitcoin::blockdata::constants::genesis_block; -use bitcoin::secp256k1::PublicKey; -use lightning::routing::gossip::NetworkGraph; +use lightning::routing::gossip::{NetworkGraph, NodeId}; use lightning::util::ser::{ReadableArgs, Writeable}; use tokio::sync::mpsc; use crate::lookup::DeltaSet; @@ -64,10 +62,10 @@ impl RapidSyncProcessor { network_graph } else { println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap()); - NetworkGraph::new(genesis_block(network).header.block_hash(), logger) + NetworkGraph::new(network, logger) } } else { - NetworkGraph::new(genesis_block(network).header.block_hash(), logger) + NetworkGraph::new(network, logger) }; let arc_network_graph = Arc::new(network_graph); Self { @@ -119,21 +117,20 @@ async fn serialize_delta(network_graph: Arc>, last_sync // chain hash only necessary if either channel announcements or non-incremental updates are present // for announcement-free incremental-only updates, chain hash can be skipped - let mut node_id_set: HashSet<[u8; 33]> = HashSet::new(); - let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new(); - let mut node_ids: Vec = Vec::new(); + let mut node_id_set: HashSet = HashSet::new(); + let mut node_id_indices: HashMap = HashMap::new(); + let mut node_ids: Vec = Vec::new(); let mut duplicate_node_ids: i32 = 0; - let mut get_node_id_index = |node_id: PublicKey| { - let serialized_node_id = node_id.serialize(); - if node_id_set.insert(serialized_node_id) { + let mut get_node_id_index = |node_id: NodeId| { + if node_id_set.insert(node_id) { node_ids.push(node_id); let index = node_ids.len() - 1; - node_id_indices.insert(serialized_node_id, index); + node_id_indices.insert(node_id, index); return index; } duplicate_node_ids += 1; - node_id_indices[&serialized_node_id] + node_id_indices[&node_id] }; let mut delta_set = DeltaSet::new(); diff --git a/src/lookup.rs b/src/lookup.rs index b48b375..1bac79f 100644 --- a/src/lookup.rs +++ b/src/lookup.rs @@ -70,7 +70,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ let channel_ids = { let read_only_graph = network_graph.read_only(); println!("Retrieved read-only network graph copy"); - let channel_iterator = read_only_graph.channels().into_iter(); + let channel_iterator = read_only_graph.channels().unordered_iter(); channel_iterator .filter(|c| c.1.announcement_message.is_some()) .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64) diff --git a/src/tracking.rs b/src/tracking.rs index 810a037..0f34924 100644 --- a/src/tracking.rs +++ b/src/tracking.rs @@ -5,12 +5,13 @@ use std::sync::Arc; use std::time::{Duration, Instant}; use bitcoin::hashes::hex::ToHex; -use bitcoin::secp256k1::{PublicKey, SecretKey}; +use bitcoin::secp256k1::PublicKey; use lightning; use lightning::ln::peer_handler::{ ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager, }; use lightning::routing::gossip::NetworkGraph; +use lightning::chain::keysinterface::KeysManager; use tokio::sync::mpsc; use crate::{config, TestLogger}; @@ -30,7 +31,8 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender; -pub(crate) type GossipPeerManager = Arc, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler>>; +pub(crate) type GossipPeerManager = Arc, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc>>; #[derive(Debug)] pub(crate) enum GossipMessage { diff --git a/src/verifier.rs b/src/verifier.rs index 62c191a..4bda871 100644 --- a/src/verifier.rs +++ b/src/verifier.rs @@ -1,71 +1,92 @@ use std::convert::TryInto; +use std::sync::Arc; +use std::sync::Mutex; use bitcoin::{BlockHash, TxOut}; use bitcoin::blockdata::block::Block; use bitcoin::hashes::Hash; -use lightning::chain; -use lightning::chain::AccessError; +use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; +use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError}; use lightning_block_sync::{BlockData, BlockSource}; use lightning_block_sync::http::BinaryResponse; use lightning_block_sync::rest::RestClient; use crate::config; +use crate::TestLogger; +use crate::types::GossipPeerManager; pub(crate) struct ChainVerifier { - rest_client: RestClient, + rest_client: Arc, + graph: Arc>, + outbound_gossiper: Arc>, Arc, TestLogger>>, + peer_handler: Mutex>, } struct RestBinaryResponse(Vec); impl ChainVerifier { - pub(crate) fn new() -> Self { + pub(crate) fn new(graph: Arc>, outbound_gossiper: Arc>, Arc, TestLogger>>) -> Self { ChainVerifier { - rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(), + rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()), + outbound_gossiper, + graph, + peer_handler: Mutex::new(None), } } - - fn retrieve_block(&self, block_height: u32) -> Result { - tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move { - let uri = format!("blockhashbyheight/{}.bin", block_height); - let block_hash_result = - self.rest_client.request_resource::(&uri).await; - let block_hash: Vec = block_hash_result.map_err(|error| { - eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string()); - AccessError::UnknownChain - })?.0; - let block_hash = BlockHash::from_slice(&block_hash).unwrap(); - - let block_result = self.rest_client.get_block(&block_hash).await; - match block_result { - Ok(BlockData::FullBlock(block)) => { - Ok(block) - }, - Ok(_) => unreachable!(), - Err(error) => { - eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); - Err(AccessError::UnknownChain) - } - } - }) }) + pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) { + *self.peer_handler.lock().unwrap() = Some(peer_handler); } -} -impl chain::Access for ChainVerifier { - fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result { + async fn retrieve_utxo(client: Arc, short_channel_id: u64) -> Result { let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32; let output_index = (short_channel_id & 0xffff) as u16; - let block = self.retrieve_block(block_height)?; - let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| { - eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string()); - AccessError::UnknownTx - })?; - let output = transaction.output.get(output_index as usize).ok_or_else(|| { - eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string()); - AccessError::UnknownTx - })?; - Ok(output.clone()) + let mut block = Self::retrieve_block(client, block_height).await?; + if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); } + let mut transaction = block.txdata.swap_remove(transaction_index as usize); + if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); } + Ok(transaction.output.swap_remove(output_index as usize)) + } + + async fn retrieve_block(client: Arc, block_height: u32) -> Result { + let uri = format!("blockhashbyheight/{}.bin", block_height); + let block_hash_result = + client.request_resource::(&uri).await; + let block_hash: Vec = block_hash_result.map_err(|error| { + eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string()); + UtxoLookupError::UnknownChain + })?.0; + let block_hash = BlockHash::from_slice(&block_hash).unwrap(); + + let block_result = client.get_block(&block_hash).await; + match block_result { + Ok(BlockData::FullBlock(block)) => { + Ok(block) + }, + Ok(_) => unreachable!(), + Err(error) => { + eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash); + Err(UtxoLookupError::UnknownChain) + } + } + } +} + +impl UtxoLookup for ChainVerifier { + fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult { + let res = UtxoFuture::new(); + let fut = res.clone(); + let graph_ref = Arc::clone(&self.graph); + let client_ref = Arc::clone(&self.rest_client); + let gossip_ref = Arc::clone(&self.outbound_gossiper); + let pm_ref = self.peer_handler.lock().unwrap().clone(); + tokio::spawn(async move { + let res = Self::retrieve_utxo(client_ref, short_channel_id).await; + fut.resolve(&*graph_ref, &*gossip_ref, res); + if let Some(pm) = pm_ref { pm.process_events(); } + }); + UtxoResult::Async(res) } } -- 2.39.5