use std::env;
use std::io;
use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::SocketAddr;
use std::ops::Deref;
use std::path::Path;
use std::str::FromStr;
peer_manager.clone(),
event_notifier.clone(),
)
+ .await
.is_err()
{
print!("> ");
peer_manager.clone(),
event_notifier.clone(),
)
+ .await
.is_ok()
{
println!("SUCCESS: connected to peer {}", pubkey);
println!("]");
}
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
event_notifier: mpsc::Sender<()>,
) -> Result<(), ()> {
return Ok(());
}
}
- match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
- Ok(stream) => {
- let peer_mgr = peer_manager.clone();
- let event_ntfns = event_notifier.clone();
- tokio::spawn(async move {
- lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
- });
+ match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), event_notifier, pubkey, peer_addr).await {
+ Some(conn_closed_fut) => {
+ let mut closed_fut_box = Box::pin(conn_closed_fut);
let mut peer_connected = false;
while !peer_connected {
+ match futures::poll!(&mut closed_fut_box) {
+ std::task::Poll::Ready(_) => {
+ println!("ERROR: Peer disconnected before we finished the handshake");
+ return Err(());
+ },
+ std::task::Poll::Pending => {}
+ }
for node_pubkey in peer_manager.get_peer_node_ids() {
if node_pubkey == pubkey {
peer_connected = true;
}
}
+ // Avoid blocking the tokio context by sleeping a bit
+ tokio::time::sleep(Duration::from_millis(10)).await;
}
}
- Err(e) => {
- println!("ERROR: failed to connect to peer: {:?}", e);
+ None => {
+ println!("ERROR: failed to connect to peer");
return Err(());
}
}