use bitcoin::blockdata::opcodes;
use bitcoin::consensus::encode::deserialize;
use bitcoin::network::constants::Network;
+use bitcoin::blockdata::constants::genesis_block;
use bitcoin::hashes::Hash as TraitImport;
use bitcoin::hashes::HashEngine as TraitImportEngine;
config.peer_channel_config_limits.min_dust_limit_satoshis = 0;
let channelmanager = Arc::new(ChannelManager::new(Network::Bitcoin, fee_est.clone(), monitor.clone(), broadcast.clone(), Arc::clone(&logger), keys_manager.clone(), config, 0));
let our_id = PublicKey::from_secret_key(&Secp256k1::signing_only(), &keys_manager.get_node_secret());
- let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(None, Arc::clone(&logger)));
+ let net_graph_msg_handler = Arc::new(NetGraphMsgHandler::new(genesis_block(Network::Bitcoin).header.block_hash(), None, Arc::clone(&logger)));
let peers = RefCell::new([false; 256]);
let mut loss_detector = MoneyLossDetector::new(&peers, channelmanager.clone(), monitor.clone(), PeerManager::new(MessageHandler {
// What each byte represents is broken down below, and then everything is concatenated into
// one large test at the end (you want %s/ -.*//g %s/\n\| \|\t\|\///g).
- // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
+ // Following BOLT 8, lightning message on the wire are: 2-byte encrypted message length +
// 16-byte MAC of the encrypted message length + encrypted Lightning message + 16-byte MAC
// of the Lightning message
// I.e 2nd inbound read, len 18 : 0006 (encrypted message length) + 03000000000000000000000000000000 (MAC of the encrypted message length)
use lightning::routing::network_graph::{NetworkGraph, RoutingFees};
use bitcoin::secp256k1::key::PublicKey;
+use bitcoin::network::constants::Network;
+use bitcoin::blockdata::constants::genesis_block;
use utils::test_logger;
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new("".to_owned(), out));
let our_pubkey = get_pubkey!();
- let mut net_graph = NetworkGraph::new();
+ let mut net_graph = NetworkGraph::new(genesis_block(Network::Bitcoin).header.block_hash());
let mut node_pks = HashSet::new();
let mut scid = 42;
fn handle_htlc_fail_channel_update(&self, _update: &HTLCFailChannelUpdate) { }
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
- fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+ fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &Init) { }
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
}
impl ChannelMessageHandler for MsgHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
&events::MessageSendEvent::HandleError { ref node_id, .. } => node_id != counterparty_node_id,
&events::MessageSendEvent::PaymentFailureNetworkUpdate { .. } => true,
+ &events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
+ &events::MessageSendEvent::SendShortIdsQuery { .. } => false,
}
});
}
],
optional_features: [
// Byte 0
- DataLossProtect | InitialRoutingSync | UpfrontShutdownScript,
+ DataLossProtect | InitialRoutingSync | UpfrontShutdownScript | GossipQueries,
// Byte 1
VariableLengthOnion | PaymentSecret,
// Byte 2
],
optional_features: [
// Byte 0
- DataLossProtect | UpfrontShutdownScript,
+ DataLossProtect | UpfrontShutdownScript | GossipQueries,
// Byte 1
VariableLengthOnion | PaymentSecret,
// Byte 2
"Feature flags for `initial_routing_sync`.");
define_feature!(5, UpfrontShutdownScript, [InitContext, NodeContext],
"Feature flags for `option_upfront_shutdown_script`.");
+ define_feature!(7, GossipQueries, [InitContext, NodeContext],
+ "Feature flags for `gossip_queries`.");
define_feature!(9, VariableLengthOnion, [InitContext, NodeContext],
"Feature flags for `var_onion_optin`.");
define_feature!(13, StaticRemoteKey, [InitContext, NodeContext],
}
}
+
+impl<T: sealed::GossipQueries> Features<T> {
+ #[cfg(test)]
+ pub(crate) fn requires_gossip_queries(&self) -> bool {
+ <T as sealed::GossipQueries>::requires_feature(&self.flags)
+ }
+ pub(crate) fn supports_gossip_queries(&self) -> bool {
+ <T as sealed::GossipQueries>::supports_feature(&self.flags)
+ }
+ #[cfg(test)]
+ pub(crate) fn clear_gossip_queries(mut self) -> Self {
+ <T as sealed::GossipQueries>::clear_bits(&mut self.flags);
+ self
+ }
+}
+
impl<T: sealed::VariableLengthOnion> Features<T> {
#[cfg(test)]
pub(crate) fn requires_variable_length_onion(&self) -> bool {
pub(crate) fn initial_routing_sync(&self) -> bool {
<T as sealed::InitialRoutingSync>::supports_feature(&self.flags)
}
+ // We are no longer setting initial_routing_sync now that gossip_queries
+ // is enabled. This feature is ignored by a peer when gossip_queries has
+ // been negotiated.
+ #[cfg(test)]
pub(crate) fn clear_initial_routing_sync(&mut self) {
<T as sealed::InitialRoutingSync>::clear_bits(&mut self.flags)
}
assert!(!InitFeatures::known().requires_upfront_shutdown_script());
assert!(!NodeFeatures::known().requires_upfront_shutdown_script());
+ assert!(InitFeatures::known().supports_gossip_queries());
+ assert!(NodeFeatures::known().supports_gossip_queries());
+ assert!(!InitFeatures::known().requires_gossip_queries());
+ assert!(!NodeFeatures::known().requires_gossip_queries());
+
assert!(InitFeatures::known().supports_data_loss_protect());
assert!(NodeFeatures::known().supports_data_loss_protect());
assert!(!InitFeatures::known().requires_data_loss_protect());
#[test]
fn convert_to_context_with_relevant_flags() {
- let init_features = InitFeatures::known().clear_upfront_shutdown_script();
+ let init_features = InitFeatures::known().clear_upfront_shutdown_script().clear_gossip_queries();
assert!(init_features.initial_routing_sync());
assert!(!init_features.supports_upfront_shutdown_script());
+ assert!(!init_features.supports_gossip_queries());
let node_features: NodeFeatures = init_features.to_context();
{
// Check that cleared flags are kept blank when converting back:
// - initial_routing_sync was not applicable to NodeContext
// - upfront_shutdown_script was cleared before converting
+ // - gossip_queries was cleared before converting
let features: InitFeatures = node_features.to_context_internal();
assert!(!features.initial_routing_sync());
assert!(!features.supports_upfront_shutdown_script());
+ assert!(!init_features.supports_gossip_queries());
}
}
let payment_count = Rc::new(RefCell::new(0));
for i in 0..node_count {
- let net_graph_msg_handler = NetGraphMsgHandler::new(None, cfgs[i].logger);
+ let net_graph_msg_handler = NetGraphMsgHandler::new(cfgs[i].chain_source.genesis_hash, None, cfgs[i].logger);
nodes.push(Node{ chain_source: cfgs[i].chain_source,
tx_broadcaster: cfgs[i].tx_broadcaster, chain_monitor: &cfgs[i].chain_monitor,
keys_manager: &cfgs[i].keys_manager, node: &chan_mgrs[i], net_graph_msg_handler,
}
/// A trait to describe an object which can receive routing messages.
-pub trait RoutingMessageHandler : Send + Sync {
+///
+/// # Implementor DoS Warnings
+///
+/// For `gossip_queries` messages there are potential DoS vectors when handling
+/// inbound queries. Implementors using an on-disk network graph should be aware of
+/// repeated disk I/O for queries accessing different parts of the network graph.
+pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
/// false or returning an Err otherwise.
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
/// immediately higher (as defined by <PublicKey as Ord>::cmp) than starting_point.
/// If None is provided for starting_point, we start at the first node.
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
- /// Returns whether a full sync should be requested from a peer.
- fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
+ /// Called when a connection is established with a peer. This can be used to
+ /// perform routing table synchronization using a strategy defined by the
+ /// implementor.
+ fn sync_routing_table(&self, their_node_id: &PublicKey, init: &Init);
+ /// Handles the reply of a query we initiated to learn about channels
+ /// for a given range of blocks. We can expect to receive one or more
+ /// replies to a single query.
+ fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError>;
+ /// Handles the reply of a query we initiated asking for routing gossip
+ /// messages for a list of channels. We should receive this message when
+ /// a node has completed its best effort to send us the pertaining routing
+ /// gossip messages.
+ fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
+ /// Handles when a peer asks us to send a list of short_channel_ids
+ /// for the requested range of blocks.
+ fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: QueryChannelRange) -> Result<(), LightningError>;
+ /// Handles when a peer asks us to send routing gossip messages for a
+ /// list of short_channel_ids.
+ fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: QueryShortChannelIds) -> Result<(), LightningError>;
}
mod fuzzy_internal_msgs {
peer.their_node_id = Some(their_node_id);
insert_node_id!();
- let mut features = InitFeatures::known();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
+ let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(&mut peers.peers_needing_send, peer, peer_descriptor.clone(), &resp);
},
}
log_info!(
- self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, static_remote_key: {}, unknown flags (local and global): {}",
+ self.logger, "Received peer Init message: data_loss_protect: {}, initial_routing_sync: {}, upfront_shutdown_script: {}, gossip_queries: {}, static_remote_key: {}, unknown flags (local and global): {}",
if msg.features.supports_data_loss_protect() { "supported" } else { "not supported"},
if msg.features.initial_routing_sync() { "requested" } else { "not requested" },
if msg.features.supports_upfront_shutdown_script() { "supported" } else { "not supported"},
+ if msg.features.supports_gossip_queries() { "supported" } else { "not supported" },
if msg.features.supports_static_remote_key() { "supported" } else { "not supported"},
if msg.features.supports_unknown_bits() { "present" } else { "none" }
);
}
if !peer.outbound {
- let mut features = InitFeatures::known();
- if !self.message_handler.route_handler.should_request_full_sync(&peer.their_node_id.unwrap()) {
- features.clear_initial_routing_sync();
- }
-
+ let features = InitFeatures::known();
let resp = msgs::Init { features };
self.enqueue_message(peers_needing_send, peer, peer_descriptor.clone(), &resp);
}
+ self.message_handler.route_handler.sync_routing_table(&peer.their_node_id.unwrap(), &msg);
+
self.message_handler.chan_handler.peer_connected(&peer.their_node_id.unwrap(), &msg);
peer.their_features = Some(msg.features);
},
// TODO: forward msg along to all our other peers!
}
},
+ wire::Message::QueryShortChannelIds(msg) => {
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::ReplyShortChannelIdsEnd(msg) => {
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::QueryChannelRange(msg) => {
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::ReplyChannelRange(msg) => {
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), msg)?;
+ },
+ wire::Message::GossipTimestampFilter(_msg) => {
+ // TODO: handle message
+ },
// Unknown messages:
wire::Message::Unknown(msg_type) if msg_type.is_even() => {
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
for event in events_generated.drain(..) {
self.do_attempt_write_data(&mut descriptor, peer);
},
}
+ },
+ MessageSendEvent::SendChannelRangeQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
+ },
+ MessageSendEvent::SendShortIdsQuery { ref node_id, ref msg } => {
+ let (mut descriptor, peer) = get_peer_for_forwarding!(node_id, {});
+ peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encode_msg!(msg)));
+ self.do_attempt_write_data(&mut descriptor, peer);
}
}
}
(fd_a.clone(), fd_b.clone())
}
- fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
- let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
- assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
- (fd_a.clone(), fd_b.clone())
- }
-
#[test]
fn test_disconnect_peer() {
// Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and
assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
}
-
- #[test]
- fn limit_initial_routing_sync_requests() {
- // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
- {
- let cfgs = create_peermgr_cfgs(2);
- cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
- let peers = create_network(2, &cfgs);
- let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
- let peer_0 = peers[0].peers.lock().unwrap();
- let peer_1 = peers[1].peers.lock().unwrap();
-
- let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
- let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
- assert!(peer_0_features.unwrap().initial_routing_sync());
- assert!(!peer_1_features.unwrap().initial_routing_sync());
- }
-
- // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
- {
- let cfgs = create_peermgr_cfgs(2);
- cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
- let peers = create_network(2, &cfgs);
- let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
-
- let peer_0 = peers[0].peers.lock().unwrap();
- let peer_1 = peers[1].peers.lock().unwrap();
-
- let peer_0_features = peer_1.peers.get(&fd_1_to_0).unwrap().their_features.as_ref();
- let peer_1_features = peer_0.peers.get(&fd_0_to_1).unwrap().their_features.as_ref();
-
- assert!(!peer_0_features.unwrap().initial_routing_sync());
- assert!(peer_1_features.unwrap().initial_routing_sync());
- }
- }
}
ChannelAnnouncement(msgs::ChannelAnnouncement),
NodeAnnouncement(msgs::NodeAnnouncement),
ChannelUpdate(msgs::ChannelUpdate),
+ QueryShortChannelIds(msgs::QueryShortChannelIds),
+ ReplyShortChannelIdsEnd(msgs::ReplyShortChannelIdsEnd),
+ QueryChannelRange(msgs::QueryChannelRange),
+ ReplyChannelRange(msgs::ReplyChannelRange),
+ GossipTimestampFilter(msgs::GossipTimestampFilter),
/// A message that could not be decoded because its type is unknown.
Unknown(MessageType),
}
&Message::ChannelAnnouncement(ref msg) => msg.type_id(),
&Message::NodeAnnouncement(ref msg) => msg.type_id(),
&Message::ChannelUpdate(ref msg) => msg.type_id(),
+ &Message::QueryShortChannelIds(ref msg) => msg.type_id(),
+ &Message::ReplyShortChannelIdsEnd(ref msg) => msg.type_id(),
+ &Message::QueryChannelRange(ref msg) => msg.type_id(),
+ &Message::ReplyChannelRange(ref msg) => msg.type_id(),
+ &Message::GossipTimestampFilter(ref msg) => msg.type_id(),
&Message::Unknown(type_id) => type_id,
}
}
msgs::ChannelUpdate::TYPE => {
Ok(Message::ChannelUpdate(Readable::read(buffer)?))
},
+ msgs::QueryShortChannelIds::TYPE => {
+ Ok(Message::QueryShortChannelIds(Readable::read(buffer)?))
+ },
+ msgs::ReplyShortChannelIdsEnd::TYPE => {
+ Ok(Message::ReplyShortChannelIdsEnd(Readable::read(buffer)?))
+ },
+ msgs::QueryChannelRange::TYPE => {
+ Ok(Message::QueryChannelRange(Readable::read(buffer)?))
+ },
+ msgs::ReplyChannelRange::TYPE => {
+ Ok(Message::ReplyChannelRange(Readable::read(buffer)?))
+ }
+ msgs::GossipTimestampFilter::TYPE => {
+ Ok(Message::GossipTimestampFilter(Readable::read(buffer)?))
+ },
_ => {
Ok(Message::Unknown(MessageType(message_type)))
},
const TYPE: u16 = 258;
}
+impl Encode for msgs::QueryShortChannelIds {
+ const TYPE: u16 = 261;
+}
+
+impl Encode for msgs::ReplyShortChannelIdsEnd {
+ const TYPE: u16 = 262;
+}
+
+impl Encode for msgs::QueryChannelRange {
+ const TYPE: u16 = 263;
+}
+
+impl Encode for msgs::ReplyChannelRange {
+ const TYPE: u16 = 264;
+}
+
+impl Encode for msgs::GossipTimestampFilter {
+ const TYPE: u16 = 265;
+}
+
#[cfg(test)]
mod tests {
use super::*;
fn read_lnd_init_msg() {
// Taken from lnd v0.9.0-beta.
let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 162, 161];
- check_init_msg(buffer);
+ check_init_msg(buffer, false);
}
#[test]
fn read_clightning_init_msg() {
// Taken from c-lightning v0.8.0.
let buffer = vec![0, 16, 0, 2, 34, 0, 0, 3, 2, 170, 162, 1, 32, 6, 34, 110, 70, 17, 26, 11, 89, 202, 175, 18, 96, 67, 235, 91, 191, 40, 195, 79, 58, 94, 51, 42, 31, 199, 178, 183, 60, 241, 136, 145, 15];
- check_init_msg(buffer);
+ check_init_msg(buffer, true);
}
- fn check_init_msg(buffer: Vec<u8>) {
+ fn check_init_msg(buffer: Vec<u8>, expect_unknown: bool) {
let mut reader = ::std::io::Cursor::new(buffer);
let decoded_msg = read(&mut reader).unwrap();
match decoded_msg {
Message::Init(msgs::Init { features }) => {
assert!(features.supports_variable_length_onion());
assert!(features.supports_upfront_shutdown_script());
- assert!(features.supports_unknown_bits());
+ assert!(features.supports_gossip_queries());
+ assert_eq!(expect_unknown, features.supports_unknown_bits());
assert!(!features.requires_unknown_bits());
assert!(!features.initial_routing_sync());
},
Message::NodeAnnouncement(msgs::NodeAnnouncement { contents: msgs::UnsignedNodeAnnouncement { features, ..}, ..}) => {
assert!(features.supports_variable_length_onion());
assert!(features.supports_upfront_shutdown_script());
- assert!(features.supports_unknown_bits());
+ assert!(features.supports_gossip_queries());
assert!(!features.requires_unknown_bits());
},
_ => panic!("Expected node announcement, found message type: {}", decoded_msg.type_id())
use bitcoin::blockdata::script::Builder;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::blockdata::opcodes;
+use bitcoin::hash_types::BlockHash;
use chain;
use chain::Access;
use ln::features::{ChannelFeatures, NodeFeatures};
-use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
+use ln::msgs::{DecodeError, ErrorAction, Init, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField};
+use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
use ln::msgs;
use util::ser::{Writeable, Readable, Writer};
use util::logger::Logger;
+use util::events;
use std::{cmp, fmt};
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry as BtreeEntry;
use std::ops::Deref;
/// Represents the network as nodes and channels between them
#[derive(PartialEq)]
pub struct NetworkGraph {
+ genesis_hash: BlockHash,
channels: BTreeMap<u64, ChannelInfo>,
nodes: BTreeMap<PublicKey, NodeInfo>,
}
pub network_graph: RwLock<NetworkGraph>,
chain_access: Option<C>,
full_syncs_requested: AtomicUsize,
+ pending_events: Mutex<Vec<events::MessageSendEvent>>,
logger: L,
}
/// Chain monitor is used to make sure announced channels exist on-chain,
/// channel data is correct, and that the announcement is signed with
/// channel owners' keys.
- pub fn new(chain_access: Option<C>, logger: L) -> Self {
+ pub fn new(genesis_hash: BlockHash, chain_access: Option<C>, logger: L) -> Self {
NetGraphMsgHandler {
secp_ctx: Secp256k1::verification_only(),
- network_graph: RwLock::new(NetworkGraph {
- channels: BTreeMap::new(),
- nodes: BTreeMap::new(),
- }),
+ network_graph: RwLock::new(NetworkGraph::new(genesis_hash)),
full_syncs_requested: AtomicUsize::new(0),
chain_access,
+ pending_events: Mutex::new(vec![]),
logger,
}
}
network_graph: RwLock::new(network_graph),
full_syncs_requested: AtomicUsize::new(0),
chain_access,
+ pending_events: Mutex::new(vec![]),
logger,
}
}
pub fn read_locked_graph<'a>(&'a self) -> LockedNetworkGraph<'a> {
LockedNetworkGraph(self.network_graph.read().unwrap())
}
+
+ /// Returns true when a full routing table sync should be performed with a peer.
+ fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
+ //TODO: Determine whether to request a full sync based on the network map.
+ const FULL_SYNCS_TO_REQUEST: usize = 5;
+ if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
+ self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
+ true
+ } else {
+ false
+ }
+ }
}
impl<'a> LockedNetworkGraph<'a> {
result
}
- fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
- //TODO: Determine whether to request a full sync based on the network map.
- const FULL_SYNCS_TO_REQUEST: usize = 5;
- if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
- self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
- true
- } else {
- false
+ /// Initiates a stateless sync of routing gossip information with a peer
+ /// using gossip_queries. The default strategy used by this implementation
+ /// is to sync the full block range with several peers.
+ ///
+ /// We should expect one or more reply_channel_range messages in response
+ /// to our query_channel_range. Each reply will enqueue a query_scid message
+ /// to request gossip messages for each channel. The sync is considered complete
+ /// when the final reply_scids_end message is received, though we are not
+ /// tracking this directly.
+ fn sync_routing_table(&self, their_node_id: &PublicKey, init_msg: &Init) {
+
+ // We will only perform a sync with peers that support gossip_queries.
+ if !init_msg.features.supports_gossip_queries() {
+ return ();
+ }
+
+ // Check if we need to perform a full synchronization with this peer
+ if !self.should_request_full_sync(their_node_id) {
+ return ();
}
+
+ let first_blocknum = 0;
+ let number_of_blocks = 0xffffffff;
+ log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks);
+ let mut pending_events = self.pending_events.lock().unwrap();
+ pending_events.push(events::MessageSendEvent::SendChannelRangeQuery {
+ node_id: their_node_id.clone(),
+ msg: QueryChannelRange {
+ chain_hash: self.network_graph.read().unwrap().genesis_hash,
+ first_blocknum,
+ number_of_blocks,
+ },
+ });
+ }
+
+ /// Statelessly processes a reply to a channel range query by immediately
+ /// sending an SCID query with SCIDs in the reply. To keep this handler
+ /// stateless, it does not validate the sequencing of replies for multi-
+ /// reply ranges. It does not validate whether the reply(ies) cover the
+ /// queried range. It also does not filter SCIDs to only those in the
+ /// original query range. We also do not validate that the chain_hash
+ /// matches the chain_hash of the NetworkGraph. Any chan_ann message that
+ /// does not match our chain_hash will be rejected when the announcement is
+ /// processed.
+ fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
+ log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),);
+
+ // Validate that the remote node maintains up-to-date channel
+ // information for chain_hash. Some nodes use the full_information
+ // flag to indicate multi-part messages so we must check whether
+ // we received SCIDs as well.
+ if !msg.full_information && msg.short_channel_ids.len() == 0 {
+ return Err(LightningError {
+ err: String::from("Received reply_channel_range with no information available"),
+ action: ErrorAction::IgnoreError,
+ });
+ }
+
+ log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(their_node_id), msg.short_channel_ids.len());
+ let mut pending_events = self.pending_events.lock().unwrap();
+ pending_events.push(events::MessageSendEvent::SendShortIdsQuery {
+ node_id: their_node_id.clone(),
+ msg: QueryShortChannelIds {
+ chain_hash: msg.chain_hash,
+ short_channel_ids: msg.short_channel_ids,
+ }
+ });
+
+ Ok(())
+ }
+
+ /// When an SCID query is initiated the remote peer will begin streaming
+ /// gossip messages. In the event of a failure, we may have received
+ /// some channel information. Before trying with another peer, the
+ /// caller should update its set of SCIDs that need to be queried.
+ fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+ log_debug!(self.logger, "Handling reply_short_channel_ids_end peer={}, full_information={}", log_pubkey!(their_node_id), msg.full_information);
+
+ // If the remote node does not have up-to-date information for the
+ // chain_hash they will set full_information=false. We can fail
+ // the result and try again with a different peer.
+ if !msg.full_information {
+ return Err(LightningError {
+ err: String::from("Received reply_short_channel_ids_end with no information"),
+ action: ErrorAction::IgnoreError
+ });
+ }
+
+ Ok(())
+ }
+
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: QueryChannelRange) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: QueryShortChannelIds) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+}
+
+impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
+where
+ C::Target: chain::Access,
+ L::Target: Logger,
+{
+ fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+ let mut ret = Vec::new();
+ let mut pending_events = self.pending_events.lock().unwrap();
+ std::mem::swap(&mut ret, &mut pending_events);
+ ret
}
}
impl Writeable for NetworkGraph {
fn write<W: Writer>(&self, writer: &mut W) -> Result<(), ::std::io::Error> {
+ self.genesis_hash.write(writer)?;
(self.channels.len() as u64).write(writer)?;
for (ref chan_id, ref chan_info) in self.channels.iter() {
(*chan_id).write(writer)?;
impl Readable for NetworkGraph {
fn read<R: ::std::io::Read>(reader: &mut R) -> Result<NetworkGraph, DecodeError> {
+ let genesis_hash: BlockHash = Readable::read(reader)?;
let channels_count: u64 = Readable::read(reader)?;
let mut channels = BTreeMap::new();
for _ in 0..channels_count {
nodes.insert(node_id, node_info);
}
Ok(NetworkGraph {
+ genesis_hash,
channels,
nodes,
})
}
/// Creates a new, empty, network graph.
- pub fn new() -> NetworkGraph {
+ pub fn new(genesis_hash: BlockHash) -> NetworkGraph {
Self {
+ genesis_hash,
channels: BTreeMap::new(),
nodes: BTreeMap::new(),
}
#[cfg(test)]
mod tests {
use chain;
- use ln::features::{ChannelFeatures, NodeFeatures};
+ use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
use routing::network_graph::{NetGraphMsgHandler, NetworkGraph};
- use ln::msgs::{OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
+ use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, HTLCFailChannelUpdate,
- MAX_VALUE_MSAT};
+ ReplyChannelRange, ReplyShortChannelIdsEnd, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT};
use util::test_utils;
use util::logger::Logger;
use util::ser::{Readable, Writeable};
+ use util::events::{MessageSendEvent, MessageSendEventsProvider};
use bitcoin::hashes::sha256d::Hash as Sha256dHash;
use bitcoin::hashes::Hash;
fn create_net_graph_msg_handler() -> (Secp256k1<All>, NetGraphMsgHandler<Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>) {
let secp_ctx = Secp256k1::new();
let logger = Arc::new(test_utils::TestLogger::new());
- let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+ let genesis_hash = genesis_block(Network::Testnet).header.block_hash();
+ let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_hash, None, Arc::clone(&logger));
(secp_ctx, net_graph_msg_handler)
}
};
// Test if the UTXO lookups were not supported
- let mut net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+ let mut net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger));
match net_graph_msg_handler.handle_channel_announcement(&valid_announcement) {
Ok(res) => assert!(res),
_ => panic!()
// Test if an associated transaction were not on-chain (or not confirmed).
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
*chain_source.utxo_ret.lock().unwrap() = Err(chain::AccessError::UnknownTx);
- net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger));
+ net_graph_msg_handler = NetGraphMsgHandler::new(chain_source.clone().genesis_hash, Some(chain_source.clone()), Arc::clone(&logger));
unsigned_announcement.short_channel_id += 1;
msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
let secp_ctx = Secp256k1::new();
let logger: Arc<Logger> = Arc::new(test_utils::TestLogger::new());
let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
- let net_graph_msg_handler = NetGraphMsgHandler::new(Some(chain_source.clone()), Arc::clone(&logger));
+ let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), Some(chain_source.clone()), Arc::clone(&logger));
let node_1_privkey = &SecretKey::from_slice(&[42; 32]).unwrap();
let node_2_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
network.write(&mut w).unwrap();
assert!(<NetworkGraph>::read(&mut ::std::io::Cursor::new(&w.0)).unwrap() == *network);
}
+
+ #[test]
+ fn calling_sync_routing_table() {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1);
+
+ let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+ let first_blocknum = 0;
+ let number_of_blocks = 0xffff_ffff;
+
+ // It should ignore if gossip_queries feature is not enabled
+ {
+ let init_msg = Init { features: InitFeatures::known().clear_gossip_queries() };
+ net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg);
+ let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 0);
+ }
+
+ // It should send a query_channel_message with the correct information
+ {
+ let init_msg = Init { features: InitFeatures::known() };
+ net_graph_msg_handler.sync_routing_table(&node_id_1, &init_msg);
+ let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 1);
+ match &events[0] {
+ MessageSendEvent::SendChannelRangeQuery{ node_id, msg } => {
+ assert_eq!(node_id, &node_id_1);
+ assert_eq!(msg.chain_hash, chain_hash);
+ assert_eq!(msg.first_blocknum, first_blocknum);
+ assert_eq!(msg.number_of_blocks, number_of_blocks);
+ },
+ _ => panic!("Expected MessageSendEvent::SendChannelRangeQuery")
+ };
+ }
+
+ // It should not enqueue a query when should_request_full_sync return false.
+ // The initial implementation allows syncing with the first 5 peers after
+ // which should_request_full_sync will return false
+ {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let init_msg = Init { features: InitFeatures::known() };
+ for n in 1..7 {
+ let node_privkey = &SecretKey::from_slice(&[n; 32]).unwrap();
+ let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+ net_graph_msg_handler.sync_routing_table(&node_id, &init_msg);
+ let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+ if n <= 5 {
+ assert_eq!(events.len(), 1);
+ } else {
+ assert_eq!(events.len(), 0);
+ }
+
+ }
+ }
+ }
+
+ #[test]
+ fn handling_reply_channel_range() {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let node_privkey_1 = &SecretKey::from_slice(&[42; 32]).unwrap();
+ let node_id_1 = PublicKey::from_secret_key(&secp_ctx, node_privkey_1);
+
+ let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+ // Test receipt of a single reply that should enqueue an SCID query
+ // matching the SCIDs in the reply
+ {
+ let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
+ chain_hash,
+ full_information: true,
+ first_blocknum: 0,
+ number_of_blocks: 2000,
+ short_channel_ids: vec![
+ 0x0003e0_000000_0000, // 992x0x0
+ 0x0003e8_000000_0000, // 1000x0x0
+ 0x0003e9_000000_0000, // 1001x0x0
+ 0x0003f0_000000_0000, // 1008x0x0
+ 0x00044c_000000_0000, // 1100x0x0
+ 0x0006e0_000000_0000, // 1760x0x0
+ ],
+ });
+ assert!(result.is_ok());
+
+ // We expect to emit a query_short_channel_ids message with the received scids
+ let events = net_graph_msg_handler.get_and_clear_pending_msg_events();
+ assert_eq!(events.len(), 1);
+ match &events[0] {
+ MessageSendEvent::SendShortIdsQuery { node_id, msg } => {
+ assert_eq!(node_id, &node_id_1);
+ assert_eq!(msg.chain_hash, chain_hash);
+ assert_eq!(msg.short_channel_ids, vec![
+ 0x0003e0_000000_0000, // 992x0x0
+ 0x0003e8_000000_0000, // 1000x0x0
+ 0x0003e9_000000_0000, // 1001x0x0
+ 0x0003f0_000000_0000, // 1008x0x0
+ 0x00044c_000000_0000, // 1100x0x0
+ 0x0006e0_000000_0000, // 1760x0x0
+ ]);
+ },
+ _ => panic!("expected MessageSendEvent::SendShortIdsQuery"),
+ }
+ }
+
+ // Test receipt of a reply that indicates the remote node does not maintain up-to-date
+ // information for the chain_hash. Because of discrepancies in implementation we use
+ // full_information=false and short_channel_ids=[] as the signal.
+ {
+ // Handle the reply indicating the peer was unable to fulfill our request.
+ let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
+ chain_hash,
+ full_information: false,
+ first_blocknum: 1000,
+ number_of_blocks: 100,
+ short_channel_ids: vec![],
+ });
+ assert!(result.is_err());
+ assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available");
+ }
+ }
+
+ #[test]
+ fn handling_reply_short_channel_ids() {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+ let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+ let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+ // Test receipt of a successful reply
+ {
+ let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd {
+ chain_hash,
+ full_information: true,
+ });
+ assert!(result.is_ok());
+ }
+
+ // Test receipt of a reply that indicates the peer does not maintain up-to-date information
+ // for the chain_hash requested in the query.
+ {
+ let result = net_graph_msg_handler.handle_reply_short_channel_ids_end(&node_id, ReplyShortChannelIdsEnd {
+ chain_hash,
+ full_information: false,
+ });
+ assert!(result.is_err());
+ assert_eq!(result.err().unwrap().err, "Received reply_short_channel_ids_end with no information");
+ }
+ }
+
+ #[test]
+ fn handling_query_channel_range() {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+ let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+ let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+ let result = net_graph_msg_handler.handle_query_channel_range(&node_id, QueryChannelRange {
+ chain_hash,
+ first_blocknum: 0,
+ number_of_blocks: 0xffff_ffff,
+ });
+ assert!(result.is_err());
+ }
+
+ #[test]
+ fn handling_query_short_channel_ids() {
+ let (secp_ctx, net_graph_msg_handler) = create_net_graph_msg_handler();
+ let node_privkey = &SecretKey::from_slice(&[41; 32]).unwrap();
+ let node_id = PublicKey::from_secret_key(&secp_ctx, node_privkey);
+
+ let chain_hash = genesis_block(Network::Testnet).header.block_hash();
+
+ let result = net_graph_msg_handler.handle_query_short_channel_ids(&node_id, QueryShortChannelIds {
+ chain_hash,
+ short_channel_ids: vec![0x0003e8_000000_0000],
+ });
+ assert!(result.is_err());
+ }
}
fn build_graph() -> (Secp256k1<All>, NetGraphMsgHandler<std::sync::Arc<crate::util::test_utils::TestChainSource>, std::sync::Arc<crate::util::test_utils::TestLogger>>, std::sync::Arc<test_utils::TestLogger>) {
let secp_ctx = Secp256k1::new();
let logger = Arc::new(test_utils::TestLogger::new());
- let net_graph_msg_handler = NetGraphMsgHandler::new(None, Arc::clone(&logger));
+ let net_graph_msg_handler = NetGraphMsgHandler::new(genesis_block(Network::Testnet).header.block_hash(), None, Arc::clone(&logger));
// Build network from our_id to node7:
//
// -1(1)2- node0 -1(3)2-
inbound_capacity_msat: 100000,
is_live: true,
}];
- let route = get_route(&source_node_id, &NetworkGraph::new(), &target_node_id, Some(&our_chans.iter().collect::<Vec<_>>()), &last_hops.iter().collect::<Vec<_>>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap();
+ let route = get_route(&source_node_id, &NetworkGraph::new(genesis_block(Network::Testnet).header.block_hash()), &target_node_id, Some(&our_chans.iter().collect::<Vec<_>>()), &last_hops.iter().collect::<Vec<_>>(), 100, 42, Arc::new(test_utils::TestLogger::new())).unwrap();
assert_eq!(route.paths[0].len(), 2);
PaymentFailureNetworkUpdate {
/// The channel/node update which should be sent to NetGraphMsgHandler
update: msgs::HTLCFailChannelUpdate,
- }
+ },
+ /// Query a peer for channels with funding transaction UTXOs in a block range.
+ SendChannelRangeQuery {
+ /// The node_id of this message recipient
+ node_id: PublicKey,
+ /// The query_channel_range which should be sent.
+ msg: msgs::QueryChannelRange,
+ },
+ /// Request routing gossip messages from a peer for a list of channels identified by
+ /// their short_channel_ids.
+ SendShortIdsQuery {
+ /// The node_id of this message recipient
+ node_id: PublicKey,
+ /// The query_short_channel_ids which should be sent.
+ msg: msgs::QueryShortChannelIds,
+ },
}
/// A trait indicating an object may generate message send events
Vec::new()
}
- fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
- self.request_full_sync.load(Ordering::Acquire)
+ fn sync_routing_table(&self, _their_node_id: &PublicKey, _init_msg: &msgs::Init) {}
+
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::QueryChannelRange) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+}
+
+impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+ vec![]
}
}