]> git.bitcoin.ninja Git - rust-lightning/commitdiff
Prepare `lightning-net-tokio/src/lib.rs`
authorElias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 08:22:30 +0000 (10:22 +0200)
committerElias Rohrer <dev@tnull.de>
Thu, 18 Jul 2024 08:22:30 +0000 (10:22 +0200)
lightning-net-tokio/src/lib.rs

index 5e926ae95f56e67340a3740a29052da4e6525936..35124bab6a260b42cf44f3d05cfca78b885560f3 100644 (file)
@@ -268,12 +268,17 @@ impl Connection {
                stream.set_nonblocking(true).unwrap();
                let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
 
-               (Arc::clone(&tokio_stream), write_receiver, read_receiver,
-               Arc::new(Mutex::new(Self {
-                       writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
+               let id = ID_COUNTER.fetch_add(1, Ordering::AcqRel);
+               let writer = Some(Arc::clone(&tokio_stream));
+               let conn = Arc::new(Mutex::new(Self {
+                       writer,
+                       write_avail,
+                       read_waker,
+                       read_paused: false,
                        rl_requested_disconnect: false,
-                       id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
-               })))
+                       id
+               }));
+               (tokio_stream, write_receiver, read_receiver, conn)
        }
 }
 
@@ -308,7 +313,8 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
        let last_us = Arc::clone(&us);
 
        let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() {
-               Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)))
+               let handle = tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver));
+               Some(handle)
        } else {
                // Note that we will skip socket_disconnected here, in accordance with the PeerManager
                // requirements.
@@ -350,13 +356,13 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
        #[cfg(test)]
        let last_us = Arc::clone(&us);
        let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) {
-               Some(tokio::spawn(async move {
+               let handle = tokio::spawn(async move {
                        // We should essentially always have enough room in a TCP socket buffer to send the
                        // initial 10s of bytes. However, tokio running in single-threaded mode will always
                        // fail writes and wake us back up later to write. Thus, we handle a single
                        // std::task::Poll::Pending but still expect to write the full set of bytes at once
                        // and use a relatively tight timeout.
-                       if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async {
+                       let send_fut = async {
                                loop {
                                        match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) {
                                                v if v == initial_send.len() => break Ok(()),
@@ -373,10 +379,13 @@ where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
                                                }
                                        }
                                }
-                       }).await {
+                       };
+                       let timeout_send_fut = tokio::time::timeout(Duration::from_millis(100), send_fut);
+                       if let Ok(Ok(())) = timeout_send_fut.await {
                                Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await;
                        }
-               }))
+               });
+               Some(handle)
        } else {
                // Note that we will skip socket_disconnected here, in accordance with the PeerManager
                // requirements.
@@ -417,7 +426,8 @@ pub async fn connect_outbound<PM: Deref + 'static + Send + Sync + Clone>(
        addr: SocketAddr,
 ) -> Option<impl std::future::Future<Output=()>>
 where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
-       if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await {
+       let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) };
+       if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await {
                Some(setup_outbound(peer_manager, their_node_id, stream))
        } else { None }
 }
@@ -560,7 +570,7 @@ impl Hash for SocketDescriptor {
 mod tests {
        use lightning::ln::features::*;
        use lightning::ln::msgs::*;
-       use lightning::ln::peer_handler::{MessageHandler, PeerManager};
+       use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringMessageHandler};
        use lightning::routing::gossip::NodeId;
        use lightning::events::*;
        use lightning::util::test_utils::TestNodeSigner;
@@ -699,12 +709,13 @@ mod tests {
                        disconnected_flag: AtomicBool::new(false),
                        msg_events: Mutex::new(Vec::new()),
                });
-               let a_manager = Arc::new(PeerManager::new(MessageHandler {
+               let a_msg_handler = MessageHandler {
                        chan_handler: Arc::clone(&a_handler),
                        route_handler: Arc::clone(&a_handler),
-                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-               }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
+                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+               };
+               let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
 
                let (b_connected_sender, mut b_connected) = mpsc::channel(1);
                let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1);
@@ -715,12 +726,13 @@ mod tests {
                        disconnected_flag: AtomicBool::new(false),
                        msg_events: Mutex::new(Vec::new()),
                });
-               let b_manager = Arc::new(PeerManager::new(MessageHandler {
+               let b_msg_handler = MessageHandler {
                        chan_handler: Arc::clone(&b_handler),
                        route_handler: Arc::clone(&b_handler),
-                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-               }, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));
+                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
+                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+               };
+               let b_manager = Arc::new(PeerManager::new(b_msg_handler, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key))));
 
                // We bind on localhost, hoping the environment is properly configured with a local
                // address. This may not always be the case in containers and the like, so if this test is
@@ -769,12 +781,13 @@ mod tests {
                let b_key = SecretKey::from_slice(&[2; 32]).unwrap();
                let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key);
 
-               let a_manager = Arc::new(PeerManager::new(MessageHandler {
+               let a_msg_handler = MessageHandler {
                        chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()),
-                       onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
+                       onion_message_handler: Arc::new(IgnoringMessageHandler{}),
                        route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-                       custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}),
-               }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
+                       custom_message_handler: Arc::new(IgnoringMessageHandler{}),
+               };
+               let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key))));
 
                // Make two connections, one for an inbound and one for an outbound connection
                let conn_a = {