Add gossip_queries methods to RoutingMessageHandler trait
authorbmancini55 <bmancini@gmail.com>
Thu, 22 Oct 2020 14:51:54 +0000 (10:51 -0400)
committerbmancini55 <bmancini@gmail.com>
Wed, 9 Dec 2020 20:02:32 +0000 (15:02 -0500)
Defines message handlers for gossip_queries messages in the RoutingMessageHandler
trait. The MessageSendEventsProvider supertrait is added to RoutingMessageHandler
so that the implementor can use SendMessageEvents to send messages to a
peer at the appropriate time.

The trait methods are stubbed in NetGraphMsgHandler which implements
RoutingMessageHandler and return a "not implemented" error.

lightning-net-tokio/src/lib.rs
lightning/src/ln/msgs.rs
lightning/src/ln/peer_handler.rs
lightning/src/routing/network_graph.rs
lightning/src/util/test_utils.rs

index 36384380fb14a458b31e132446ae374ffaf0b51d..6f80ef2e39671718226d80f514d11e75bfa5ce83 100644 (file)
@@ -536,6 +536,12 @@ mod tests {
                fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(ChannelAnnouncement, Option<ChannelUpdate>, Option<ChannelUpdate>)> { Vec::new() }
                fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<NodeAnnouncement> { Vec::new() }
                fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool { false }
+               fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash,  _first_blocknum: u32, _block_range: u32) -> Result<(), LightningError> { Ok(()) }
+               fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: bitcoin::BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> { Ok(()) }
+               fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
+               fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
+               fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> { Ok(()) }
+               fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> { Ok(()) }
        }
        impl ChannelMessageHandler for MsgHandler {
                fn handle_open_channel(&self, _their_node_id: &PublicKey, _their_features: InitFeatures, _msg: &OpenChannel) {}
index f57cf0fd1fb9304fac06f34fad5164e6e8a1dfe4..9111a1ed657a544c9ca0709193e99c0069d967be 100644 (file)
@@ -804,7 +804,7 @@ pub trait ChannelMessageHandler : events::MessageSendEventsProvider + Send + Syn
 }
 
 /// A trait to describe an object which can receive routing messages.
