Drop `tokio/macros` dependency in `lightning-net-tokio`, fix MSRV
authorMatt Corallo <git@bluematt.me>
Mon, 17 Jul 2023 21:01:02 +0000 (21:01 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 17 Jul 2023 23:20:39 +0000 (23:20 +0000)
The `tokio` `macros` feature depends on `proc-macro2`, which
recently broke its MSRV in a patch version. Such crates aren't
reasonable for us to have as dependencies, so instead we replace
the one trivial use we have of `tokio::select!()` with our own
manual future.

lightning-net-tokio/Cargo.toml
lightning-net-tokio/src/lib.rs

index 202849bee49af822bdc47d7045e80734cfa59332..0e5b285993b47966f2b0e69817549fc51672a4a1 100644 (file)
@@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
 [dependencies]
 bitcoin = "0.29.0"
 lightning = { version = "0.0.116-rc1", path = "../lightning" }
-tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "sync", "net", "time" ] }
+tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] }
 
 [dev-dependencies]
 tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] }
index 724a57fb15c19bb1ba9c9c09bd3fbe44fc3a6478..d6d8004164bf9ab54db94ee64b55ef63f5f57e74 100644 (file)
@@ -42,16 +42,79 @@ use lightning::ln::peer_handler::APeerManager;
 use lightning::ln::msgs::NetAddress;
 
 use std::ops::Deref;
-use std::task;
+use std::task::{self, Poll};
+use std::future::Future;
 use std::net::SocketAddr;
 use std::net::TcpStream as StdTcpStream;
 use std::sync::{Arc, Mutex};
 use std::sync::atomic::{AtomicU64, Ordering};
 use std::time::Duration;
+use std::pin::Pin;
 use std::hash::Hash;
 
 static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
 
+// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
+// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
+// define a trivial two- and three- select macro with the specific types we need and just use that.
+
+pub(crate) enum SelectorOutput {
+       A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
+}
+
+pub(crate) struct TwoSelector<
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
+> {
+       pub a: A,
+       pub b: B,
+}
+
+impl<
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
+> Future for TwoSelector<A, B> {
+       type Output = SelectorOutput;
+       fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
+               match Pin::new(&mut self.a).poll(ctx) {
+                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+                       Poll::Pending => {},
+               }
+               match Pin::new(&mut self.b).poll(ctx) {
+                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                       Poll::Pending => {},
+               }
+               Poll::Pending
+       }
+}
+
+pub(crate) struct ThreeSelector<
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+> {
+       pub a: A,
+       pub b: B,
+       pub c: C,
+}
+
+impl<
+       A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+> Future for ThreeSelector<A, B, C> {
+       type Output = SelectorOutput;
+       fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
+               match Pin::new(&mut self.a).poll(ctx) {
+                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+                       Poll::Pending => {},
+               }
+               match Pin::new(&mut self.b).poll(ctx) {
+                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                       Poll::Pending => {},
+               }
+               match Pin::new(&mut self.c).poll(ctx) {
+                       Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+                       Poll::Pending => {},
+               }
+               Poll::Pending
+       }
+}
+
 /// Connection contains all our internal state for a connection - we hold a reference to the
 /// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
 /// read future (which is returned by schedule_read).
@@ -127,29 +190,44 @@ impl Connection {
                                }
                                us_lock.read_paused
                        };
-                       tokio::select! {
-                               v = write_avail_receiver.recv() => {
+                       // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support.
+                       let select_result = if read_paused {
+                               TwoSelector {
+                                       a: Box::pin(write_avail_receiver.recv()),
+                                       b: Box::pin(read_wake_receiver.recv()),
+                               }.await
+                       } else {
+                               ThreeSelector {
+                                       a: Box::pin(write_avail_receiver.recv()),
+                                       b: Box::pin(read_wake_receiver.recv()),
+                                       c: Box::pin(reader.read(&mut buf)),
+                               }.await
+                       };
+                       match select_result {
+                               SelectorOutput::A(v) => {
                                        assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
                                        if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() {
                                                break Disconnect::CloseConnection;
                                        }
                                },
-                               _ = read_wake_receiver.recv() => {},
-                               read = reader.read(&mut buf), if !read_paused => match read {
-                                       Ok(0) => break Disconnect::PeerDisconnected,
-                                       Ok(len) => {
-                                               let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
-                                               let mut us_lock = us.lock().unwrap();
-                                               match read_res {
-                                                       Ok(pause_read) => {
-                                                               if pause_read {
-                                                                       us_lock.read_paused = true;
-                                                               }
-                                                       },
-                                                       Err(_) => break Disconnect::CloseConnection,
-                                               }
-                                       },
-                                       Err(_) => break Disconnect::PeerDisconnected,
+                               SelectorOutput::B(_) => {},
+                               SelectorOutput::C(read) => {
+                                       match read {
+                                               Ok(0) => break Disconnect::PeerDisconnected,
+                                               Ok(len) => {
+                                                       let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
+                                                       let mut us_lock = us.lock().unwrap();
+                                                       match read_res {
+                                                               Ok(pause_read) => {
+                                                                       if pause_read {
+                                                                               us_lock.read_paused = true;
+                                                                       }
+                                                               },
+                                                               Err(_) => break Disconnect::CloseConnection,
+                                                       }
+                                               },
+                                               Err(_) => break Disconnect::PeerDisconnected,
+                                       }
                                },
                        }
                        let _ = event_waker.try_send(());