Merge pull request #620 from TheBlueMatt/2020-05-pre-bindings-cleanups
[rust-lightning] / lightning / src / ln / peer_handler.rs
index e75c2c96e98c9e28dc24170f379c58bb42b6c4ac..12d7284661dee0c68c2fc377d9d6451141232a00 100644 (file)
@@ -10,7 +10,7 @@ use bitcoin::secp256k1::key::{SecretKey,PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
-use ln::msgs::ChannelMessageHandler;
+use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::{VecWriter, Writeable};
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
@@ -19,6 +19,7 @@ use ln::wire::Encode;
 use util::byte_utils;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
+use routing::network_graph::NetGraphMsgHandler;
 
 use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
@@ -31,13 +32,15 @@ use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin::hashes::{HashEngine, Hash};
 
 /// Provides references to trait impls which handle different types of messages.
-pub struct MessageHandler<CM: Deref> where CM::Target: msgs::ChannelMessageHandler {
+pub struct MessageHandler<CM: Deref, RM: Deref> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler {
        /// A message handler which handles messages specific to channels. Usually this is just a
        /// ChannelManager object.
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
        /// graph. Usually this is just a NetGraphMsgHandlerMonitor object.
-       pub route_handler: Arc<msgs::RoutingMessageHandler>,
+       pub route_handler: RM,
 }
 
 /// Provides an object which can be used to send data to and which uniquely identifies a connection
@@ -83,7 +86,7 @@ pub trait SocketDescriptor : cmp::Eq + hash::Hash + Clone {
 pub struct PeerHandleError {
        /// Used to indicate that we probably can't make any future connections to this peer, implying
        /// we should go ahead and force-close any channels we have with it.
-       no_connection_possible: bool,
+       pub no_connection_possible: bool,
 }
 impl fmt::Debug for PeerHandleError {
        fn fmt(&self, formatter: &mut fmt::Formatter) -> Result<(), fmt::Error> {
@@ -171,7 +174,7 @@ fn _check_usize_is_32_or_64() {
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
 /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
 /// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T, F, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<L>>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -179,7 +182,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, L> = Arc<PeerManager<SD, SimpleArcCha
 /// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
 /// But if this is not necessary, using a reference is more efficient. Defining these type aliases
 /// helps with issues such as long function definitions.
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e L>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>;
 
 /// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
 /// events into messages which it passes on to its MessageHandlers.
@@ -189,8 +192,11 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager<
 /// essentially you should default to using a SimpleRefPeerManager, and use a
 /// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
 /// you're using lightning-net-tokio.
-pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, L: Deref> where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger {
-       message_handler: MessageHandler<CM>,
+pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler,
+               L::Target: Logger {
+       message_handler: MessageHandler<CM, RM>,
        peers: Mutex<PeerHolder<Descriptor>>,
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
@@ -213,11 +219,14 @@ macro_rules! encode_msg {
 
 /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
 /// PeerIds may repeat, but only after socket_disconnected() has been called.
-impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, L> where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger {
+impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<Descriptor, CM, RM, L> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler,
+               L::Target: Logger {
        /// Constructs a new PeerManager with the given message handlers and node_id secret key
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
-       pub fn new(message_handler: MessageHandler<CM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> PeerManager<Descriptor, CM, L> {
+       pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
 
@@ -1159,13 +1168,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::secp256k1::Signature;
-       use bitcoin::BitcoinHash;
-       use bitcoin::network::constants::Network;
-       use bitcoin::blockdata::constants::genesis_block;
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
        use ln::msgs;
-       use ln::features::ChannelFeatures;
        use util::events;
        use util::test_utils;
 
@@ -1175,9 +1179,8 @@ mod tests {
        use rand::{thread_rng, Rng};
 
        use std;
-       use std::cmp::min;
        use std::sync::{Arc, Mutex};
-       use std::sync::atomic::{AtomicUsize, Ordering};
+       use std::sync::atomic::Ordering;
 
        #[derive(Clone)]
        struct FileDescriptor {
@@ -1207,18 +1210,18 @@ mod tests {
 
        struct PeerManagerCfg {
                chan_handler: test_utils::TestChannelMessageHandler,
+               routing_handler: test_utils::TestRoutingMessageHandler,
                logger: test_utils::TestLogger,
        }
 
        fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
                let mut cfgs = Vec::new();
                for _ in 0..peer_count {
-                       let chan_handler = test_utils::TestChannelMessageHandler::new();
-                       let logger = test_utils::TestLogger::new();
                        cfgs.push(
                                PeerManagerCfg{
-                                       chan_handler,
-                                       logger,
+                                       chan_handler: test_utils::TestChannelMessageHandler::new(),
+                                       logger: test_utils::TestLogger::new(),
+                                       routing_handler: test_utils::TestRoutingMessageHandler::new(),
                                }
                        );
                }
@@ -1226,22 +1229,19 @@ mod tests {
                cfgs
        }
 
-       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>, routing_handlers: Option<&'a Vec<Arc<msgs::RoutingMessageHandler>>>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>> {
+       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>> {
                let mut peers = Vec::new();
                let mut rng = thread_rng();
                let mut ephemeral_bytes = [0; 32];
                rng.fill_bytes(&mut ephemeral_bytes);
 
                for i in 0..peer_count {
-                       let router = if let Some(routers) = routing_handlers { routers[i].clone() } else {
-                               Arc::new(test_utils::TestRoutingMessageHandler::new())
-                       };
                        let node_id = {
                                let mut key_slice = [0;32];
                                rng.fill_bytes(&mut key_slice);
                                SecretKey::from_slice(&key_slice).unwrap()
                        };
-                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: router };
+                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
                        let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger);
                        peers.push(peer);
                }
@@ -1249,7 +1249,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
                let secp_ctx = Secp256k1::new();
                let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
                let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
@@ -1262,7 +1262,7 @@ mod tests {
                (fd_a.clone(), fd_b.clone())
        }
 
-       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
                let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
@@ -1275,7 +1275,7 @@ mod tests {
                // push a DisconnectPeer event to remove the node flagged by id
                let cfgs = create_peermgr_cfgs(2);
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               let mut peers = create_network(2, &cfgs, None);
+               let mut peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1297,7 +1297,7 @@ mod tests {
        fn test_timer_tick_occurred() {
                // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
                let cfgs = create_peermgr_cfgs(2);
-               let peers = create_network(2, &cfgs, None);
+               let peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1310,119 +1310,13 @@ mod tests {
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
 
-       pub struct TestRoutingMessageHandler {
-               pub chan_upds_recvd: AtomicUsize,
-               pub chan_anns_recvd: AtomicUsize,
-               pub chan_anns_sent: AtomicUsize,
-       }
-
-       impl TestRoutingMessageHandler {
-               pub fn new() -> Self {
-                       TestRoutingMessageHandler {
-                               chan_upds_recvd: AtomicUsize::new(0),
-                               chan_anns_recvd: AtomicUsize::new(0),
-                               chan_anns_sent: AtomicUsize::new(0),
-                       }
-               }
-
-       }
-       impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
-               fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
-                       self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
-                       self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
-               fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-                       let mut chan_anns = Vec::new();
-                       const TOTAL_UPDS: u64 = 100;
-                       let end: u64 =  min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
-                       for i in starting_point..end {
-                               let chan_upd_1 = get_dummy_channel_update(i);
-                               let chan_upd_2 = get_dummy_channel_update(i);
-                               let chan_ann = get_dummy_channel_announcement(i);
-
-                               chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
-                       }
-
-                       self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
-                       chan_anns
-               }
-
-               fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
-                       Vec::new()
-               }
-
-               fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
-                       true
-               }
-       }
-
-       fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
-               use bitcoin::secp256k1::ffi::Signature as FFISignature;
-               let secp_ctx = Secp256k1::new();
-               let network = Network::Testnet;
-               let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
-               let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
-               let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
-               let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
-               let unsigned_ann = msgs::UnsignedChannelAnnouncement {
-                       features: ChannelFeatures::known(),
-                       chain_hash: genesis_block(network).header.bitcoin_hash(),
-                       short_channel_id: short_chan_id,
-                       node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
-                       node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
-                       bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
-                       bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
-                       excess_data: Vec::new(),
-               };
-
-               msgs::ChannelAnnouncement {
-                       node_signature_1: Signature::from(FFISignature::new()),
-                       node_signature_2: Signature::from(FFISignature::new()),
-                       bitcoin_signature_1: Signature::from(FFISignature::new()),
-                       bitcoin_signature_2: Signature::from(FFISignature::new()),
-                       contents: unsigned_ann,
-               }
-       }
-
-       fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
-               use bitcoin::secp256k1::ffi::Signature as FFISignature;
-               let network = Network::Testnet;
-               msgs::ChannelUpdate {
-                       signature: Signature::from(FFISignature::new()),
-                       contents: msgs::UnsignedChannelUpdate {
-                               chain_hash: genesis_block(network).header.bitcoin_hash(),
-                               short_channel_id: short_chan_id,
-                               timestamp: 0,
-                               flags: 0,
-                               cltv_expiry_delta: 0,
-                               htlc_minimum_msat: 0,
-                               fee_base_msat: 0,
-                               fee_proportional_millionths: 0,
-                               excess_data: vec![],
-                       }
-               }
-       }
-
        #[test]
        fn test_do_attempt_write_data() {
                // Create 2 peers with custom TestRoutingMessageHandlers and connect them.
                let cfgs = create_peermgr_cfgs(2);
-               let mut routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = Vec::new();
-               let mut routing_handlers_concrete: Vec<Arc<TestRoutingMessageHandler>> = Vec::new();
-               for _ in 0..2 {
-                       let routing_handler = Arc::new(TestRoutingMessageHandler::new());
-                       routing_handlers.push(routing_handler.clone());
-                       routing_handlers_concrete.push(routing_handler.clone());
-               }
-               let peers = create_network(2, &cfgs, Some(&routing_handlers));
+               cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+               cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+               let peers = create_network(2, &cfgs);
 
                // By calling establish_connect, we trigger do_attempt_write_data between
                // the peers. Previously this function would mistakenly enter an infinite loop
@@ -1438,10 +1332,10 @@ mod tests {
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
-               assert_eq!(routing_handlers_concrete[0].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(routing_handlers_concrete[0].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
-               assert_eq!(routing_handlers_concrete[1].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(routing_handlers_concrete[1].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
        }
 
        #[test]
@@ -1449,11 +1343,8 @@ mod tests {
                // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
                {
                        let cfgs = create_peermgr_cfgs(2);
-                       let routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = vec![
-                               Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()),
-                               Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                       ];
-                       let peers = create_network(2, &cfgs, Some(&routing_handlers));
+                       cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+                       let peers = create_network(2, &cfgs);
                        let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
 
                        let peer_0 = peers[0].peers.lock().unwrap();
@@ -1469,11 +1360,8 @@ mod tests {
                // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
                {
                        let cfgs = create_peermgr_cfgs(2);
-                       let routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = vec![
-                               Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                               Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()),
-                       ];
-                       let peers = create_network(2, &cfgs, Some(&routing_handlers));
+                       cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+                       let peers = create_network(2, &cfgs);
                        let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
 
                        let peer_0 = peers[0].peers.lock().unwrap();