Upgrade to latest lightning crate, switching to async chain resolution
authorMatt Corallo <git@bluematt.me>
Wed, 8 Feb 2023 16:58:06 +0000 (16:58 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 7 Mar 2023 00:53:44 +0000 (00:53 +0000)
Cargo.toml
src/downloader.rs
src/lib.rs
src/lookup.rs
src/tracking.rs
src/types.rs
src/verifier.rs

index 852c7c2c2ac62adffbc1ce381408cf96ff38619a..c2c689b9407439088e8cd818edfdd0937c53e92c 100644 (file)
@@ -5,9 +5,9 @@ edition = "2018"
 
 [dependencies]
 bitcoin = "0.29"
-lightning = { version = "0.0.112" }
-lightning-block-sync = { version = "0.0.112", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.112" }
+lightning = { version = "0.0.114" }
+lightning-block-sync = { version = "0.0.114", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.114" }
 tokio = { version = "1.14.1", features = ["full"] }
 tokio-postgres = { version="0.7.5" }
 futures = "0.3"
index 4fc3d51c6c005f610af7680af999068747c54a04..9256100b581748100147d523fbe9f1a9ae0cee0b 100644 (file)
@@ -3,13 +3,13 @@ use std::sync::{Arc, RwLock};
 use bitcoin::secp256k1::PublicKey;
 use lightning::ln::features::{InitFeatures, NodeFeatures};
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
 use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
 use tokio::sync::mpsc;
 use tokio::sync::mpsc::error::TrySendError;
 
 use crate::TestLogger;
-use crate::types::{GossipMessage, GossipChainAccess};
+use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
 use crate::verifier::ChainVerifier;
 
 pub(crate) struct GossipCounter {
@@ -34,59 +34,45 @@ pub(crate) struct GossipRouter {
        native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
        pub(crate) counter: RwLock<GossipCounter>,
        sender: mpsc::Sender<GossipMessage>,
+       verifier: Arc<ChainVerifier>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
 }
 
 impl GossipRouter {
        pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+               let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+               let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
                Self {
-                       native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
+                       native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+                       outbound_gossiper,
                        counter: RwLock::new(GossipCounter::new()),
-                       sender
+                       sender,
+                       verifier,
                }
        }
-}
 
-impl MessageSendEventsProvider for GossipRouter {
-       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
-               self.native_router.get_and_clear_pending_msg_events()
+       pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+               self.verifier.set_ph(peer_handler);
        }
-}
 
-impl RoutingMessageHandler for GossipRouter {
-       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
-               self.native_router.handle_node_announcement(msg)
-       }
-
-       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
-               let native_result = self.native_router.handle_channel_announcement(msg);
-               let output_value;
+       fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
                {
                        let mut counter = self.counter.write().unwrap();
-                       output_value = native_result.map_err(|error| {
-                               if error.err.contains("didn't match on-chain script") {
-                                       counter.channel_announcements_with_mismatched_scripts += 1;
-                               }
-                               error
-                       })?;
                        counter.channel_announcements += 1;
                }
 
-               let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+               let gossip_message = GossipMessage::ChannelAnnouncement(msg);
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
                        tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
-
-               Ok(output_value)
        }
 
-       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
-               let output_value = self.native_router.handle_channel_update(msg)?;
-
+       fn new_channel_update(&self, msg: ChannelUpdate) {
                self.counter.write().unwrap().channel_updates += 1;
-               let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+               let gossip_message = GossipMessage::ChannelUpdate(msg);
 
                if let Err(err) = self.sender.try_send(gossip_message) {
                        let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
@@ -94,20 +80,59 @@ impl RoutingMessageHandler for GossipRouter {
                                self.sender.send(gossip_message).await.unwrap();
                        })});
                }
+       }
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+               let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
+               for ev in gossip_evs {
+                       match ev {
+                               MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => {
+                                       self.new_channel_announcement(msg);
+                               },
+                               MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+                               MessageSendEvent::BroadcastChannelUpdate { msg } => {
+                                       self.new_channel_update(msg);
+                               },
+                               _ => { unreachable!() },
+                       }
+               }
+               self.native_router.get_and_clear_pending_msg_events()
+       }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+       fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+               self.native_router.handle_node_announcement(msg)
+       }
+
+       fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_announcement(msg)?;
+               self.new_channel_announcement(msg.clone());
+               Ok(res)
+       }
+
+       fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+               let res = self.native_router.handle_channel_update(msg)?;
+               self.new_channel_update(msg.clone());
+               Ok(res)
+       }
 
-               Ok(output_value)
+       fn processing_queue_high(&self) -> bool {
+               self.native_router.processing_queue_high()
        }
 
        fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
                self.native_router.get_next_channel_announcement(starting_point)
        }
 
