From: Elias Rohrer Date: Thu, 18 Jul 2024 08:22:30 +0000 (+0200) Subject: Prepare `lightning-net-tokio/src/lib.rs` X-Git-Tag: v0.0.124-beta~32^2~2 X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=commitdiff_plain;h=1aa4f0bb21b1d49021f3135fc469f6451088e975;p=rust-lightning Prepare `lightning-net-tokio/src/lib.rs` --- diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 5e926ae95..35124bab6 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -268,12 +268,17 @@ impl Connection { stream.set_nonblocking(true).unwrap(); let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap()); - (Arc::clone(&tokio_stream), write_receiver, read_receiver, - Arc::new(Mutex::new(Self { - writer: Some(tokio_stream), write_avail, read_waker, read_paused: false, + let id = ID_COUNTER.fetch_add(1, Ordering::AcqRel); + let writer = Some(Arc::clone(&tokio_stream)); + let conn = Arc::new(Mutex::new(Self { + writer, + write_avail, + read_waker, + read_paused: false, rl_requested_disconnect: false, - id: ID_COUNTER.fetch_add(1, Ordering::AcqRel) - }))) + id + })); + (tokio_stream, write_receiver, read_receiver, conn) } } @@ -308,7 +313,8 @@ where PM::Target: APeerManager { let last_us = Arc::clone(&us); let handle_opt = if peer_manager.as_ref().new_inbound_connection(SocketDescriptor::new(us.clone()), remote_addr).is_ok() { - Some(tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver))) + let handle = tokio::spawn(Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver)); + Some(handle) } else { // Note that we will skip socket_disconnected here, in accordance with the PeerManager // requirements. @@ -350,13 +356,13 @@ where PM::Target: APeerManager { #[cfg(test)] let last_us = Arc::clone(&us); let handle_opt = if let Ok(initial_send) = peer_manager.as_ref().new_outbound_connection(their_node_id, SocketDescriptor::new(us.clone()), remote_addr) { - Some(tokio::spawn(async move { + let handle = tokio::spawn(async move { // We should essentially always have enough room in a TCP socket buffer to send the // initial 10s of bytes. However, tokio running in single-threaded mode will always // fail writes and wake us back up later to write. Thus, we handle a single // std::task::Poll::Pending but still expect to write the full set of bytes at once // and use a relatively tight timeout. - if let Ok(Ok(())) = tokio::time::timeout(Duration::from_millis(100), async { + let send_fut = async { loop { match SocketDescriptor::new(us.clone()).send_data(&initial_send, true) { v if v == initial_send.len() => break Ok(()), @@ -373,10 +379,13 @@ where PM::Target: APeerManager { } } } - }).await { + }; + let timeout_send_fut = tokio::time::timeout(Duration::from_millis(100), send_fut); + if let Ok(Ok(())) = timeout_send_fut.await { Connection::schedule_read(peer_manager, us, reader, read_receiver, write_receiver).await; } - })) + }); + Some(handle) } else { // Note that we will skip socket_disconnected here, in accordance with the PeerManager // requirements. @@ -417,7 +426,8 @@ pub async fn connect_outbound( addr: SocketAddr, ) -> Option> where PM::Target: APeerManager { - if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }).await { + let connect_fut = async { TcpStream::connect(&addr).await.map(|s| s.into_std().unwrap()) }; + if let Ok(Ok(stream)) = time::timeout(Duration::from_secs(10), connect_fut).await { Some(setup_outbound(peer_manager, their_node_id, stream)) } else { None } } @@ -560,7 +570,7 @@ impl Hash for SocketDescriptor { mod tests { use lightning::ln::features::*; use lightning::ln::msgs::*; - use lightning::ln::peer_handler::{MessageHandler, PeerManager}; + use lightning::ln::peer_handler::{MessageHandler, PeerManager, IgnoringMessageHandler}; use lightning::routing::gossip::NodeId; use lightning::events::*; use lightning::util::test_utils::TestNodeSigner; @@ -699,12 +709,13 @@ mod tests { disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); - let a_manager = Arc::new(PeerManager::new(MessageHandler { + let a_msg_handler = MessageHandler { chan_handler: Arc::clone(&a_handler), route_handler: Arc::clone(&a_handler), - onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); + onion_message_handler: Arc::new(IgnoringMessageHandler{}), + custom_message_handler: Arc::new(IgnoringMessageHandler{}), + }; + let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); let (b_connected_sender, mut b_connected) = mpsc::channel(1); let (b_disconnected_sender, mut b_disconnected) = mpsc::channel(1); @@ -715,12 +726,13 @@ mod tests { disconnected_flag: AtomicBool::new(false), msg_events: Mutex::new(Vec::new()), }); - let b_manager = Arc::new(PeerManager::new(MessageHandler { + let b_msg_handler = MessageHandler { chan_handler: Arc::clone(&b_handler), route_handler: Arc::clone(&b_handler), - onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - }, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key)))); + onion_message_handler: Arc::new(IgnoringMessageHandler{}), + custom_message_handler: Arc::new(IgnoringMessageHandler{}), + }; + let b_manager = Arc::new(PeerManager::new(b_msg_handler, 0, &[2; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(b_key)))); // We bind on localhost, hoping the environment is properly configured with a local // address. This may not always be the case in containers and the like, so if this test is @@ -769,12 +781,13 @@ mod tests { let b_key = SecretKey::from_slice(&[2; 32]).unwrap(); let b_pub = PublicKey::from_secret_key(&secp_ctx, &b_key); - let a_manager = Arc::new(PeerManager::new(MessageHandler { + let a_msg_handler = MessageHandler { chan_handler: Arc::new(lightning::ln::peer_handler::ErroringMessageHandler::new()), - onion_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), + onion_message_handler: Arc::new(IgnoringMessageHandler{}), route_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - custom_message_handler: Arc::new(lightning::ln::peer_handler::IgnoringMessageHandler{}), - }, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); + custom_message_handler: Arc::new(IgnoringMessageHandler{}), + }; + let a_manager = Arc::new(PeerManager::new(a_msg_handler, 0, &[1; 32], Arc::new(TestLogger()), Arc::new(TestNodeSigner::new(a_key)))); // Make two connections, one for an inbound and one for an outbound connection let conn_a = {