From: Matt Corallo Date: Mon, 17 Jul 2023 21:01:02 +0000 (+0000) Subject: Drop `tokio/macros` dependency in `lightning-net-tokio`, fix MSRV X-Git-Tag: v0.0.116~7^2~3 X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=fd2464374b2e826a77582c511eb65bece4403be4;p=rust-lightning Drop `tokio/macros` dependency in `lightning-net-tokio`, fix MSRV The `tokio` `macros` feature depends on `proc-macro2`, which recently broke its MSRV in a patch version. Such crates aren't reasonable for us to have as dependencies, so instead we replace the one trivial use we have of `tokio::select!()` with our own manual future. --- diff --git a/lightning-net-tokio/Cargo.toml b/lightning-net-tokio/Cargo.toml index 202849bee..0e5b28599 100644 --- a/lightning-net-tokio/Cargo.toml +++ b/lightning-net-tokio/Cargo.toml @@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] bitcoin = "0.29.0" lightning = { version = "0.0.116-rc1", path = "../lightning" } -tokio = { version = "1.0", features = [ "io-util", "macros", "rt", "sync", "net", "time" ] } +tokio = { version = "1.0", features = [ "io-util", "rt", "sync", "net", "time" ] } [dev-dependencies] tokio = { version = "1.14", features = [ "io-util", "macros", "rt", "rt-multi-thread", "sync", "net", "time" ] } diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index 724a57fb1..d6d800416 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -42,16 +42,79 @@ use lightning::ln::peer_handler::APeerManager; use lightning::ln::msgs::NetAddress; use std::ops::Deref; -use std::task; +use std::task::{self, Poll}; +use std::future::Future; use std::net::SocketAddr; use std::net::TcpStream as StdTcpStream; use std::sync::{Arc, Mutex}; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::Duration; +use std::pin::Pin; use std::hash::Hash; static ID_COUNTER: AtomicU64 = AtomicU64::new(0); +// We only need to select over multiple futures in one place, and taking on the full `tokio/macros` +// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we +// define a trivial two- and three- select macro with the specific types we need and just use that. + +pub(crate) enum SelectorOutput { + A(Option<()>), B(Option<()>), C(tokio::io::Result), +} + +pub(crate) struct TwoSelector< + A: Future> + Unpin, B: Future> + Unpin +> { + pub a: A, + pub b: B, +} + +impl< + A: Future> + Unpin, B: Future> + Unpin +> Future for TwoSelector { + type Output = SelectorOutput; + fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Pending => {}, + } + Poll::Pending + } +} + +pub(crate) struct ThreeSelector< + A: Future> + Unpin, B: Future> + Unpin, C: Future> + Unpin +> { + pub a: A, + pub b: B, + pub c: C, +} + +impl< + A: Future> + Unpin, B: Future> + Unpin, C: Future> + Unpin +> Future for ThreeSelector { + type Output = SelectorOutput; + fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.c).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); }, + Poll::Pending => {}, + } + Poll::Pending + } +} + /// Connection contains all our internal state for a connection - we hold a reference to the /// Connection object (in an Arc>) in each SocketDescriptor we create as well as in the /// read future (which is returned by schedule_read). @@ -127,29 +190,44 @@ impl Connection { } us_lock.read_paused }; - tokio::select! { - v = write_avail_receiver.recv() => { + // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support. + let select_result = if read_paused { + TwoSelector { + a: Box::pin(write_avail_receiver.recv()), + b: Box::pin(read_wake_receiver.recv()), + }.await + } else { + ThreeSelector { + a: Box::pin(write_avail_receiver.recv()), + b: Box::pin(read_wake_receiver.recv()), + c: Box::pin(reader.read(&mut buf)), + }.await + }; + match select_result { + SelectorOutput::A(v) => { assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc! if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() { break Disconnect::CloseConnection; } }, - _ = read_wake_receiver.recv() => {}, - read = reader.read(&mut buf), if !read_paused => match read { - Ok(0) => break Disconnect::PeerDisconnected, - Ok(len) => { - let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]); - let mut us_lock = us.lock().unwrap(); - match read_res { - Ok(pause_read) => { - if pause_read { - us_lock.read_paused = true; - } - }, - Err(_) => break Disconnect::CloseConnection, - } - }, - Err(_) => break Disconnect::PeerDisconnected, + SelectorOutput::B(_) => {}, + SelectorOutput::C(read) => { + match read { + Ok(0) => break Disconnect::PeerDisconnected, + Ok(len) => { + let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]); + let mut us_lock = us.lock().unwrap(); + match read_res { + Ok(pause_read) => { + if pause_read { + us_lock.read_paused = true; + } + }, + Err(_) => break Disconnect::CloseConnection, + } + }, + Err(_) => break Disconnect::PeerDisconnected, + } }, } let _ = event_waker.try_send(());