use lightning::ln::msgs::NetAddress;
use std::ops::Deref;
-use std::task;
+use std::task::{self, Poll};
+use std::future::Future;
use std::net::SocketAddr;
use std::net::TcpStream as StdTcpStream;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Duration;
+use std::pin::Pin;
use std::hash::Hash;
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
+// We only need to select over multiple futures in one place, and taking on the full `tokio/macros`
+// dependency tree in order to do so (which has broken our MSRV before) is excessive. Instead, we
+// define a trivial two- and three- select macro with the specific types we need and just use that.
+
+pub(crate) enum SelectorOutput {
+ A(Option<()>), B(Option<()>), C(tokio::io::Result<usize>),
+}
+
+pub(crate) struct TwoSelector<
+ A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
+> {
+ pub a: A,
+ pub b: B,
+}
+
+impl<
+ A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin
+> Future for TwoSelector<A, B> {
+ type Output = SelectorOutput;
+ fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
+ match Pin::new(&mut self.a).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.b).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+ Poll::Pending => {},
+ }
+ Poll::Pending
+ }
+}
+
+pub(crate) struct ThreeSelector<
+ A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+> {
+ pub a: A,
+ pub b: B,
+ pub c: C,
+}
+
+impl<
+ A: Future<Output=Option<()>> + Unpin, B: Future<Output=Option<()>> + Unpin, C: Future<Output=tokio::io::Result<usize>> + Unpin
+> Future for ThreeSelector<A, B, C> {
+ type Output = SelectorOutput;
+ fn poll(mut self: Pin<&mut Self>, ctx: &mut task::Context<'_>) -> Poll<SelectorOutput> {
+ match Pin::new(&mut self.a).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::A(res)); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.b).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.c).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+ Poll::Pending => {},
+ }
+ Poll::Pending
+ }
+}
+
/// Connection contains all our internal state for a connection - we hold a reference to the
/// Connection object (in an Arc<Mutex<>>) in each SocketDescriptor we create as well as in the
/// read future (which is returned by schedule_read).
}
us_lock.read_paused
};
- tokio::select! {
- v = write_avail_receiver.recv() => {
+ // TODO: Drop the Box'ing of the futures once Rust has pin-on-stack support.
+ let select_result = if read_paused {
+ TwoSelector {
+ a: Box::pin(write_avail_receiver.recv()),
+ b: Box::pin(read_wake_receiver.recv()),
+ }.await
+ } else {
+ ThreeSelector {
+ a: Box::pin(write_avail_receiver.recv()),
+ b: Box::pin(read_wake_receiver.recv()),
+ c: Box::pin(reader.read(&mut buf)),
+ }.await
+ };
+ match select_result {
+ SelectorOutput::A(v) => {
assert!(v.is_some()); // We can't have dropped the sending end, its in the us Arc!
if peer_manager.as_ref().write_buffer_space_avail(&mut our_descriptor).is_err() {
break Disconnect::CloseConnection;
}
},
- _ = read_wake_receiver.recv() => {},
- read = reader.read(&mut buf), if !read_paused => match read {
- Ok(0) => break Disconnect::PeerDisconnected,
- Ok(len) => {
- let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
- let mut us_lock = us.lock().unwrap();
- match read_res {
- Ok(pause_read) => {
- if pause_read {
- us_lock.read_paused = true;
- }
- },
- Err(_) => break Disconnect::CloseConnection,
- }
- },
- Err(_) => break Disconnect::PeerDisconnected,
+ SelectorOutput::B(_) => {},
+ SelectorOutput::C(read) => {
+ match read {
+ Ok(0) => break Disconnect::PeerDisconnected,
+ Ok(len) => {
+ let read_res = peer_manager.as_ref().read_event(&mut our_descriptor, &buf[0..len]);
+ let mut us_lock = us.lock().unwrap();
+ match read_res {
+ Ok(pause_read) => {
+ if pause_read {
+ us_lock.read_paused = true;
+ }
+ },
+ Err(_) => break Disconnect::CloseConnection,
+ }
+ },
+ Err(_) => break Disconnect::PeerDisconnected,
+ }
},
}
let _ = event_waker.try_send(());