-       fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
+       fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
                self.native_router.get_next_node_announcement(starting_point)
        }
 
-       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> {
-               self.native_router.peer_connected(their_node_id, init)
+       fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
+               self.native_router.peer_connected(their_node_id, init, inbound)
        }
 
        fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
index 026357cf2405e750ed40e4325cdb6cb5ac99f98c..f8a4231dc4dfb7303f525d7e95419c2a041b781e 100644 (file)
@@ -15,9 +15,7 @@ use std::fs::File;
 use std::io::BufReader;
 use std::sync::Arc;
 
-use bitcoin::blockdata::constants::genesis_block;
-use bitcoin::secp256k1::PublicKey;
-use lightning::routing::gossip::NetworkGraph;
+use lightning::routing::gossip::{NetworkGraph, NodeId};
 use lightning::util::ser::{ReadableArgs, Writeable};
 use tokio::sync::mpsc;
 use crate::lookup::DeltaSet;
@@ -64,10 +62,10 @@ impl RapidSyncProcessor {
                                network_graph
                        } else {
                                println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
-                               NetworkGraph::new(genesis_block(network).header.block_hash(), logger)
+                               NetworkGraph::new(network, logger)
                        }
                } else {
-                       NetworkGraph::new(genesis_block(network).header.block_hash(), logger)
+                       NetworkGraph::new(network, logger)
                };
                let arc_network_graph = Arc::new(network_graph);
                Self {
@@ -119,21 +117,20 @@ async fn serialize_delta(network_graph: Arc<NetworkGraph<TestLogger>>, last_sync
        // chain hash only necessary if either channel announcements or non-incremental updates are present
        // for announcement-free incremental-only updates, chain hash can be skipped
 
-       let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
-       let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
-       let mut node_ids: Vec<PublicKey> = Vec::new();
+       let mut node_id_set: HashSet<NodeId> = HashSet::new();
+       let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new();
+       let mut node_ids: Vec<NodeId> = Vec::new();
        let mut duplicate_node_ids: i32 = 0;
 
-       let mut get_node_id_index = |node_id: PublicKey| {
-               let serialized_node_id = node_id.serialize();
-               if node_id_set.insert(serialized_node_id) {
+       let mut get_node_id_index = |node_id: NodeId| {
+               if node_id_set.insert(node_id) {
                        node_ids.push(node_id);
                        let index = node_ids.len() - 1;
-                       node_id_indices.insert(serialized_node_id, index);
+                       node_id_indices.insert(node_id, index);
                        return index;
                }
                duplicate_node_ids += 1;
-               node_id_indices[&serialized_node_id]
+               node_id_indices[&node_id]
        };
 
        let mut delta_set = DeltaSet::new();
index b48b375921ef07df6b40a3a639eeee98e5237ce0..1bac79f94fb1755de8d0128ec5aa8571fc1c7aaf 100644 (file)
@@ -70,7 +70,7 @@ pub(super) async fn fetch_channel_announcements(delta_set: &mut DeltaSet, networ
        let channel_ids = {
                let read_only_graph = network_graph.read_only();
                println!("Retrieved read-only network graph copy");
-               let channel_iterator = read_only_graph.channels().into_iter();
+               let channel_iterator = read_only_graph.channels().unordered_iter();
                channel_iterator
                        .filter(|c| c.1.announcement_message.is_some())
                        .map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
index 810a037828c328e0fc8f6e38dbcf25d4c1272552..0f34924ed9458f5618014bf785c7fb382789dae0 100644 (file)
@@ -5,12 +5,13 @@ use std::sync::Arc;
 use std::time::{Duration, Instant};
 
 use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
+use bitcoin::secp256k1::PublicKey;
 use lightning;
 use lightning::ln::peer_handler::{
        ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
 };
 use lightning::routing::gossip::NetworkGraph;
+use lightning::chain::keysinterface::KeysManager;
 use tokio::sync::mpsc;
 
 use crate::{config, TestLogger};
@@ -30,7 +31,8 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        rand_hasher.write_u8(2);
        random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
 
-       let our_node_secret = SecretKey::from_slice(&key).unwrap();
+       let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
        let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
 
        let message_handler = MessageHandler {
@@ -40,12 +42,13 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<GossipMessa
        };
        let peer_handler = Arc::new(PeerManager::new(
                message_handler,
-               our_node_secret,
                0xdeadbeef,
                &random_data,
                TestLogger::new(),
                IgnoringMessageHandler {},
+               keys_manager,
        ));
+       router.set_pm(Arc::clone(&peer_handler));
 
        println!("Connecting to Lightning peers...");
        let peers = config::ln_peers();
index 5b7434ee0de5b5d2d63055845fc664644539bd57..2d5a281e02024a8ab8c25b74e82684c17dc18d07 100644 (file)
@@ -1,6 +1,7 @@
 use std::sync::Arc;
 use std::ops::Deref;
 
+use lightning::chain::keysinterface::KeysManager;
 use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
 use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
 use lightning::util::logger::{Logger, Record};
@@ -9,7 +10,7 @@ use crate::downloader::GossipRouter;
 use crate::verifier::ChainVerifier;
 
 pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler>>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
 
 #[derive(Debug)]
 pub(crate) enum GossipMessage {
index 62c191a3ce2727d9930e06b0ca58425408753dfe..4bda871ad124d85336576f0b2e9b08e3b9c5a92b 100644 (file)
@@ -1,71 +1,92 @@
 use std::convert::TryInto;
+use std::sync::Arc;
+use std::sync::Mutex;
 
 use bitcoin::{BlockHash, TxOut};
 use bitcoin::blockdata::block::Block;
 use bitcoin::hashes::Hash;
-use lightning::chain;
-use lightning::chain::AccessError;
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
 use lightning_block_sync::{BlockData, BlockSource};
 use lightning_block_sync::http::BinaryResponse;
 use lightning_block_sync::rest::RestClient;
 
 use crate::config;
+use crate::TestLogger;
+use crate::types::GossipPeerManager;
 
 pub(crate) struct ChainVerifier {
-       rest_client: RestClient,
+       rest_client: Arc<RestClient>,
+       graph: Arc<NetworkGraph<TestLogger>>,
+       outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
+       peer_handler: Mutex<Option<GossipPeerManager>>,
 }
 
 struct RestBinaryResponse(Vec<u8>);
 
 impl ChainVerifier {
-       pub(crate) fn new() -> Self {
+       pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
                ChainVerifier {
-                       rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(),
+                       rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
+                       outbound_gossiper,
+                       graph,
+                       peer_handler: Mutex::new(None),
                }
        }
-
-       fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
-               tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
-                       let uri = format!("blockhashbyheight/{}.bin", block_height);
-                       let block_hash_result =
-                               self.rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
-                       let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
-                               eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
-                               AccessError::UnknownChain
-                       })?.0;
-                       let block_hash = BlockHash::from_slice(&block_hash).unwrap();
-
-                       let block_result = self.rest_client.get_block(&block_hash).await;
-                       match block_result {
-                               Ok(BlockData::FullBlock(block)) => {
-                                       Ok(block)
-                               },
-                               Ok(_) => unreachable!(),
-                               Err(error) => {
-                                       eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
-                                       Err(AccessError::UnknownChain)
-                               }
-                       }
-               }) })
+       pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+               *self.peer_handler.lock().unwrap() = Some(peer_handler);
        }
-}
 
