[dependencies]
bitcoin = "0.29"
-lightning = { version = "0.0.112" }
-lightning-block-sync = { version = "0.0.112", features=["rest-client"] }
-lightning-net-tokio = { version = "0.0.112" }
+lightning = { version = "0.0.114" }
+lightning-block-sync = { version = "0.0.114", features=["rest-client"] }
+lightning-net-tokio = { version = "0.0.114" }
tokio = { version = "1.14.1", features = ["full"] }
tokio-postgres = { version="0.7.5" }
futures = "0.3"
use bitcoin::secp256k1::PublicKey;
use lightning::ln::features::{InitFeatures, NodeFeatures};
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate, Init, LightningError, NodeAnnouncement, QueryChannelRange, QueryShortChannelIds, ReplyChannelRange, ReplyShortChannelIdsEnd, RoutingMessageHandler};
-use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
use lightning::util::events::{MessageSendEvent, MessageSendEventsProvider};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use crate::TestLogger;
-use crate::types::{GossipMessage, GossipChainAccess};
+use crate::types::{GossipMessage, GossipChainAccess, GossipPeerManager};
use crate::verifier::ChainVerifier;
pub(crate) struct GossipCounter {
native_router: P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>,
pub(crate) counter: RwLock<GossipCounter>,
sender: mpsc::Sender<GossipMessage>,
+ verifier: Arc<ChainVerifier>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, GossipChainAccess, TestLogger>>,
}
impl GossipRouter {
pub(crate) fn new(network_graph: Arc<NetworkGraph<TestLogger>>, sender: mpsc::Sender<GossipMessage>) -> Self {
+ let outbound_gossiper = Arc::new(P2PGossipSync::new(Arc::clone(&network_graph), None, TestLogger::new()));
+ let verifier = Arc::new(ChainVerifier::new(Arc::clone(&network_graph), Arc::clone(&outbound_gossiper)));
Self {
- native_router: P2PGossipSync::new(network_graph, Some(Arc::new(ChainVerifier::new())), TestLogger::new()),
+ native_router: P2PGossipSync::new(network_graph, Some(Arc::clone(&verifier)), TestLogger::new()),
+ outbound_gossiper,
counter: RwLock::new(GossipCounter::new()),
- sender
+ sender,
+ verifier,
}
}
-}
-impl MessageSendEventsProvider for GossipRouter {
- fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
- self.native_router.get_and_clear_pending_msg_events()
+ pub(crate) fn set_pm(&self, peer_handler: GossipPeerManager) {
+ self.verifier.set_ph(peer_handler);
}
-}
-impl RoutingMessageHandler for GossipRouter {
- fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
- self.native_router.handle_node_announcement(msg)
- }
-
- fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
- let native_result = self.native_router.handle_channel_announcement(msg);
- let output_value;
+ fn new_channel_announcement(&self, msg: ChannelAnnouncement) {
{
let mut counter = self.counter.write().unwrap();
- output_value = native_result.map_err(|error| {
- if error.err.contains("didn't match on-chain script") {
- counter.channel_announcements_with_mismatched_scripts += 1;
- }
- error
- })?;
counter.channel_announcements += 1;
}
- let gossip_message = GossipMessage::ChannelAnnouncement(msg.clone());
+ let gossip_message = GossipMessage::ChannelAnnouncement(msg);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
self.sender.send(gossip_message).await.unwrap();
})});
}
-
- Ok(output_value)
}
- fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
- let output_value = self.native_router.handle_channel_update(msg)?;
-
+ fn new_channel_update(&self, msg: ChannelUpdate) {
self.counter.write().unwrap().channel_updates += 1;
- let gossip_message = GossipMessage::ChannelUpdate(msg.clone());
+ let gossip_message = GossipMessage::ChannelUpdate(msg);
if let Err(err) = self.sender.try_send(gossip_message) {
let gossip_message = match err { TrySendError::Full(msg)|TrySendError::Closed(msg) => msg };
self.sender.send(gossip_message).await.unwrap();
})});
}
+ }
+}
+
+impl MessageSendEventsProvider for GossipRouter {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
+ let gossip_evs = self.outbound_gossiper.get_and_clear_pending_msg_events();
+ for ev in gossip_evs {
+ match ev {
+ MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None } => {
+ self.new_channel_announcement(msg);
+ },
+ MessageSendEvent::BroadcastNodeAnnouncement { .. } => {},
+ MessageSendEvent::BroadcastChannelUpdate { msg } => {
+ self.new_channel_update(msg);
+ },
+ _ => { unreachable!() },
+ }
+ }
+ self.native_router.get_and_clear_pending_msg_events()
+ }
+}
+
+impl RoutingMessageHandler for GossipRouter {
+ fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError> {
+ self.native_router.handle_node_announcement(msg)
+ }
+
+ fn handle_channel_announcement(&self, msg: &ChannelAnnouncement) -> Result<bool, LightningError> {
+ let res = self.native_router.handle_channel_announcement(msg)?;
+ self.new_channel_announcement(msg.clone());
+ Ok(res)
+ }
+
+ fn handle_channel_update(&self, msg: &ChannelUpdate) -> Result<bool, LightningError> {
+ let res = self.native_router.handle_channel_update(msg)?;
+ self.new_channel_update(msg.clone());
+ Ok(res)
+ }
- Ok(output_value)
+ fn processing_queue_high(&self) -> bool {
+ self.native_router.processing_queue_high()
}
fn get_next_channel_announcement(&self, starting_point: u64) -> Option<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
self.native_router.get_next_channel_announcement(starting_point)
}
- fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<NodeAnnouncement> {
+ fn get_next_node_announcement(&self, starting_point: Option<&NodeId>) -> Option<NodeAnnouncement> {
self.native_router.get_next_node_announcement(starting_point)
}
- fn peer_connected(&self, their_node_id: &PublicKey, init: &Init) -> Result<(), ()> {
- self.native_router.peer_connected(their_node_id, init)
+ fn peer_connected(&self, their_node_id: &PublicKey, init: &Init, inbound: bool) -> Result<(), ()> {
+ self.native_router.peer_connected(their_node_id, init, inbound)
}
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
use std::io::BufReader;
use std::sync::Arc;
-use bitcoin::blockdata::constants::genesis_block;
-use bitcoin::secp256k1::PublicKey;
-use lightning::routing::gossip::NetworkGraph;
+use lightning::routing::gossip::{NetworkGraph, NodeId};
use lightning::util::ser::{ReadableArgs, Writeable};
use tokio::sync::mpsc;
use crate::lookup::DeltaSet;
network_graph
} else {
println!("Initialization from cached network graph failed: {}", network_graph_result.err().unwrap());
- NetworkGraph::new(genesis_block(network).header.block_hash(), logger)
+ NetworkGraph::new(network, logger)
}
} else {
- NetworkGraph::new(genesis_block(network).header.block_hash(), logger)
+ NetworkGraph::new(network, logger)
};
let arc_network_graph = Arc::new(network_graph);
Self {
// chain hash only necessary if either channel announcements or non-incremental updates are present
// for announcement-free incremental-only updates, chain hash can be skipped
- let mut node_id_set: HashSet<[u8; 33]> = HashSet::new();
- let mut node_id_indices: HashMap<[u8; 33], usize> = HashMap::new();
- let mut node_ids: Vec<PublicKey> = Vec::new();
+ let mut node_id_set: HashSet<NodeId> = HashSet::new();
+ let mut node_id_indices: HashMap<NodeId, usize> = HashMap::new();
+ let mut node_ids: Vec<NodeId> = Vec::new();
let mut duplicate_node_ids: i32 = 0;
- let mut get_node_id_index = |node_id: PublicKey| {
- let serialized_node_id = node_id.serialize();
- if node_id_set.insert(serialized_node_id) {
+ let mut get_node_id_index = |node_id: NodeId| {
+ if node_id_set.insert(node_id) {
node_ids.push(node_id);
let index = node_ids.len() - 1;
- node_id_indices.insert(serialized_node_id, index);
+ node_id_indices.insert(node_id, index);
return index;
}
duplicate_node_ids += 1;
- node_id_indices[&serialized_node_id]
+ node_id_indices[&node_id]
};
let mut delta_set = DeltaSet::new();
let channel_ids = {
let read_only_graph = network_graph.read_only();
println!("Retrieved read-only network graph copy");
- let channel_iterator = read_only_graph.channels().into_iter();
+ let channel_iterator = read_only_graph.channels().unordered_iter();
channel_iterator
.filter(|c| c.1.announcement_message.is_some())
.map(|c| c.1.announcement_message.as_ref().unwrap().contents.short_channel_id as i64)
use std::time::{Duration, Instant};
use bitcoin::hashes::hex::ToHex;
-use bitcoin::secp256k1::{PublicKey, SecretKey};
+use bitcoin::secp256k1::PublicKey;
use lightning;
use lightning::ln::peer_handler::{
ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
};
use lightning::routing::gossip::NetworkGraph;
+use lightning::chain::keysinterface::KeysManager;
use tokio::sync::mpsc;
use crate::{config, TestLogger};
rand_hasher.write_u8(2);
random_data[0..8].copy_from_slice(&rand_hasher.finish().to_ne_bytes());
- let our_node_secret = SecretKey::from_slice(&key).unwrap();
+ let keys_manager = Arc::new(KeysManager::new(&key, 0xdeadbeef, 0xdeadbeef));
+
let router = Arc::new(GossipRouter::new(network_graph, persistence_sender.clone()));
let message_handler = MessageHandler {
};
let peer_handler = Arc::new(PeerManager::new(
message_handler,
- our_node_secret,
0xdeadbeef,
&random_data,
TestLogger::new(),
IgnoringMessageHandler {},
+ keys_manager,
));
+ router.set_pm(Arc::clone(&peer_handler));
println!("Connecting to Lightning peers...");
let peers = config::ln_peers();
use std::sync::Arc;
use std::ops::Deref;
+use lightning::chain::keysinterface::KeysManager;
use lightning::ln::msgs::{ChannelAnnouncement, ChannelUpdate};
use lightning::ln::peer_handler::{ErroringMessageHandler, IgnoringMessageHandler, PeerManager};
use lightning::util::logger::{Logger, Record};
use crate::verifier::ChainVerifier;
pub(crate) type GossipChainAccess = Arc<ChainVerifier>;
-pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler>>;
+pub(crate) type GossipPeerManager = Arc<PeerManager<lightning_net_tokio::SocketDescriptor, ErroringMessageHandler, Arc<GossipRouter>, IgnoringMessageHandler, TestLogger, IgnoringMessageHandler, Arc<KeysManager>>>;
#[derive(Debug)]
pub(crate) enum GossipMessage {
use std::convert::TryInto;
+use std::sync::Arc;
+use std::sync::Mutex;
use bitcoin::{BlockHash, TxOut};
use bitcoin::blockdata::block::Block;
use bitcoin::hashes::Hash;
-use lightning::chain;
-use lightning::chain::AccessError;
+use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
+use lightning::routing::utxo::{UtxoFuture, UtxoLookup, UtxoResult, UtxoLookupError};
use lightning_block_sync::{BlockData, BlockSource};
use lightning_block_sync::http::BinaryResponse;
use lightning_block_sync::rest::RestClient;
use crate::config;
+use crate::TestLogger;
+use crate::types::GossipPeerManager;
pub(crate) struct ChainVerifier {
- rest_client: RestClient,
+ rest_client: Arc<RestClient>,
+ graph: Arc<NetworkGraph<TestLogger>>,
+ outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>,
+ peer_handler: Mutex<Option<GossipPeerManager>>,
}
struct RestBinaryResponse(Vec<u8>);
impl ChainVerifier {
- pub(crate) fn new() -> Self {
+ pub(crate) fn new(graph: Arc<NetworkGraph<TestLogger>>, outbound_gossiper: Arc<P2PGossipSync<Arc<NetworkGraph<TestLogger>>, Arc<Self>, TestLogger>>) -> Self {
ChainVerifier {
- rest_client: RestClient::new(config::bitcoin_rest_endpoint()).unwrap(),
+ rest_client: Arc::new(RestClient::new(config::bitcoin_rest_endpoint()).unwrap()),
+ outbound_gossiper,
+ graph,
+ peer_handler: Mutex::new(None),
}
}
-
- fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
- tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
- let uri = format!("blockhashbyheight/{}.bin", block_height);
- let block_hash_result =
- self.rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
- let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
- eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
- AccessError::UnknownChain
- })?.0;
- let block_hash = BlockHash::from_slice(&block_hash).unwrap();
-
- let block_result = self.rest_client.get_block(&block_hash).await;
- match block_result {
- Ok(BlockData::FullBlock(block)) => {
- Ok(block)
- },
- Ok(_) => unreachable!(),
- Err(error) => {
- eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
- Err(AccessError::UnknownChain)
- }
- }
- }) })
+ pub(crate) fn set_ph(&self, peer_handler: GossipPeerManager) {
+ *self.peer_handler.lock().unwrap() = Some(peer_handler);
}
-}
-impl chain::Access for ChainVerifier {
- fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> Result<TxOut, AccessError> {
+ async fn retrieve_utxo(client: Arc<RestClient>, short_channel_id: u64) -> Result<TxOut, UtxoLookupError> {
let block_height = (short_channel_id >> 5 * 8) as u32; // block height is most significant three bytes
let transaction_index = ((short_channel_id >> 2 * 8) & 0xffffff) as u32;
let output_index = (short_channel_id & 0xffff) as u16;
- let block = self.retrieve_block(block_height)?;
- let transaction = block.txdata.get(transaction_index as usize).ok_or_else(|| {
- eprintln!("Transaction index {} out of bounds in block {} ({})", transaction_index, block_height, block.block_hash().to_string());
- AccessError::UnknownTx
- })?;
- let output = transaction.output.get(output_index as usize).ok_or_else(|| {
- eprintln!("Output index {} out of bounds in transaction {}", output_index, transaction.txid().to_string());
- AccessError::UnknownTx
- })?;
- Ok(output.clone())
+ let mut block = Self::retrieve_block(client, block_height).await?;
+ if transaction_index as usize >= block.txdata.len() { return Err(UtxoLookupError::UnknownTx); }
+ let mut transaction = block.txdata.swap_remove(transaction_index as usize);
+ if output_index as usize >= transaction.output.len() { return Err(UtxoLookupError::UnknownTx); }
+ Ok(transaction.output.swap_remove(output_index as usize))
+ }
+
+ async fn retrieve_block(client: Arc<RestClient>, block_height: u32) -> Result<Block, UtxoLookupError> {
+ let uri = format!("blockhashbyheight/{}.bin", block_height);
+ let block_hash_result =
+ client.request_resource::<BinaryResponse, RestBinaryResponse>(&uri).await;
+ let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
+ eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
+ UtxoLookupError::UnknownChain
+ })?.0;
+ let block_hash = BlockHash::from_slice(&block_hash).unwrap();
+
+ let block_result = client.get_block(&block_hash).await;
+ match block_result {
+ Ok(BlockData::FullBlock(block)) => {
+ Ok(block)
+ },
+ Ok(_) => unreachable!(),
+ Err(error) => {
+ eprintln!("Couldn't retrieve block {}: {:?} ({})", block_height, error, block_hash);
+ Err(UtxoLookupError::UnknownChain)
+ }
+ }
+ }
+}
+
+impl UtxoLookup for ChainVerifier {
+ fn get_utxo(&self, _genesis_hash: &BlockHash, short_channel_id: u64) -> UtxoResult {
+ let res = UtxoFuture::new();
+ let fut = res.clone();
+ let graph_ref = Arc::clone(&self.graph);
+ let client_ref = Arc::clone(&self.rest_client);
+ let gossip_ref = Arc::clone(&self.outbound_gossiper);
+ let pm_ref = self.peer_handler.lock().unwrap().clone();
+ tokio::spawn(async move {
+ let res = Self::retrieve_utxo(client_ref, short_channel_id).await;
+ fut.resolve(&*graph_ref, &*gossip_ref, res);
+ if let Some(pm) = pm_ref { pm.process_events(); }
+ });
+ UtxoResult::Async(res)
}
}