Drop `tokio`'s `io-util` feature as it broke MSRV and isn't useful
author    Matt Corallo <git@bluematt.me>
Mon, 28 Aug 2023 18:39:04 +0000 (18:39 +0000)
committer Matt Corallo <git@bluematt.me>
Mon, 28 Aug 2023 21:16:54 +0000 (21:16 +0000)
We use `tokio`'s `io-util` feature to provide the
`Async{Read,Write}Ext` traits, which let us launch a read future or
call `poll_write` directly, as well as `split` the `TcpStream` into a
read/write half. However, these traits aren't actually doing much for
us - they are really just wrapping the `readable` future (which we can
trivially use ourselves), and `poll_write` isn't doing anything for us
that `poll_write_ready` can't.
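
As an illustration (a minimal sketch, not part of the patch - the
helper names `read_once` and `poll_send` are hypothetical), the same
behavior falls out of tokio's readiness APIs directly:

    use std::task::{Context, Poll};
    use tokio::net::TcpStream;

    // Rough equivalent of the old `AsyncReadExt::read` future: wait
    // for readiness, then do a non-blocking read.
    async fn read_once(stream: &TcpStream, buf: &mut [u8]) -> std::io::Result<usize> {
        loop {
            stream.readable().await?;
            match stream.try_read(buf) {
                // readable() may wake spuriously; WouldBlock just means retry.
                Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
                res => return res,
            }
        }
    }

    // Rough equivalent of `AsyncWrite::poll_write`: poll for write
    // readiness, then do a non-blocking write.
    fn poll_send(stream: &TcpStream, cx: &mut Context<'_>, data: &[u8])
    -> Poll<std::io::Result<usize>> {
        loop {
            match stream.poll_write_ready(cx) {
                Poll::Ready(Ok(())) => match stream.try_write(data) {
                    // A spurious wakeup clears readiness, so loop back to
                    // poll_write_ready to re-register interest.
                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
                    res => return Poll::Ready(res),
                },
                Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
                Poll::Pending => return Poll::Pending,
            }
        }
    }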

Similarly, the split logic is really just `Arc`ing the `TcpStream`
and busy-waiting when an operation is in progress to prevent
concurrent reads/writes. However, there's no reason to prevent
concurrent access at the stream level - we never read concurrently or
write concurrently (though we may read and write at the same time,
which is fine).
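
For example (again a sketch with hypothetical names, not part of the
patch), one clone of an `Arc<TcpStream>` can sit in a read loop while
another clone writes - the readiness and try_* methods all take &self:

    use std::sync::Arc;
    use tokio::net::TcpStream;

    async fn run(stream: TcpStream) -> std::io::Result<()> {
        let stream = Arc::new(stream);
        let reader = Arc::clone(&stream);

        // One task reads...
        let read_task = tokio::spawn(async move {
            let mut buf = [0u8; 4096];
            loop {
                reader.readable().await?;
                match reader.try_read(&mut buf) {
                    Ok(0) => return Ok(()), // EOF
                    Ok(_len) => {}, // hand `buf[.._len]` off here
                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => continue,
                    Err(e) => return Err(e),
                }
            }
        });

        // ...while we write through the other clone, with no split() or
        // locking needed - this is a concurrent read-and-write, not a
        // concurrent write-and-write.
        stream.writable().await?;
        let _ = stream.try_write(b"ping")?;

        read_task.await.expect("read task panicked")
    }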

Worse, the `io-util` feature broke our MSRV (though this is likely to
be fixed upstream) and pulls in two additional dependencies (only one
on the latest upstream tokio).

Thus, we simply drop the `io-util` feature here.

Fixes #2527.

lightning-net-tokio/Cargo.toml
lightning-net-tokio/src/lib.rs

index c2befdae51cbf5d44d22014cb364c4fde39a6661..d2ac6e5474543db1392926dc8dfebf07715b0d86 100644
@@ -17,8 +17,8 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.116", path = "../lightning" }
-tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
+tokio = { version = "1.0", features = [ "rt", "sync", "net", "time" ] }
 
 [dev-dependencies]
-tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
+tokio = { version = "1.14", features = [ "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
 lightning = { version = "0.0.116", path = "../lightning", features = ["_test_utils"] }
index d6d8004164bf9ab54db94ee64b55ef63f5f57e74..6e2ea3f14c14f54b890f8ff504e4f3fe6dd0a304 100644
 
 use bitcoin::secp256k1::PublicKey;
 
-use tokio::net::TcpStream;
+use tokio::net::{tcp, TcpStream};
 use tokio::{io, time};
 use tokio::sync::mpsc;
-use tokio::io::{AsyncReadExt, AsyncWrite, AsyncWriteExt};
+use tokio::io::AsyncWrite;
 
 use lightning::ln::peer_handler;
 use lightning::ln::peer_handler::SocketDescriptor as LnSocketTrait;
@@ -59,7 +59,7 @@ static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 // define a trivial two- and three- select macro with the specific types we need and just use that.
 
 pub(crate) enum SelectorOutput {
-       A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
+       A(Option<()>), B(Option<()>), C(tokio::io::Result<()>),
 }
 
 pub(crate) struct TwoSelector<
@@ -87,7 +87,7 @@ impl<
 }
 
 pub(crate) struct ThreeSelector<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > {
        pub a: A,
        pub b: B,
@@ -95,7 +95,7 @@ pub(crate) struct ThreeSelector<
 }
 
 impl<
-       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<()>> + Unpin
 > Future for ThreeSelector<A, B, C> {
        type Output = SelectorOutput;
        fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
@@ -119,7 +119,7 @@ impl<
 /// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
 /// read future (which is returned by schedule_read).
 struct Connection {
-       writer: Option<io::WriteHalf<TcpStream>>,
+       writer: Option<Arc<TcpStream>>,
        // Because our PeerManager is templated by user-provided types, and we can't (as far as I can
        // tell) have a const RawWakerVTable built out of templated functions, we need some indirection
        // between being woken up with write-ready and calling PeerManager::write_buffer_space_avail.
@@ -156,7 +156,7 @@ impl Connection {
        async fn schedule_read<PM: Deref + 'static + Send + Sync + Clone>(
                peer_manager: PM,
                us: Arc<Mutex<Self>>,
-               mut reader: io::ReadHalf<TcpStream>,
+               reader: Arc<TcpStream>,
                mut read_wake_receiver: mpsc::Receiver<()>,
                mut write_avail_receiver: mpsc::Receiver<()>,
        ) where PM::Target: APeerManager<Descriptor = SocketDescriptor> {
@@ -200,7 +200,7 @@ impl Connection {
                                ThreeSelector {
                                        a: Box::pin(write_avail_receiver.recv()),
                                        b: Box::pin(read_wake_receiver.recv()),
-                                       c: Box::pin(reader.read(&mut buf)),
+                                       c: Box::pin(reader.readable()),
                                }.await
                        };
                        match select_result {
@@ -211,8 +211,9 @@ impl Connection {
                                        }
                                },
                                SelectorOutput::B(_) => {},
-                               SelectorOutput::C(read) => {
-                                       match read {
+                               SelectorOutput::C(res) => {
+                                       if res.is_err() { break Disconnect::PeerDisconnected; }
+                                       match reader.try_read(&mut buf) {
                                                Ok(0) => break Disconnect::PeerDisconnected,
                                                Ok(len) => {
                                                        let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
@@ -226,7 +227,11 @@ impl Connection {
                                                                Err(_) => break Disconnect::CloseConnection,
                                                        }
                                                },
-                                               Err(_) => break Disconnect::PeerDisconnected,
+                                               Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
+                                                       // readable() is allowed to spuriously wake, so we have to handle
+                                                       // WouldBlock here.
+                                               },
+                                               Err(_) => break Disconnect::PeerDisconnected,
                                        }
                                },
                        }
@@ -239,18 +244,14 @@ impl Connection {
                        // here.
                        let _ = tokio::task::yield_now().await;
                };
-               let writer_option = us.lock().unwrap().writer.take();
-               if let Some(mut writer) = writer_option {
-                       // If the socket is already closed, shutdown() will fail, so just ignore it.
-                       let _ = writer.shutdown().await;
-               }
+               us.lock().unwrap().writer.take();
                if let Disconnect::PeerDisconnected = disconnect_type {
                        peer_manager.as_ref().socket_disconnected(&our_descriptor);
                        peer_manager.as_ref().process_events();
                }
        }
 
-       fn new(stream: StdTcpStream) -> (io::ReadHalf<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
+       fn new(stream: StdTcpStream) -> (Arc<TcpStream>, mpsc::Receiver<()>, mpsc::Receiver<()>, Arc<Mutex<Self>>) {
                // We only ever need a channel of depth 1 here: if we returned a non-full write to the
                // PeerManager, we will eventually get notified that there is room in the socket to write
                // new bytes, which will generate an event. That event will be popped off the queue before
@@ -262,11 +263,11 @@ impl Connection {
                // false.
                let (read_waker, read_receiver) = mpsc::channel(1);
                stream.set_nonblocking(true).unwrap();
-               let (reader, writer) = io::split(TcpStream::from_std(stream).unwrap());
+               let tokio_stream = Arc::new(TcpStream::from_std(stream).unwrap());
 
-               (reader, write_receiver, read_receiver,
+               (Arc::clone(&tokio_stream), write_receiver, read_receiver,
                Arc::new(Mutex::new(Self {
-                       writer: Some(writer), write_avail, read_waker, read_paused: false,
+                       writer: Some(tokio_stream), write_avail, read_waker, read_paused: false,
                        rl_requested_disconnect: false,
                        id: ID_COUNTER.fetch_add(1, Ordering::AcqRel)
                })))
@@ -462,9 +463,9 @@ impl SocketDescriptor {
 }
 impl peer_handler::SocketDescriptor for SocketDescriptor {
        fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
-               // To send data, we take a lock on our Connection to access the WriteHalf of the TcpStream,
-               // writing to it if there's room in the kernel buffer, or otherwise create a new Waker with
-               // SocketDescriptor in it which can wake up the write_avail Sender, waking up the
+               // To send data, we take a lock on our Connection to access the TcpStream, writing to it if
+               // there's room in the kernel buffer, or otherwise create a new Waker with a
+               // SocketDescriptor in it which can wake up the write_avail Sender, waking up the
                // processing future which will call write_buffer_space_avail and we'll end up back here.
                let mut us = self.conn.lock().unwrap();
                if us.writer.is_none() {
@@ -484,24 +485,18 @@ impl peer_handler::SocketDescriptor for SocketDescriptor {
                let mut ctx = task::Context::from_waker(&waker);
                let mut written_len = 0;
                loop {
-                       match std::pin::Pin::new(us.writer.as_mut().unwrap()).poll_write(&mut ctx, &data[written_len..]) {
-                               task::Poll::Ready(Ok(res)) => {
-                                       // The tokio docs *seem* to indicate this can't happen, and I certainly don't
-                                       // know how to handle it if it does (cause it should be a Poll::Pending
-                                       // instead):
-                                       assert_ne!(res, 0);
-                                       written_len += res;
-                                       if written_len == data.len() { return written_len; }
-                               },
-                               task::Poll::Ready(Err(e)) => {
-                                       // The tokio docs *seem* to indicate this can't happen, and I certainly don't
-                                       // know how to handle it if it does (cause it should be a Poll::Pending
-                                       // instead):
-                                       assert_ne!(e.kind(), io::ErrorKind::WouldBlock);
-                                       // Probably we've already been closed, just return what we have and let the
-                                       // read thread handle closing logic.
-                                       return written_len;
+                       match us.writer.as_ref().unwrap().poll_write_ready(&mut ctx) {
+                               task::Poll::Ready(Ok(())) => {
+                                       match us.writer.as_ref().unwrap().try_write(&data[written_len..]) {
+                                               Ok(res) => {
+                                                       debug_assert_ne!(res, 0);
+                                                       written_len += res;
+                                                       if written_len == data.len() { return written_len; }
+                                               },
+                                               Err(_) => return written_len,
+                                       }
                                },
+                               task::Poll::Ready(Err(_)) => return written_len,
                                task::Poll::Pending => {
                                        // We're queued up for a write event now, but we need to make sure we also
                                        // pause read given we're now waiting on the remote end to ACK (and in