-pub trait RoutingMessageHandler : Send + Sync {
+pub trait RoutingMessageHandler : Send + Sync + events::MessageSendEventsProvider {
        /// Handle an incoming node_announcement message, returning true if it should be forwarded on,
        /// false or returning an Err otherwise.
        fn handle_node_announcement(&self, msg: &NodeAnnouncement) -> Result<bool, LightningError>;
@@ -827,6 +827,27 @@ pub trait RoutingMessageHandler : Send + Sync {
        fn get_next_node_announcements(&self, starting_point: Option<&PublicKey>, batch_amount: u8) -> Vec<NodeAnnouncement>;
        /// Returns whether a full sync should be requested from a peer.
        fn should_request_full_sync(&self, node_id: &PublicKey) -> bool;
+       /// Queries a peer for a list of channels with a funding UTXO in the requested
+       /// chain and range of blocks.
+       fn query_channel_range(&self, their_node_id: &PublicKey, chain_hash: BlockHash, first_blocknum: u32, number_of_blocks: u32) -> Result<(), LightningError>;
+       /// Queries a peer for routing gossip messages for a set of channels identified
+       /// by their short_channel_ids.
+       fn query_short_channel_ids(&self, their_node_id: &PublicKey, chain_hash: BlockHash, short_channel_ids: Vec<u64>) -> Result<(), LightningError>;
+       /// Handles the reply of a query we initiated to learn about channels
+       /// for a given range of blocks. We can expect to receive one or more
+       /// replies to a single query.
+       fn handle_reply_channel_range(&self, their_node_id: &PublicKey, msg: &ReplyChannelRange) -> Result<(), LightningError>;
+       /// Handles the reply of a query we initiated asking for routing gossip
+       /// messages for a list of channels. We should receive this message when
+       /// a node has completed its best effort to send us the pertaining routing
+       /// gossip messages.
+       fn handle_reply_short_channel_ids_end(&self, their_node_id: &PublicKey, msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError>;
+       /// Handles when a peer asks us to send a list of short_channel_ids
+       /// for the requested range of blocks.
+       fn handle_query_channel_range(&self, their_node_id: &PublicKey, msg: &QueryChannelRange) -> Result<(), LightningError>;
+       /// Handles when a peer asks us to send routing gossip messages for a
+       /// list of short_channel_ids.
+       fn handle_query_short_channel_ids(&self, their_node_id: &PublicKey, msg: &QueryShortChannelIds) -> Result<(), LightningError>;
 }
 
 mod fuzzy_internal_msgs {
index b318e50318285497c8bbb0bfb797664c92c6ac1a..beac542abddd56f53281bd5895ed359e542fa07a 100644 (file)
@@ -841,17 +841,17 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                                        // TODO: forward msg along to all our other peers!
                                }
                        },
-                       wire::Message::QueryShortChannelIds(_msg) => {
-                               // TODO: handle message
+                       wire::Message::QueryShortChannelIds(msg) => {
+                               self.message_handler.route_handler.handle_query_short_channel_ids(&peer.their_node_id.unwrap(), &msg)?;
                        },
-                       wire::Message::ReplyShortChannelIdsEnd(_msg) => {
-                               // TODO: handle message
+                       wire::Message::ReplyShortChannelIdsEnd(msg) => {
+                               self.message_handler.route_handler.handle_reply_short_channel_ids_end(&peer.their_node_id.unwrap(), &msg)?;
                        },
-                       wire::Message::QueryChannelRange(_msg) => {
-                               // TODO: handle message
+                       wire::Message::QueryChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_query_channel_range(&peer.their_node_id.unwrap(), &msg)?;
                        },
-                       wire::Message::ReplyChannelRange(_msg) => {
-                               // TODO: handle message
+                       wire::Message::ReplyChannelRange(msg) => {
+                               self.message_handler.route_handler.handle_reply_channel_range(&peer.their_node_id.unwrap(), &msg)?;
                        },
                        wire::Message::GossipTimestampFilter(_msg) => {
                                // TODO: handle message
@@ -880,6 +880,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
                        // drop optional-ish messages when send buffers get full!
 
                        let mut events_generated = self.message_handler.chan_handler.get_and_clear_pending_msg_events();
+                       events_generated.append(&mut self.message_handler.route_handler.get_and_clear_pending_msg_events());
                        let mut peers_lock = self.peers.lock().unwrap();
                        let peers = &mut *peers_lock;
                        for event in events_generated.drain(..) {
index 54783dd1dc3cdf633efcf19d49092c03ec0a7f78..9f20219eba52a4475e21c0adc59179aeeee31a52 100644 (file)
@@ -18,19 +18,23 @@ use bitcoin::hashes::Hash;
 use bitcoin::blockdata::script::Builder;
 use bitcoin::blockdata::transaction::TxOut;
 use bitcoin::blockdata::opcodes;
+use bitcoin::hash_types::BlockHash;
 
 use chain;
 use chain::Access;
 use ln::features::{ChannelFeatures, NodeFeatures};
 use ln::msgs::{DecodeError, ErrorAction, LightningError, RoutingMessageHandler, NetAddress, MAX_VALUE_MSAT};
 use ln::msgs::{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, OptionalField};
+use ln::msgs::{QueryChannelRange, ReplyChannelRange, QueryShortChannelIds, ReplyShortChannelIdsEnd};
 use ln::msgs;
 use util::ser::{Writeable, Readable, Writer};
 use util::logger::Logger;
+use util::events;
 
 use std::{cmp, fmt};
 use std::sync::{RwLock, RwLockReadGuard};
 use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Mutex;
 use std::collections::BTreeMap;
 use std::collections::btree_map::Entry as BtreeEntry;
 use std::ops::Deref;
@@ -59,6 +63,7 @@ pub struct NetGraphMsgHandler<C: Deref, L: Deref> where C::Target: chain::Access
        pub network_graph: RwLock<NetworkGraph>,
        chain_access: Option<C>,
        full_syncs_requested: AtomicUsize,
+       pending_events: Mutex<Vec<events::MessageSendEvent>>,
        logger: L,
 }
 
@@ -77,6 +82,7 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
                        }),
                        full_syncs_requested: AtomicUsize::new(0),
                        chain_access,
+                       pending_events: Mutex::new(vec![]),
                        logger,
                }
        }
@@ -89,6 +95,7 @@ impl<C: Deref, L: Deref> NetGraphMsgHandler<C, L> where C::Target: chain::Access
                        network_graph: RwLock::new(network_graph),
                        full_syncs_requested: AtomicUsize::new(0),
                        chain_access,
+                       pending_events: Mutex::new(vec![]),
                        logger,
                }
        }
@@ -212,6 +219,67 @@ impl<C: Deref + Sync + Send, L: Deref + Sync + Send> RoutingMessageHandler for N
                        false
                }
        }
+
+       fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &ReplyChannelRange) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &ReplyShortChannelIdsEnd) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &QueryChannelRange) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+
+       fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &QueryShortChannelIds) -> Result<(), LightningError> {
+               // TODO
+               Err(LightningError {
+                       err: String::from("Not implemented"),
+                       action: ErrorAction::IgnoreError,
+               })
+       }
+}
+
+impl<C: Deref, L: Deref> events::MessageSendEventsProvider for NetGraphMsgHandler<C, L>
+where
+       C::Target: chain::Access,
+       L::Target: Logger,
+{
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               let mut ret = Vec::new();
+               let mut pending_events = self.pending_events.lock().unwrap();
+               std::mem::swap(&mut ret, &mut pending_events);
+               ret
+       }
 }
 
 #[derive(PartialEq, Debug)]
index 6c3552d5df0853f1b8b2e79592573373c25f7c99..55353c0f26be81bf107267ae083990bfd5da27c8 100644 (file)
@@ -319,6 +319,36 @@ impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
        fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
                self.request_full_sync.load(Ordering::Acquire)
        }
+
+       fn query_channel_range(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _first_blocknum: u32, _number_of_blocks: u32) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn query_short_channel_ids(&self, _their_node_id: &PublicKey, _chain_hash: BlockHash, _short_channel_ids: Vec<u64>) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyChannelRange) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: &msgs::ReplyShortChannelIdsEnd) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_query_channel_range(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryChannelRange) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+
+       fn handle_query_short_channel_ids(&self, _their_node_id: &PublicKey, _msg: &msgs::QueryShortChannelIds) -> Result<(), msgs::LightningError> {
+               Ok(())
+       }
+}
+
+impl events::MessageSendEventsProvider for TestRoutingMessageHandler {
+       fn get_and_clear_pending_msg_events(&self) -> Vec<events::MessageSendEvent> {
+               vec![]
+       }
 }
 
 pub struct TestLogger {