-impl chain::Access for ChainVerifier {
-       fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
+       async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
                let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
                let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
                let output_index = (short_channel_id & 0xffff) as u16;
 
-               let block = self.retrieve_block(block_height)?;
-               let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
-                       eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
-                       AccessError::UnknownTx
-               })?;
-               let output = transaction.output.get(output_index as usize).ok_or_else(|| {
-                       eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
-                       AccessError::UnknownTx
-               })?;
-               Ok(output.clone())
+               let mut block = Self::retrieve_block(client, block_height).await?;
+               if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+               let mut transaction = block.txdata.swap_remove(transaction_index as usize);
+               if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+               Ok(transaction.output.swap_remove(output_index as usize))
+       }
+
+       async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+               let uri = format!("blockhashbyheight/{}.bin", block_height);
+               let block_hash_result =
+                       client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
+               let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+                       eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+                       UtxoLookupError::UnknownChain
+               })?.0;
+               let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+               let block_result = client.get_block(&block_hash).await;
+               match block_result {
+                       Ok(BlockData::FullBlock(block)) => {
+                               Ok(block)
+                       },
+                       Ok(_) => unreachable!(),
+                       Err(error) => {
+                               eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+                               Err(UtxoLookupError::UnknownChain)
+                       }
+               }
+       }
+}
+
+impl UtxoLookup for ChainVerifier {
+       fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
+               let res = UtxoFuture::new();
+               let fut = res.clone();
+               let graph_ref = Arc::clone(&self.graph);
+               let client_ref = Arc::clone(&self.rest_client);
+               let gossip_ref = Arc::clone(&self.outbound_gossiper);
+               let pm_ref = self.peer_handler.lock().unwrap().clone();
+               tokio::spawn(async move {
+                       let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+                       fut.resolve(&*graph_ref, &*gossip_ref, res);
+                       if let Some(pm) = pm_ref { pm.process_events(); }
+               });
+               UtxoResult::Async(res)
        }
 }