From 222f0cbdd21f7dc813bf0358133ebf7e09b13fa7 Mon Sep 17 00:00:00 2001 From: Valentine Wallace Date: Sun, 17 May 2020 12:13:29 -0400 Subject: [PATCH] PeerManager Logger Arc --> Deref --- fuzz/src/full_stack.rs | 4 +- lightning-net-tokio/src/lib.rs | 9 +++-- lightning/src/ln/peer_handler.rs | 67 ++++++++++++++++++-------------- 3 files changed, 45 insertions(+), 35 deletions(-) diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 535fe91f..2af173cf 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -137,7 +137,7 @@ impl<'a> std::hash::Hash for Peer<'a> { struct MoneyLossDetector<'a> { manager: Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>, monitor: Arc, Arc, Arc, Arc>>, - handler: PeerManager, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>>, + handler: PeerManager, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>, Arc>, peers: &'a RefCell<[bool; 256]>, funding_txn: Vec, @@ -151,7 +151,7 @@ impl<'a> MoneyLossDetector<'a> { pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>, monitor: Arc, Arc, Arc, Arc>>, - handler: PeerManager, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>>) -> Self { + handler: PeerManager, Arc, Arc, Arc, Arc>>, Arc, Arc, Arc, Arc>>, Arc>) -> Self { MoneyLossDetector { manager, monitor, diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index c55a04b2..15226642 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -71,6 +71,7 @@ use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt}; use lightning::ln::peer_handler; use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait; use lightning::ln::msgs::ChannelMessageHandler; +use lightning::util::logger::Logger; use std::{task, thread}; use std::net::SocketAddr; @@ -123,7 +124,7 @@ impl Connection { _ => panic!() } } - async fn schedule_read(peer_manager: Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) { + async fn schedule_read(peer_manager: Arc, Arc>>, us: Arc>, mut reader: io::ReadHalf, mut read_wake_receiver: mpsc::Receiver<()>, mut write_avail_receiver: mpsc::Receiver<()>) { let peer_manager_ref = peer_manager.clone(); // 8KB is nice and big but also should never cause any issues with stack overflowing. let mut buf = [0; 8192]; @@ -233,7 +234,7 @@ impl Connection { /// not need to poll the provided future in order to make progress. /// /// See the module-level documentation for how to handle the event_notify mpsc::Sender. -pub fn setup_inbound(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future { +pub fn setup_inbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, stream: TcpStream) -> impl std::future::Future { let (reader, write_receiver, read_receiver, us) = Connection::new(event_notify, stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -272,7 +273,7 @@ pub fn setup_inbound(peer_manager: Arc(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future { +pub fn setup_outbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, stream: TcpStream) -> impl std::future::Future { let (reader, mut write_receiver, read_receiver, us) = Connection::new(event_notify, stream); #[cfg(debug_assertions)] let last_us = Arc::clone(&us); @@ -341,7 +342,7 @@ pub fn setup_outbound(peer_manager: Arc(peer_manager: Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option> { +pub async fn connect_outbound(peer_manager: Arc, Arc>>, event_notify: mpsc::Sender<()>, their_node_id: PublicKey, addr: SocketAddr) -> Option> { if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), TcpStream::connect(&addr)).await { Some(setup_outbound(peer_manager, event_notify, their_node_id, stream)) } else { None } diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index a983a6c8..dc3c37c4 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -171,7 +171,7 @@ fn _check_usize_is_32_or_64() { /// lifetimes). Other times you can afford a reference, which is more efficient, in which case /// SimpleRefPeerManager is the more appropriate type. Defining these type aliases prevents /// issues such as overly long function definitions. -pub type SimpleArcPeerManager = Arc>>; +pub type SimpleArcPeerManager = Arc, Arc>>; /// SimpleRefPeerManager is a type alias for a PeerManager reference, and is the reference /// counterpart to the SimpleArcPeerManager type alias. Use this type by default when you don't @@ -179,7 +179,7 @@ pub type SimpleArcPeerManager = Arc = PeerManager>; +pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager, &'e L>; /// A PeerManager manages a set of peers, described by their SocketDescriptor and marshalls socket /// events into messages which it passes on to its MessageHandlers. @@ -189,7 +189,7 @@ pub type SimpleRefPeerManager<'a, 'b, 'c, 'd, 'e, SD, M, T, F, L> = PeerManager< /// essentially you should default to using a SimpleRefPeerManager, and use a /// SimpleArcPeerManager when you require a PeerManager with a static lifetime, such as when /// you're using lightning-net-tokio. -pub struct PeerManager where CM::Target: msgs::ChannelMessageHandler { +pub struct PeerManager where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger { message_handler: MessageHandler, peers: Mutex>, our_node_secret: SecretKey, @@ -200,7 +200,7 @@ pub struct PeerManager where CM::Target peer_counter_low: AtomicUsize, peer_counter_high: AtomicUsize, - logger: Arc, + logger: L, } macro_rules! encode_msg { @@ -213,22 +213,22 @@ macro_rules! encode_msg { /// Manages and reacts to connection events. You probably want to use file descriptors as PeerIds. /// PeerIds may repeat, but only after socket_disconnected() has been called. -impl PeerManager where CM::Target: msgs::ChannelMessageHandler { +impl PeerManager where CM::Target: msgs::ChannelMessageHandler, L::Target: Logger { /// Constructs a new PeerManager with the given message handlers and node_id secret key /// ephemeral_random_data is used to derive per-connection ephemeral keys and must be /// cryptographically secure random bytes. - pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: Arc) -> PeerManager { + pub fn new(message_handler: MessageHandler, our_node_secret: SecretKey, ephemeral_random_data: &[u8; 32], logger: L) -> PeerManager { let mut ephemeral_key_midstate = Sha256::engine(); ephemeral_key_midstate.input(ephemeral_random_data); PeerManager { - message_handler: message_handler, + message_handler, peers: Mutex::new(PeerHolder { peers: HashMap::new(), peers_needing_send: HashSet::new(), node_id_to_descriptor: HashMap::new() }), - our_node_secret: our_node_secret, + our_node_secret, ephemeral_key_midstate, peer_counter_low: AtomicUsize::new(0), peer_counter_high: AtomicUsize::new(0), @@ -1167,7 +1167,6 @@ mod tests { use ln::features::ChannelFeatures; use util::events; use util::test_utils; - use util::logger::Logger; use bitcoin::secp256k1::Secp256k1; use bitcoin::secp256k1::key::{SecretKey, PublicKey}; @@ -1205,20 +1204,30 @@ mod tests { fn disconnect_socket(&mut self) {} } - fn create_chan_handlers(peer_count: usize) -> Vec { - let mut chan_handlers = Vec::new(); + struct PeerManagerCfg { + chan_handler: test_utils::TestChannelMessageHandler, + logger: test_utils::TestLogger, + } + + fn create_peermgr_cfgs(peer_count: usize) -> Vec { + let mut cfgs = Vec::new(); for _ in 0..peer_count { let chan_handler = test_utils::TestChannelMessageHandler::new(); - chan_handlers.push(chan_handler); + let logger = test_utils::TestLogger::new(); + cfgs.push( + PeerManagerCfg{ + chan_handler, + logger, + } + ); } - chan_handlers + cfgs } - fn create_network<'a>(peer_count: usize, chan_handlers: &'a Vec, routing_handlers: Option<&'a Vec>>) -> Vec> { + fn create_network<'a>(peer_count: usize, cfgs: &'a Vec, routing_handlers: Option<&'a Vec>>) -> Vec> { let mut peers = Vec::new(); let mut rng = thread_rng(); - let logger : Arc = Arc::new(test_utils::TestLogger::new()); let mut ephemeral_bytes = [0; 32]; rng.fill_bytes(&mut ephemeral_bytes); @@ -1231,15 +1240,15 @@ mod tests { rng.fill_bytes(&mut key_slice); SecretKey::from_slice(&key_slice).unwrap() }; - let msg_handler = MessageHandler { chan_handler: &chan_handlers[i], route_handler: router }; - let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, Arc::clone(&logger)); + let msg_handler = MessageHandler { chan_handler: &cfgs[i].chan_handler, route_handler: router }; + let peer = PeerManager::new(msg_handler, node_id, &ephemeral_bytes, &cfgs[i].logger); peers.push(peer); } peers } - fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let secp_ctx = Secp256k1::new(); let a_id = PublicKey::from_secret_key(&secp_ctx, &peer_a.our_node_secret); let mut fd_a = FileDescriptor { fd: 1, outbound_data: Arc::new(Mutex::new(Vec::new())) }; @@ -1252,7 +1261,7 @@ mod tests { (fd_a.clone(), fd_b.clone()) } - fn establish_connection_and_read_events<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { + fn establish_connection_and_read_events<'a>(peer_a: &PeerManager, peer_b: &PeerManager) -> (FileDescriptor, FileDescriptor) { let (mut fd_a, mut fd_b) = establish_connection(peer_a, peer_b); assert_eq!(peer_b.read_event(&mut fd_b, &fd_a.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); assert_eq!(peer_a.read_event(&mut fd_a, &fd_b.outbound_data.lock().unwrap().split_off(0)).unwrap(), false); @@ -1263,9 +1272,9 @@ mod tests { fn test_disconnect_peer() { // Simple test which builds a network of PeerManager, connects and brings them to NoiseState::Finished and // push a DisconnectPeer event to remove the node flagged by id - let chan_handlers = create_chan_handlers(2); + let cfgs = create_peermgr_cfgs(2); let chan_handler = test_utils::TestChannelMessageHandler::new(); - let mut peers = create_network(2, &chan_handlers, None); + let mut peers = create_network(2, &cfgs, None); establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); @@ -1286,8 +1295,8 @@ mod tests { #[test] fn test_timer_tick_occurred() { // Create peers, a vector of two peer managers, perform initial set up and check that peers[0] has one Peer. - let chan_handlers = create_chan_handlers(2); - let peers = create_network(2, &chan_handlers, None); + let cfgs = create_peermgr_cfgs(2); + let peers = create_network(2, &cfgs, None); establish_connection(&peers[0], &peers[1]); assert_eq!(peers[0].peers.lock().unwrap().peers.len(), 1); @@ -1404,7 +1413,7 @@ mod tests { #[test] fn test_do_attempt_write_data() { // Create 2 peers with custom TestRoutingMessageHandlers and connect them. - let chan_handlers = create_chan_handlers(2); + let cfgs = create_peermgr_cfgs(2); let mut routing_handlers: Vec> = Vec::new(); let mut routing_handlers_concrete: Vec> = Vec::new(); for _ in 0..2 { @@ -1412,7 +1421,7 @@ mod tests { routing_handlers.push(routing_handler.clone()); routing_handlers_concrete.push(routing_handler.clone()); } - let peers = create_network(2, &chan_handlers, Some(&routing_handlers)); + let peers = create_network(2, &cfgs, Some(&routing_handlers)); // By calling establish_connect, we trigger do_attempt_write_data between // the peers. Previously this function would mistakenly enter an infinite loop @@ -1438,12 +1447,12 @@ mod tests { fn limit_initial_routing_sync_requests() { // Inbound peer 0 requests initial_routing_sync, but outbound peer 1 does not. { - let chan_handlers = create_chan_handlers(2); + let cfgs = create_peermgr_cfgs(2); let routing_handlers: Vec> = vec![ Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()), Arc::new(test_utils::TestRoutingMessageHandler::new()), ]; - let peers = create_network(2, &chan_handlers, Some(&routing_handlers)); + let peers = create_network(2, &cfgs, Some(&routing_handlers)); let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); let peer_0 = peers[0].peers.lock().unwrap(); @@ -1458,12 +1467,12 @@ mod tests { // Outbound peer 1 requests initial_routing_sync, but inbound peer 0 does not. { - let chan_handlers = create_chan_handlers(2); + let cfgs = create_peermgr_cfgs(2); let routing_handlers: Vec> = vec![ Arc::new(test_utils::TestRoutingMessageHandler::new()), Arc::new(test_utils::TestRoutingMessageHandler::new().set_request_full_sync()), ]; - let peers = create_network(2, &chan_handlers, Some(&routing_handlers)); + let peers = create_network(2, &cfgs, Some(&routing_handlers)); let (fd_0_to_1, fd_1_to_0) = establish_connection_and_read_events(&peers[0], &peers[1]); let peer_0 = peers[0].peers.lock().unwrap(); -- 2.30.2