From 55e5aafcfe3b4d55df1fa500846e3a0f093f85c8 Mon Sep 17 00:00:00 2001 From: bmancini55 Date: Thu, 22 Oct 2020 10:51:54 -0400 Subject: [PATCH] Add gossip_queries methods to RoutingMessageHandler trait Defines message handlers for gossip_queries messages in the RoutingMessageHandler trait. The MessageSendEventsProvider supertrait is added to RoutingMessageHandler so that the implementor can use SendMessageEvents to send messages to a peer at the appropriate time. The trait methods are stubbed in NetGraphMsgHandler which implements RoutingMessageHandler and return a "not implemented" error. --- lightning-net-tokio/src/lib.rs | 6 +++ lightning/src/ln/msgs.rs | 23 ++++++++- lightning/src/ln/peer_handler.rs | 17 ++++--- lightning/src/routing/network_graph.rs | 68 ++++++++++++++++++++++++++ lightning/src/util/test_utils.rs | 30 ++++++++++++ 5 files changed, 135 insertions(+), 9 deletions(-) diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 36384380..6f80ef2e 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -536,6 +536,12 @@ mod tests { fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option, Option)> { Vec::new() } fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec { Vec::new() } fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false } + fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _first_blocknum: u32, _block_range: u32) -> Result<(), LightningError> { Ok(()) } + fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _short_channel_ids: Vec) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) } + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { Ok(()) } + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) } } impl ChannelMessageHandler for MsgHandler { fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {} diff --git a/lightning/src/ln/msgs.rs b/lightning/src/ln/msgs.rs index f57cf0fd..9111a1ed 100644 --- a/lightning/src/ln/msgs.rs +++ b/lightning/src/ln/msgs.rs @@ -804,7 +804,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn } /// A trait to describe an object which can receive routing messages. -pub trait RoutingMessageHandler : Send + Sync { +pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider { /// Handle an incoming node_announcement message, returning true if it should be forwarded on, /// false or returning an Err otherwise. fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result; @@ -827,6 +827,27 @@ pub trait RoutingMessageHandler : Send + Sync { fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec; /// Returns whether a full sync should be requested from a peer. fn should_request_full_sync(&self, node_id: &PublicKey) -> bool; + /// Queries a peer for a list of channels with a funding UTXO in the requested + /// chain and range of blocks. + fn query_channel_range(&self, their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Result<(), LightningError>; + /// Queries a peer for routing gossip messages for a set of channels identified + /// by their short_channel_ids. + fn query_short_channel_ids(&self, their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec) -> Result<(), LightningError>; + /// Handles the reply of a query we initiated to learn about channels + /// for a given range of blocks. We can expect to receive one or more + /// replies to a single query. + fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: &ReplyChannelRange) -> Result<(), LightningError>; + /// Handles the reply of a query we initiated asking for routing gossip + /// messages for a list of channels. We should receive this message when + /// a node has completed its best effort to send us the pertaining routing + /// gossip messages. + fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError>; + /// Handles when a peer asks us to send a list of short_channel_ids + /// for the requested range of blocks. + fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: &QueryChannelRange) -> Result<(), LightningError>; + /// Handles when a peer asks us to send routing gossip messages for a + /// list of short_channel_ids. + fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: &QueryShortChannelIds) -> Result<(), LightningError>; } mod fuzzy_internal_msgs { diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index b318e503..beac542a 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -841,17 +841,17 @@ impl PeerManager { - // TODO: handle message + wire::Message::QueryShortChannelIds(msg) => { + self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?; }, - wire::Message::ReplyShortChannelIdsEnd(_msg) => { - // TODO: handle message + wire::Message::ReplyShortChannelIdsEnd(msg) => { + self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?; }, - wire::Message::QueryChannelRange(_msg) => { - // TODO: handle message + wire::Message::QueryChannelRange(msg) => { + self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?; }, - wire::Message::ReplyChannelRange(_msg) => { - // TODO: handle message + wire::Message::ReplyChannelRange(msg) => { + self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?; }, wire::Message::GossipTimestampFilter(_msg) => { // TODO: handle message @@ -880,6 +880,7 @@ impl PeerManager where C::Target: chain::Access pub network_graph: RwLock, chain_access: Option, full_syncs_requested: AtomicUsize, + pending_events: Mutex>, logger: L, } @@ -77,6 +82,7 @@ impl NetGraphMsgHandler where C::Target: chain::Access }), full_syncs_requested: AtomicUsize::new(0), chain_access, + pending_events: Mutex::new(vec![]), logger, } } @@ -89,6 +95,7 @@ impl NetGraphMsgHandler where C::Target: chain::Access network_graph: RwLock::new(network_graph), full_syncs_requested: AtomicUsize::new(0), chain_access, + pending_events: Mutex::new(vec![]), logger, } } @@ -212,6 +219,67 @@ impl RoutingMessageHandler for N false } } + + fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } + + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { + // TODO + Err(LightningError { + err: String::from("Not implemented"), + action: ErrorAction::IgnoreError, + }) + } +} + +impl events::MessageSendEventsProvider for NetGraphMsgHandler +where + C::Target: chain::Access, + L::Target: Logger, +{ + fn get_and_clear_pending_msg_events(&self) -> Vec { + let mut ret = Vec::new(); + let mut pending_events = self.pending_events.lock().unwrap(); + std::mem::swap(&mut ret, &mut pending_events); + ret + } } #[derive(PartialEq, Debug)] diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 6c3552d5..55353c0f 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -319,6 +319,36 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler { fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { self.request_full_sync.load(Ordering::Acquire) } + + fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryChannelRange) -> Result<(), msgs::LightningError> { + Ok(()) + } + + fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> { + Ok(()) + } +} + +impl events::MessageSendEventsProvider for TestRoutingMessageHandler { + fn get_and_clear_pending_msg_events(&self) -> Vec { + vec![] + } } pub struct TestLogger { -- 2.30.2