use ln::msgs;
use util::ser::{Writeable, Readable, Writer};
use util::logger::Logger;
-use util::events;
+use util::events::{MessageSendEvent, MessageSendEventsProvider};
use std::{cmp, fmt};
use std::sync::{RwLock, RwLockReadGuard};
use std::ops::Deref;
use bitcoin::hashes::hex::ToHex;
+/// The maximum number of extra bytes which we do not understand in a gossip message before we will
+/// refuse to relay the message.
+const MAX_EXCESS_BYTES_FOR_RELAY: usize = 1024;
+
/// Represents the network as nodes and channels between them
#[derive(PartialEq)]
pub struct NetworkGraph {
pub network_graph: RwLock<NetworkGraph>,
chain_access: Option<C>,
full_syncs_requested: AtomicUsize,
- pending_events: Mutex<Vec<events::MessageSendEvent>>,
+ pending_events: Mutex<Vec<MessageSendEvent>>,
logger: L,
}
pub fn read_locked_graph<'a>(&'a self) -> LockedNetworkGraph<'a> {
LockedNetworkGraph(self.network_graph.read().unwrap())
}
+
+ /// Returns true when a full routing table sync should be performed with a peer.
+ fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
+ //TODO: Determine whether to request a full sync based on the network map.
+ const FULL_SYNCS_TO_REQUEST: usize = 5;
+ if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
+ self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
+ true
+ } else {
+ false
+ }
+ }
}
impl<'a> LockedNetworkGraph<'a> {
impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for NetGraphMsgHandler<C, L> where C::Target: chain::Access, L::Target: Logger {
fn handle_node_announcement(&self, msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> {
self.network_graph.write().unwrap().update_node_from_announcement(msg, &self.secp_ctx)?;
- Ok(msg.contents.excess_data.is_empty() && msg.contents.excess_address_data.is_empty())
+ Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY &&
+ msg.contents.excess_address_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY &&
+ msg.contents.excess_data.len() + msg.contents.excess_address_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
fn handle_channel_announcement(&self, msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> {
self.network_graph.write().unwrap().update_channel_from_announcement(msg, &self.chain_access, &self.secp_ctx)?;
log_trace!(self.logger, "Added channel_announcement for {}{}", msg.contents.short_channel_id, if !msg.contents.excess_data.is_empty() { " with excess uninterpreted data!" } else { "" });
- Ok(msg.contents.excess_data.is_empty())
+ Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
fn handle_htlc_fail_channel_update(&self, update: &msgs::HTLCFailChannelUpdate) {
fn handle_channel_update(&self, msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> {
self.network_graph.write().unwrap().update_channel(msg, &self.secp_ctx)?;
- Ok(msg.contents.excess_data.is_empty())
+ Ok(msg.contents.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY)
}
fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> {
result
}
- fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
- //TODO: Determine whether to request a full sync based on the network map.
- const FULL_SYNCS_TO_REQUEST: usize = 5;
- if self.full_syncs_requested.load(Ordering::Acquire) < FULL_SYNCS_TO_REQUEST {
- self.full_syncs_requested.fetch_add(1, Ordering::AcqRel);
- true
- } else {
- false
- }
- }
-
/// Initiates a stateless sync of routing gossip information with a peer
/// using gossip_queries. The default strategy used by this implementation
/// is to sync the full block range with several peers.
let number_of_blocks = 0xffffffff;
log_debug!(self.logger, "Sending query_channel_range peer={}, first_blocknum={}, number_of_blocks={}", log_pubkey!(their_node_id), first_blocknum, number_of_blocks);
let mut pending_events = self.pending_events.lock().unwrap();
- pending_events.push(events::MessageSendEvent::SendChannelRangeQuery {
+ pending_events.push(MessageSendEvent::SendChannelRangeQuery {
node_id: their_node_id.clone(),
msg: QueryChannelRange {
chain_hash: self.network_graph.read().unwrap().genesis_hash,
/// does not match our chain_hash will be rejected when the announcement is
/// processed.
fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: ReplyChannelRange) -> Result<(), LightningError> {
- log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, full_information={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.full_information, msg.short_channel_ids.len(),);
-
- // Validate that the remote node maintains up-to-date channel
- // information for chain_hash. Some nodes use the full_information
- // flag to indicate multi-part messages so we must check whether
- // we received SCIDs as well.
- if !msg.full_information && msg.short_channel_ids.len() == 0 {
- return Err(LightningError {
- err: String::from("Received reply_channel_range with no information available"),
- action: ErrorAction::IgnoreError,
- });
- }
+ log_debug!(self.logger, "Handling reply_channel_range peer={}, first_blocknum={}, number_of_blocks={}, sync_complete={}, scids={}", log_pubkey!(their_node_id), msg.first_blocknum, msg.number_of_blocks, msg.sync_complete, msg.short_channel_ids.len(),);
log_debug!(self.logger, "Sending query_short_channel_ids peer={}, batch_size={}", log_pubkey!(their_node_id), msg.short_channel_ids.len());
let mut pending_events = self.pending_events.lock().unwrap();
- pending_events.push(events::MessageSendEvent::SendShortIdsQuery {
+ pending_events.push(MessageSendEvent::SendShortIdsQuery {
node_id: their_node_id.clone(),
msg: QueryShortChannelIds {
chain_hash: msg.chain_hash,
}
}
-impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
+impl<C: Deref, L: Deref> MessageSendEventsProvider for NetGraphMsgHandler<C, L>
where
C::Target: chain::Access,
L::Target: Logger,
{
- fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let mut ret = Vec::new();
let mut pending_events = self.pending_events.lock().unwrap();
std::mem::swap(&mut ret, &mut pending_events);
}
}
-#[derive(PartialEq, Debug)]
+#[derive(Clone, PartialEq, Debug)]
/// Details about one direction of a channel. Received
/// within a channel update.
pub struct DirectionalChannelInfo {
}
}
-#[derive(PartialEq, Debug)]
+#[derive(Clone, PartialEq, Debug)]
/// Information received in the latest node_announcement from this node.
pub struct NodeAnnouncementInfo {
/// Protocol features the node announced support for
}
}
-#[derive(PartialEq)]
+#[derive(Clone, PartialEq)]
/// Details about a node in the network, known from the network announcement.
pub struct NodeInfo {
/// All valid channels a node has announced
}
}
- let should_relay = msg.excess_data.is_empty() && msg.excess_address_data.is_empty();
+ let should_relay =
+ msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY &&
+ msg.excess_address_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY &&
+ msg.excess_data.len() + msg.excess_address_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY;
node.announcement_info = Some(NodeAnnouncementInfo {
features: msg.features.clone(),
last_update: msg.timestamp,
node_two: msg.node_id_2.clone(),
two_to_one: None,
capacity_sats: utxo_value,
- announcement_message: if msg.excess_data.is_empty() { full_msg.cloned() } else { None },
+ announcement_message: if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY
+ { full_msg.cloned() } else { None },
};
match self.channels.entry(msg.short_channel_id) {
chan_was_enabled = false;
}
- let last_update_message = if msg.excess_data.is_empty() { full_msg.cloned() } else { None };
+ let last_update_message = if msg.excess_data.len() <= MAX_EXCESS_BYTES_FOR_RELAY
+ { full_msg.cloned() } else { None };
let updated_channel_dir_info = DirectionalChannelInfo {
enabled: chan_enabled,
mod tests {
use chain;
use ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
- use routing::network_graph::{NetGraphMsgHandler, NetworkGraph};
+ use routing::network_graph::{NetGraphMsgHandler, NetworkGraph, MAX_EXCESS_BYTES_FOR_RELAY};
use ln::msgs::{Init, OptionalField, RoutingMessageHandler, UnsignedNodeAnnouncement, NodeAnnouncement,
UnsignedChannelAnnouncement, ChannelAnnouncement, UnsignedChannelUpdate, ChannelUpdate, HTLCFailChannelUpdate,
ReplyChannelRange, ReplyShortChannelIdsEnd, QueryChannelRange, QueryShortChannelIds, MAX_VALUE_MSAT};
};
unsigned_announcement.timestamp += 1000;
- unsigned_announcement.excess_data.push(1);
+ unsigned_announcement.excess_data.resize(MAX_EXCESS_BYTES_FOR_RELAY + 1, 0);
msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
let announcement_with_data = NodeAnnouncement {
signature: secp_ctx.sign(&msghash, node_1_privkey),
// Don't relay valid channels with excess data
unsigned_announcement.short_channel_id += 1;
- unsigned_announcement.excess_data.push(1);
+ unsigned_announcement.excess_data.resize(MAX_EXCESS_BYTES_FOR_RELAY + 1, 0);
msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
let valid_announcement = ChannelAnnouncement {
node_signature_1: secp_ctx.sign(&msghash, node_1_privkey),
}
unsigned_channel_update.timestamp += 100;
- unsigned_channel_update.excess_data.push(1);
+ unsigned_channel_update.excess_data.resize(MAX_EXCESS_BYTES_FOR_RELAY + 1, 0);
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_channel_update.encode()[..])[..]);
let valid_channel_update = ChannelUpdate {
signature: secp_ctx.sign(&msghash, node_1_privkey),
htlc_maximum_msat: OptionalField::Absent,
fee_base_msat: 10000,
fee_proportional_millionths: 20,
- excess_data: [1; 3].to_vec()
+ excess_data: [1; MAX_EXCESS_BYTES_FOR_RELAY + 1].to_vec()
};
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_channel_update.encode()[..])[..]);
let valid_channel_update = ChannelUpdate {
alias: [0; 32],
addresses: Vec::new(),
excess_address_data: Vec::new(),
- excess_data: [1; 3].to_vec(),
+ excess_data: [1; MAX_EXCESS_BYTES_FOR_RELAY + 1].to_vec(),
};
let msghash = hash_to_message!(&Sha256dHash::hash(&unsigned_announcement.encode()[..])[..]);
let valid_announcement = NodeAnnouncement {
{
let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
chain_hash,
- full_information: true,
+ sync_complete: true,
first_blocknum: 0,
number_of_blocks: 2000,
short_channel_ids: vec![
_ => panic!("expected MessageSendEvent::SendShortIdsQuery"),
}
}
-
- // Test receipt of a reply that indicates the remote node does not maintain up-to-date
- // information for the chain_hash. Because of discrepancies in implementation we use
- // full_information=false and short_channel_ids=[] as the signal.
- {
- // Handle the reply indicating the peer was unable to fulfill our request.
- let result = net_graph_msg_handler.handle_reply_channel_range(&node_id_1, ReplyChannelRange {
- chain_hash,
- full_information: false,
- first_blocknum: 1000,
- number_of_blocks: 100,
- short_channel_ids: vec![],
- });
- assert!(result.is_err());
- assert_eq!(result.err().unwrap().err, "Received reply_channel_range with no information available");
- }
}
#[test]