Make RoutingMessageHandler a generic Deref instead of an Arc
authorMatt Corallo <git@bluematt.me>
Tue, 12 May 2020 17:31:20 +0000 (13:31 -0400)
committerMatt Corallo <git@bluematt.me>
Fri, 22 May 2020 18:28:13 +0000 (14:28 -0400)
We also update to use single idents when referencing the Deref=*
types since the automated code generator is pretty braindead.

This also moves some test utils out of peer_handler.rs and into
util::test_utils to standardize things a little bit, which we need
to concretize the PeerHandler types used in testing.

fuzz/src/full_stack.rs
lightning-net-tokio/src/lib.rs
lightning/src/ln/peer_handler.rs
lightning/src/util/test_utils.rs

index 81cf6a248cc2f780880d789c64a0c765fb1ef5de..b2ef709330d02e306e0ba7d675a23d299c356ab8 100644 (file)
@@ -134,10 +134,17 @@ impl<'a> std::hash::Hash for Peer<'a> {
        }
 }
 
+type ChannelMan = ChannelManager<
+       EnforcingChannelKeys,
+       Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
+       Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
+type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<ChainWatchInterfaceUtil>, Arc<dyn Logger>>>, Arc<dyn Logger>>;
+
 struct MoneyLossDetector<'a> {
-       manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
-       monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
-       handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>>, Arc<dyn Logger>>,
+       manager: Arc<ChannelMan>,
+       monitor: Arc<channelmonitor::SimpleManyChannelMonitor<
+               OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
+       handler: PeerMan<'a>,
 
        peers: &'a RefCell<[bool; 256]>,
        funding_txn: Vec<Transaction>,
