use std::env;
use std::io;
use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::{SocketAddr, ToSocketAddrs};
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
peer_manager.clone(),
event_notifier.clone(),
)
+ .await
.is_err()
{
print!("> ");
peer_manager.clone(),
event_notifier.clone(),
)
+ .await
.is_ok()
{
println!("SUCCESS: connected to peer {}", pubkey);
println!("]");
}
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
event_notifier: mpsc::Sender<()>,
) -> Result<(), ()> {
return Ok(());
}
}
- match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
- Ok(stream) => {
- let peer_mgr = peer_manager.clone();
- let event_ntfns = event_notifier.clone();
- tokio::spawn(async move {
- lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
- });
+ match lightning_net_tokio::connect_outbound(
+ Arc::clone(&peer_manager),
+ event_notifier,
+ pubkey,
+ peer_addr,
+ )
+ .await
+ {
+ Some(conn_closed_fut) => {
+ let mut closed_fut_box = Box::pin(conn_closed_fut);
let mut peer_connected = false;
while !peer_connected {
+ match futures::poll!(&mut closed_fut_box) {
+ std::task::Poll::Ready(_) => {
+ println!("ERROR: Peer disconnected before we finished the handshake");
+ return Err(());
+ }
+ std::task::Poll::Pending => {}
+ }
for node_pubkey in peer_manager.get_peer_node_ids() {
if node_pubkey == pubkey {
peer_connected = true;
}
}
+ // Avoid blocking the tokio context by sleeping a bit
+ tokio::time::sleep(Duration::from_millis(10)).await;
}
}
- Err(e) => {
- println!("ERROR: failed to connect to peer: {:?}", e);
+ None => {
+ println!("ERROR: failed to connect to peer");
return Err(());
}
}
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"ERROR: incorrectly formatted peer
- info. Should be formatted as: `pubkey@host:port`",
+ info. Should be formatted as: `pubkey@host:port`",
));
}
- let peer_addr: Result<SocketAddr, _> = peer_addr_str.unwrap().parse();
- if peer_addr.is_err() {
+ let peer_addr = peer_addr_str.unwrap().to_socket_addrs().map(|mut r| r.next());
+ if peer_addr.is_err() || peer_addr.as_ref().unwrap().is_none() {
return Err(std::io::Error::new(
std::io::ErrorKind::Other,
"ERROR: couldn't parse pubkey@host:port into a socket address",
));
}
- Ok((pubkey.unwrap(), peer_addr.unwrap()))
+ Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap()))
}
let event_notifier = event_ntfn_sender.clone();
let listening_port = args.ldk_peer_listening_port;
tokio::spawn(async move {
- let listener = std::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port)).unwrap();
+ let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port))
+ .await
+ .expect("Failed to bind to listen port - is something else already listening on it?");
loop {
let peer_mgr = peer_manager_connection_handler.clone();
let notifier = event_notifier.clone();
- let tcp_stream = listener.accept().unwrap().0;
+ let tcp_stream = listener.accept().await.unwrap().0;
tokio::spawn(async move {
- lightning_net_tokio::setup_inbound(peer_mgr.clone(), notifier.clone(), tcp_stream)
- .await;
+ lightning_net_tokio::setup_inbound(
+ peer_mgr.clone(),
+ notifier.clone(),
+ tcp_stream.into_std().unwrap(),
+ )
+ .await;
});
}
});