Defines message handlers for gossip_queries messages in the RoutingMessageHandler
trait. The MessageSendEventsProvider supertrait is added to RoutingMessageHandler
so that the implementor can use SendMessageEvents to send messages to a
peer at the appropriate time.
The trait methods are stubbed in NetGraphMsgHandler which implements
RoutingMessageHandler and return a "not implemented" error.
fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+ fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _first_blocknum: u32, _block_range: u32) -> Result<(), LightningError> { Ok(()) }
+ fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> { Ok(()) }
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
}
impl ChannelMessageHandler for MsgHandler {
fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
}
/// A trait to describe an object which can receive routing messages.
-pub trait RoutingMessageHandler : Send + Sync {
+pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
/// Handle an incoming node_announcement message, returning true if it should be forwarded on,
/// false or returning an Err otherwise.
fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
/// Returns whether a full sync should be requested from a peer.
fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
+ /// Queries a peer for a list of channels with a funding UTXO in the requested
+ /// chain and range of blocks.
+ fn query_channel_range(&self, their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Result<(), LightningError>;
+ /// Queries a peer for routing gossip messages for a set of channels identified
+ /// by their short_channel_ids.
+ fn query_short_channel_ids(&self, their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec<u64>) -> Result<(), LightningError>;
+ /// Handles the reply of a query we initiated to learn about channels
+ /// for a given range of blocks. We can expect to receive one or more
+ /// replies to a single query.
+ fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: &ReplyChannelRange) -> Result<(), LightningError>;
+ /// Handles the reply of a query we initiated asking for routing gossip
+ /// messages for a list of channels. We should receive this message when
+ /// a node has completed its best effort to send us the pertaining routing
+ /// gossip messages.
+ fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
+ /// Handles when a peer asks us to send a list of short_channel_ids
+ /// for the requested range of blocks.
+ fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: &QueryChannelRange) -> Result<(), LightningError>;
+ /// Handles when a peer asks us to send routing gossip messages for a
+ /// list of short_channel_ids.
+ fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: &QueryShortChannelIds) -> Result<(), LightningError>;
}
mod fuzzy_internal_msgs {
// TODO: forward msg along to all our other peers!
}
},
- wire::Message::QueryShortChannelIds(_msg) => {
- // TODO: handle message
+ wire::Message::QueryShortChannelIds(msg) => {
+ self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
},
- wire::Message::ReplyShortChannelIdsEnd(_msg) => {
- // TODO: handle message
+ wire::Message::ReplyShortChannelIdsEnd(msg) => {
+ self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
},
- wire::Message::QueryChannelRange(_msg) => {
- // TODO: handle message
+ wire::Message::QueryChannelRange(msg) => {
+ self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
},
- wire::Message::ReplyChannelRange(_msg) => {
- // TODO: handle message
+ wire::Message::ReplyChannelRange(msg) => {
+ self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
},
wire::Message::GossipTimestampFilter(_msg) => {
// TODO: handle message
// drop optional-ish messages when send buffers get full!
let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+ events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
let mut peers_lock = self.peers.lock().unwrap();
let peers = &mut *peers_lock;
for event in events_generated.drain(..) {
use bitcoin::blockdata::script::Builder;
use bitcoin::blockdata::transaction::TxOut;
use bitcoin::blockdata::opcodes;
+use bitcoin::hash_types::BlockHash;
use chain;
use chain::Access;
use ln::features::{ChannelFeatures, NodeFeatures};
use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField};
+use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
use ln::msgs;
use util::ser::{Writeable, Readable, Writer};
use util::logger::Logger;
+use util::events;
use std::{cmp, fmt};
use std::sync::{RwLock, RwLockReadGuard};
use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
use std::collections::BTreeMap;
use std::collections::btree_map::Entry as BtreeEntry;
use std::ops::Deref;
pub network_graph: RwLock<NetworkGraph>,
chain_access: Option<C>,
full_syncs_requested: AtomicUsize,
+ pending_events: Mutex<Vec<events::MessageSendEvent>>,
logger: L,
}
}),
full_syncs_requested: AtomicUsize::new(0),
chain_access,
+ pending_events: Mutex::new(vec![]),
logger,
}
}
network_graph: RwLock::new(network_graph),
full_syncs_requested: AtomicUsize::new(0),
chain_access,
+ pending_events: Mutex::new(vec![]),
logger,
}
}
false
}
}
+
+ fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> {
+ // TODO
+ Err(LightningError {
+ err: String::from("Not implemented"),
+ action: ErrorAction::IgnoreError,
+ })
+ }
+}
+
+impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
+where
+ C::Target: chain::Access,
+ L::Target: Logger,
+{
+ fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+ let mut ret = Vec::new();
+ let mut pending_events = self.pending_events.lock().unwrap();
+ std::mem::swap(&mut ret, &mut pending_events);
+ ret
+ }
}
#[derive(PartialEq, Debug)]
fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
self.request_full_sync.load(Ordering::Acquire)
}
+
+ fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryChannelRange) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+
+ fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> {
+ Ok(())
+ }
+}
+
+impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
+ fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+ vec![]
+ }
}
pub struct TestLogger {