@@ -149,9 +156,9 @@ struct MoneyLossDetector<'a> {
 }
 impl<'a> MoneyLossDetector<'a> {
        pub fn new(peers: &'a RefCell<[bool; 256]>,
-                  manager: Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>>,
+                  manager: Arc<ChannelMan>,
                   monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
-                  handler: PeerManager<Peer<'a>, Arc<ChannelManager<EnforcingChannelKeys, Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>, Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>>, Arc<dyn Logger>>) -> Self {
+                  handler: PeerMan<'a>) -> Self {
                MoneyLossDetector {
                        manager,
                        monitor,
index 15226642a4105aee94c511334952408e42df5964..d2494d270ee185935a71aa6da86e51bdf9fd75c3 100644 (file)
@@ -28,7 +28,7 @@
 //! type ChainWatchInterface = dyn lightning::chain::chaininterface::ChainWatchInterface;
 //! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<ChainWatchInterface>>;
 //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChannelMonitor, TxBroadcaster, FeeEstimator, Logger>;
-//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator, Logger>;
+//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator, ChainWatchInterface, Logger>;
 //!
 //! // Connect to node with pubkey their_node_id at addr:
 //! async fn connect_to_node(peer_manager: PeerManager, channel_monitor: Arc<ChannelMonitor>, channel_manager: ChannelManager, their_node_id: PublicKey, addr: SocketAddr) {
@@ -70,7 +70,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
-use lightning::ln::msgs::ChannelMessageHandler;
+use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use lightning::util::logger::Logger;
 
 use std::{task, thread};
@@ -124,7 +124,10 @@ impl Connection {
                        _ => panic!()
                }
        }
-       async fn schedule_read<CMH: ChannelMessageHandler + 'static, L: Logger + 'static + ?Sized>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) {
+       async fn schedule_read<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, us: Arc<Mutex<Self>>, mut reader: io::ReadHalf<TcpStream>, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) where
+                       CMH: ChannelMessageHandler + 'static,
+                       RMH: RoutingMessageHandler + 'static,
+                       L: Logger + 'static + ?Sized {
                let peer_manager_ref = peer_manager.clone();
                // 8KB is nice and big but also should never cause any issues with stack overflowing.
                let mut buf = [0; 8192];
@@ -234,7 +237,10 @@ impl Connection {
 /// not need to poll the provided future in order to make progress.
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
-pub fn setup_inbound<CMH: ChannelMessageHandler + 'static, L: Logger + 'static + ?Sized>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> {
+pub fn setup_inbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future<Output=()> where
+               CMH: ChannelMessageHandler + 'static,
+               RMH: RoutingMessageHandler + 'static,
+               L: Logger + 'static + ?Sized {
        let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
        #[cfg(debug_assertions)]
        let last_us = Arc::clone(&us);
@@ -273,7 +279,10 @@ pub fn setup_inbound<CMH: ChannelMessageHandler + 'static, L: Logger + 'static +
 /// not need to poll the provided future in order to make progress.
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
-pub fn setup_outbound<CMH: ChannelMessageHandler + 'static, L: Logger + 'static + ?Sized>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> {
+pub fn setup_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future<Output=()> where
+               CMH: ChannelMessageHandler + 'static,
+               RMH: RoutingMessageHandler + 'static,
+               L: Logger + 'static + ?Sized {
        let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream);
        #[cfg(debug_assertions)]
        let last_us = Arc::clone(&us);
@@ -342,7 +351,10 @@ pub fn setup_outbound<CMH: ChannelMessageHandler + 'static, L: Logger + 'static
 /// make progress.
 ///
 /// See the module-level documentation for how to handle the event_notify mpsc::Sender.
-pub async fn connect_outbound<CMH: ChannelMessageHandler + 'static, L: Logger + 'static + ?Sized>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> {
+pub async fn connect_outbound<CMH, RMH, L>(peer_manager: Arc<peer_handler::PeerManager<SocketDescriptor, Arc<CMH>, Arc<RMH>, Arc<L>>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option<impl std::future::Future<Output=()>> where
+               CMH: ChannelMessageHandler + 'static,
+               RMH: RoutingMessageHandler + 'static,
+               L: Logger + 'static + ?Sized {
        if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)).await {
                Some(setup_outbound(peer_manager, event_notify, their_node_id, stream))
        } else { None }
@@ -568,7 +580,7 @@ mod tests {
                });
                let a_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&a_handler),
-                       route_handler: Arc::clone(&a_handler) as Arc<dyn RoutingMessageHandler>,
+                       route_handler: Arc::clone(&a_handler),
                }, a_key.clone(), &[1; 32], Arc::new(TestLogger())));
 
                let (b_connected_sender, mut b_connected) = mpsc::channel(1);
@@ -581,7 +593,7 @@ mod tests {
                });
                let b_manager = Arc::new(PeerManager::new(MessageHandler {
                        chan_handler: Arc::clone(&b_handler),
-                       route_handler: Arc::clone(&b_handler) as Arc<dyn RoutingMessageHandler>,
+                       route_handler: Arc::clone(&b_handler),
                }, b_key.clone(), &[2; 32], Arc::new(TestLogger())));
 
                // We bind on localhost, hoping the environment is properly configured with a local
index dc3c37c41fc3abeda9afdbce193147aba01beb4e..e6160b0e5ec597d76027dc9b61a54ae5d0315fff 100644 (file)
@@ -10,7 +10,7 @@ use bitcoin::secp256k1::key::{SecretKey,PublicKey};
 
 use ln::features::InitFeatures;
 use ln::msgs;
-use ln::msgs::ChannelMessageHandler;
+use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
 use ln::channelmanager::{SimpleArcChannelManager, SimpleRefChannelManager};
 use util::ser::VecWriter;
 use ln::peer_channel_encryptor::{PeerChannelEncryptor,NextNoiseStep};
@@ -19,6 +19,7 @@ use ln::wire::Encode;
 use util::byte_utils;
 use util::events::{MessageSendEvent, MessageSendEventsProvider};
 use util::logger::Logger;
+use routing::network_graph::NetGraphMsgHandler;
 
 use std::collections::{HashMap,hash_map,HashSet,LinkedList};
 use std::sync::{Arc, Mutex};
@@ -31,13 +32,15 @@ use bitcoin::hashes::sha256::HashEngine as Sha256Engine;
 use bitcoin::hashes::{HashEngine, Hash};
 
 /// Provides references to trait impls which handle different types of messages.
-pub struct MessageHandler<CM: Deref> where CM::Target: msgs::ChannelMessageHandler {
+pub struct MessageHandler<CM: Deref, RM: Deref> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler {
        /// A message handler which handles messages specific to channels. Usually this is just a
        /// ChannelManager object.
        pub chan_handler: CM,
        /// A message handler which handles messages updating our knowledge of the network channel
        /// graph. Usually this is just a NetGraphMsgHandlerMonitor object.
-       pub route_handler: Arc<msgs::RoutingMessageHandler>,
+       pub route_handler: RM,
 }
 
 /// Provides an object which can be used to send data to and which uniquely identifies a connection
@@ -171,7 +174,7 @@ fn _check_usize_is_32_or_64() {
 /// lifetimes). Other times you can afford a reference, which is more efficient, in which case
 /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents
 /// issues such as overly long function definitions.
-pub type SimpleArcPeerManager<SD, M, T, F, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<L>>>;
+pub type SimpleArcPeerManager<SD, M, T, F, C, L> = Arc<PeerManager<SD, SimpleArcChannelManager<M, T, F, L>, Arc<NetGraphMsgHandler<Arc<C>, Arc<L>>>, Arc<L>>>;
 
 /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference
 /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't
@@ -179,7 +182,7 @@ pub type SimpleArcPeerManager<SD, M, T, F, L> = Arc<PeerManager<SD, SimpleArcCha
 /// usage of lightning-net-tokio (since tokio::spawn requires parameters with static lifetimes).
 /// But if this is not necessary, using a reference is more efficient. Defining these type aliases
 /// helps with issues such as long function definitions.
-pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e L>;
+pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, SD, M, T, F, C, L> = PeerManager<SD, SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, M, T, F, L>, &'e NetGraphMsgHandler<&'g C, &'f L>, &'f L>;
 
 /// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket
 /// events into messages which it passes on to its MessageHandlers.
@@ -189,8 +192,11 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager<
 /// essentially you should default to using a SimpleRefPeerManager, and use a
 /// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when
 /// you're using lightning-net-tokio.
-pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, L: Deref> where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger {
-       message_handler: MessageHandler<CM>,
+pub struct PeerManager<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler,
+               L::Target: Logger {
+       message_handler: MessageHandler<CM, RM>,
        peers: Mutex<PeerHolder<Descriptor>>,
        our_node_secret: SecretKey,
        ephemeral_key_midstate: Sha256Engine,
@@ -213,11 +219,14 @@ macro_rules! encode_msg {
 
 /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds.
 /// PeerIds may repeat, but only after socket_disconnected() has been called.
-impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor, CM, L> where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger {
+impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<Descriptor, CM, RM, L> where
+               CM::Target: ChannelMessageHandler,
+               RM::Target: RoutingMessageHandler,
+               L::Target: Logger {
        /// Constructs a new PeerManager with the given message handlers and node_id secret key
        /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be
        /// cryptographically secure random bytes.
-       pub fn new(message_handler: MessageHandler<CM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> PeerManager<Descriptor, CM, L> {
+       pub fn new(message_handler: MessageHandler<CM, RM>, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> Self {
                let mut ephemeral_key_midstate = Sha256::engine();
                ephemeral_key_midstate.input(ephemeral_random_data);
 
@@ -1158,13 +1167,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, L: Deref> PeerManager<Descriptor,
 
 #[cfg(test)]
 mod tests {
-       use bitcoin::secp256k1::Signature;
-       use bitcoin::BitcoinHash;
-       use bitcoin::network::constants::Network;
-       use bitcoin::blockdata::constants::genesis_block;
        use ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor};
        use ln::msgs;
-       use ln::features::ChannelFeatures;
        use util::events;
        use util::test_utils;
 
@@ -1174,9 +1178,8 @@ mod tests {
        use rand::{thread_rng, Rng};
 
        use std;
-       use std::cmp::min;
        use std::sync::{Arc, Mutex};
-       use std::sync::atomic::{AtomicUsize, Ordering};
+       use std::sync::atomic::Ordering;
 
        #[derive(Clone)]
        struct FileDescriptor {
@@ -1206,18 +1209,18 @@ mod tests {
 
        struct PeerManagerCfg {
                chan_handler: test_utils::TestChannelMessageHandler,
+               routing_handler: test_utils::TestRoutingMessageHandler,
                logger: test_utils::TestLogger,
        }
 
        fn create_peermgr_cfgs(peer_count: usize) -> Vec<PeerManagerCfg> {
                let mut cfgs = Vec::new();
                for _ in 0..peer_count {
-                       let chan_handler = test_utils::TestChannelMessageHandler::new();
-                       let logger = test_utils::TestLogger::new();
                        cfgs.push(
                                PeerManagerCfg{
-                                       chan_handler,
-                                       logger,
+                                       chan_handler: test_utils::TestChannelMessageHandler::new(),
+                                       logger: test_utils::TestLogger::new(),
+                                       routing_handler: test_utils::TestRoutingMessageHandler::new(),
                                }
                        );
                }
@@ -1225,22 +1228,19 @@ mod tests {
                cfgs
        }
 
-       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>, routing_handlers: Option<&'a Vec<Arc<msgs::RoutingMessageHandler>>>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>> {
+       fn create_network<'a>(peer_count: usize, cfgs: &'a Vec<PeerManagerCfg>) -> Vec<PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>> {
                let mut peers = Vec::new();
                let mut rng = thread_rng();
                let mut ephemeral_bytes = [0; 32];
                rng.fill_bytes(&mut ephemeral_bytes);
 
                for i in 0..peer_count {
-                       let router = if let Some(routers) = routing_handlers { routers[i].clone() } else {
-                               Arc::new(test_utils::TestRoutingMessageHandler::new())
-                       };
                        let node_id = {
                                let mut key_slice = [0;32];
                                rng.fill_bytes(&mut key_slice);
                                SecretKey::from_slice(&key_slice).unwrap()
                        };
-                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: router };
+                       let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: &cfgs[i].routing_handler };
                        let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger);
                        peers.push(peer);
                }
@@ -1248,7 +1248,7 @@ mod tests {
                peers
        }
 
-       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
                let secp_ctx = Secp256k1::new();
                let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret);
                let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) };
@@ -1261,7 +1261,7 @@ mod tests {
                (fd_a.clone(), fd_b.clone())
        }
 
-       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
+       fn establish_connection_and_read_events<'a>(peer_a: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>, peer_b: &PeerManager<FileDescriptor, &'a test_utils::TestChannelMessageHandler, &'a test_utils::TestRoutingMessageHandler, &'a test_utils::TestLogger>) -> (FileDescriptor, FileDescriptor) {
                let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b);
                assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
                assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false);
@@ -1274,7 +1274,7 @@ mod tests {
                // push a DisconnectPeer event to remove the node flagged by id
                let cfgs = create_peermgr_cfgs(2);
                let chan_handler = test_utils::TestChannelMessageHandler::new();
-               let mut peers = create_network(2, &cfgs, None);
+               let mut peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1296,7 +1296,7 @@ mod tests {
        fn test_timer_tick_occurred() {
                // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer.
                let cfgs = create_peermgr_cfgs(2);
-               let peers = create_network(2, &cfgs, None);
+               let peers = create_network(2, &cfgs);
                establish_connection(&peers[0], &peers[1]);
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1);
 
@@ -1309,119 +1309,13 @@ mod tests {
                assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 0);
        }
 
-       pub struct TestRoutingMessageHandler {
-               pub chan_upds_recvd: AtomicUsize,
-               pub chan_anns_recvd: AtomicUsize,
-               pub chan_anns_sent: AtomicUsize,
-       }
-
-       impl TestRoutingMessageHandler {
-               pub fn new() -> Self {
-                       TestRoutingMessageHandler {
-                               chan_upds_recvd: AtomicUsize::new(0),
-                               chan_anns_recvd: AtomicUsize::new(0),
-                               chan_anns_sent: AtomicUsize::new(0),
-                       }
-               }
-
-       }
-       impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
-               fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
-                       self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
-                       self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
-                       Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
-               }
-               fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
-               fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-                       let mut chan_anns = Vec::new();
-                       const TOTAL_UPDS: u64 = 100;
-                       let end: u64 =  min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
-                       for i in starting_point..end {
-                               let chan_upd_1 = get_dummy_channel_update(i);
-                               let chan_upd_2 = get_dummy_channel_update(i);
-                               let chan_ann = get_dummy_channel_announcement(i);
-
-                               chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
-                       }
-
-                       self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
-                       chan_anns
-               }
-
-               fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
-                       Vec::new()
-               }
-
-               fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
-                       true
-               }
-       }
-
-       fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
-               use bitcoin::secp256k1::ffi::Signature as FFISignature;
-               let secp_ctx = Secp256k1::new();
-               let network = Network::Testnet;
-               let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
-               let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
-               let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
-               let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
-               let unsigned_ann = msgs::UnsignedChannelAnnouncement {
-                       features: ChannelFeatures::known(),
-                       chain_hash: genesis_block(network).header.bitcoin_hash(),
-                       short_channel_id: short_chan_id,
-                       node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
-                       node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
-                       bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
-                       bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
-                       excess_data: Vec::new(),
-               };
-
-               msgs::ChannelAnnouncement {
-                       node_signature_1: Signature::from(FFISignature::new()),
-                       node_signature_2: Signature::from(FFISignature::new()),
-                       bitcoin_signature_1: Signature::from(FFISignature::new()),
-                       bitcoin_signature_2: Signature::from(FFISignature::new()),
-                       contents: unsigned_ann,
-               }
-       }
-
-       fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
-               use bitcoin::secp256k1::ffi::Signature as FFISignature;
-               let network = Network::Testnet;
-               msgs::ChannelUpdate {
-                       signature: Signature::from(FFISignature::new()),
-                       contents: msgs::UnsignedChannelUpdate {
-                               chain_hash: genesis_block(network).header.bitcoin_hash(),
-                               short_channel_id: short_chan_id,
-                               timestamp: 0,
-                               flags: 0,
-                               cltv_expiry_delta: 0,
-                               htlc_minimum_msat: 0,
-                               fee_base_msat: 0,
-                               fee_proportional_millionths: 0,
-                               excess_data: vec![],
-                       }
-               }
-       }
-
        #[test]
        fn test_do_attempt_write_data() {
                // Create 2 peers with custom TestRoutingMessageHandlers and connect them.
                let cfgs = create_peermgr_cfgs(2);
-               let mut routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = Vec::new();
-               let mut routing_handlers_concrete: Vec<Arc<TestRoutingMessageHandler>> = Vec::new();
-               for _ in 0..2 {
-                       let routing_handler = Arc::new(TestRoutingMessageHandler::new());
-                       routing_handlers.push(routing_handler.clone());
-                       routing_handlers_concrete.push(routing_handler.clone());
-               }
-               let peers = create_network(2, &cfgs, Some(&routing_handlers));
+               cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+               cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+               let peers = create_network(2, &cfgs);
 
                // By calling establish_connect, we trigger do_attempt_write_data between
                // the peers. Previously this function would mistakenly enter an infinite loop
@@ -1437,10 +1331,10 @@ mod tests {
 
                // Check that each peer has received the expected number of channel updates and channel
                // announcements.
-               assert_eq!(routing_handlers_concrete[0].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(routing_handlers_concrete[0].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
-               assert_eq!(routing_handlers_concrete[1].clone().chan_upds_recvd.load(Ordering::Acquire), 100);
-               assert_eq!(routing_handlers_concrete[1].clone().chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+               assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
+               assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
        }
 
        #[test]
@@ -1448,11 +1342,8 @@ mod tests {
                // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not.
                {
                        let cfgs = create_peermgr_cfgs(2);
-                       let routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = vec![
-                               Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()),
-                               Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                       ];
-                       let peers = create_network(2, &cfgs, Some(&routing_handlers));
+                       cfgs[0].routing_handler.request_full_sync.store(true, Ordering::Release);
+                       let peers = create_network(2, &cfgs);
                        let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
 
                        let peer_0 = peers[0].peers.lock().unwrap();
@@ -1468,11 +1359,8 @@ mod tests {
                // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not.
                {
                        let cfgs = create_peermgr_cfgs(2);
-                       let routing_handlers: Vec<Arc<msgs::RoutingMessageHandler>> = vec![
-                               Arc::new(test_utils::TestRoutingMessageHandler::new()),
-                               Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()),
-                       ];
-                       let peers = create_network(2, &cfgs, Some(&routing_handlers));
+                       cfgs[1].routing_handler.request_full_sync.store(true, Ordering::Release);
+                       let peers = create_network(2, &cfgs);
                        let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]);
 
                        let peer_0 = peers[0].peers.lock().unwrap();
index b7b6f0422bd4866ba583ef95677651bac6aa5e01..e426ec2d340a6967955016352a6b78aa8e09569c 100644 (file)
@@ -3,15 +3,16 @@ use chain::chaininterface::{ConfirmationTarget, ChainError, ChainWatchInterface}
 use chain::transaction::OutPoint;
 use chain::keysinterface;
 use ln::channelmonitor;
-use ln::features::InitFeatures;
+use ln::features::{ChannelFeatures, InitFeatures};
 use ln::msgs;
-use ln::msgs::LightningError;
 use ln::channelmonitor::HTLCUpdate;
 use util::enforcing_trait_impls::EnforcingChannelKeys;
 use util::events;
 use util::logger::{Logger, Level, Record};
 use util::ser::{Readable, Writer, Writeable};
 
+use bitcoin::BitcoinHash;
+use bitcoin::blockdata::constants::genesis_block;
 use bitcoin::blockdata::transaction::Transaction;
 use bitcoin::blockdata::script::{Builder, Script};
 use bitcoin::blockdata::block::Block;
@@ -19,11 +20,12 @@ use bitcoin::blockdata::opcodes;
 use bitcoin::network::constants::Network;
 use bitcoin::hash_types::{Txid, BlockHash};
 
-use bitcoin::secp256k1::{SecretKey, PublicKey};
+use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1, Signature};
 
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::sync::Mutex;
-use std::mem;
+use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+use std::{cmp, mem};
 use std::collections::HashMap;
 
 pub struct TestVecWriter(pub Vec<u8>);
@@ -172,41 +174,105 @@ impl events::MessageSendEventsProvider for TestChannelMessageHandler {
        }
 }
 
+fn get_dummy_channel_announcement(short_chan_id: u64) -> msgs::ChannelAnnouncement {
+       use bitcoin::secp256k1::ffi::Signature as FFISignature;
+       let secp_ctx = Secp256k1::new();
+       let network = Network::Testnet;
+       let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
+       let node_2_privkey = SecretKey::from_slice(&[41; 32]).unwrap();
+       let node_1_btckey = SecretKey::from_slice(&[40; 32]).unwrap();
+       let node_2_btckey = SecretKey::from_slice(&[39; 32]).unwrap();
+       let unsigned_ann = msgs::UnsignedChannelAnnouncement {
+               features: ChannelFeatures::known(),
+               chain_hash: genesis_block(network).header.bitcoin_hash(),
+               short_channel_id: short_chan_id,
+               node_id_1: PublicKey::from_secret_key(&secp_ctx, &node_1_privkey),
+               node_id_2: PublicKey::from_secret_key(&secp_ctx, &node_2_privkey),
+               bitcoin_key_1: PublicKey::from_secret_key(&secp_ctx, &node_1_btckey),
+               bitcoin_key_2: PublicKey::from_secret_key(&secp_ctx, &node_2_btckey),
+               excess_data: Vec::new(),
+       };
+
+       msgs::ChannelAnnouncement {
+               node_signature_1: Signature::from(FFISignature::new()),
+               node_signature_2: Signature::from(FFISignature::new()),
+               bitcoin_signature_1: Signature::from(FFISignature::new()),
+               bitcoin_signature_2: Signature::from(FFISignature::new()),
+               contents: unsigned_ann,
+       }
+}
+
+fn get_dummy_channel_update(short_chan_id: u64) -> msgs::ChannelUpdate {
+       use bitcoin::secp256k1::ffi::Signature as FFISignature;
+       let network = Network::Testnet;
+       msgs::ChannelUpdate {
+               signature: Signature::from(FFISignature::new()),
+               contents: msgs::UnsignedChannelUpdate {
+                       chain_hash: genesis_block(network).header.bitcoin_hash(),
+                       short_channel_id: short_chan_id,
+                       timestamp: 0,
+                       flags: 0,
+                       cltv_expiry_delta: 0,
+                       htlc_minimum_msat: 0,
+                       fee_base_msat: 0,
+                       fee_proportional_millionths: 0,
+                       excess_data: vec![],
+               }
+       }
+}
+
 pub struct TestRoutingMessageHandler {
-       request_full_sync: bool,
+       pub chan_upds_recvd: AtomicUsize,
+       pub chan_anns_recvd: AtomicUsize,
+       pub chan_anns_sent: AtomicUsize,
+       pub request_full_sync: AtomicBool,
 }
 
 impl TestRoutingMessageHandler {
        pub fn new() -> Self {
                TestRoutingMessageHandler {
-                       request_full_sync: false,
+                       chan_upds_recvd: AtomicUsize::new(0),
+                       chan_anns_recvd: AtomicUsize::new(0),
+                       chan_anns_sent: AtomicUsize::new(0),
+                       request_full_sync: AtomicBool::new(false),
                }
        }
-
-       pub fn set_request_full_sync(mut self) -> Self {
-               self.request_full_sync = true;
-               self
-       }
 }
 impl msgs::RoutingMessageHandler for TestRoutingMessageHandler {
-       fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> {
-               Err(LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+       fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, msgs::LightningError> {
+               Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
        }
-       fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> {
-               Err(LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+       fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, msgs::LightningError> {
+               self.chan_anns_recvd.fetch_add(1, Ordering::AcqRel);
+               Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
        }
-       fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> {
-               Err(LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
+       fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, msgs::LightningError> {
+               self.chan_upds_recvd.fetch_add(1, Ordering::AcqRel);
+               Err(msgs::LightningError { err: "", action: msgs::ErrorAction::IgnoreError })
        }
        fn handle_htlc_fail_channel_update(&self, _update: &msgs::HTLCFailChannelUpdate) {}
-       fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
-               Vec::new()
+       fn get_next_channel_announcements(&self, starting_point: u64, batch_amount: u8) -> Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> {
+               let mut chan_anns = Vec::new();
+               const TOTAL_UPDS: u64 = 100;
+               let end: u64 = cmp::min(starting_point + batch_amount as u64, TOTAL_UPDS - self.chan_anns_sent.load(Ordering::Acquire) as u64);
+               for i in starting_point..end {
+                       let chan_upd_1 = get_dummy_channel_update(i);
+                       let chan_upd_2 = get_dummy_channel_update(i);
+                       let chan_ann = get_dummy_channel_announcement(i);
+
+                       chan_anns.push((chan_ann, Some(chan_upd_1), Some(chan_upd_2)));
+               }
+
+               self.chan_anns_sent.fetch_add(chan_anns.len(), Ordering::AcqRel);
+               chan_anns
        }
+
        fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> {
                Vec::new()
        }
+
        fn should_request_full_sync(&self, _node_id: &PublicKey) -> bool {
-               self.request_full_sync
+               self.request_full_sync.load(Ordering::Acquire)
        }
 }