Connect to peers with tokio TcpStreams isntead of blocking tokio
[ldk-sample] / src / cli.rs
index 32119aef9ea14baa6b68f12b527102f0bb08ea74..04b18f9815acc858ff92a0820227941bc7265598 100644 (file)
@@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice};
 use std::env;
 use std::io;
 use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::SocketAddr;
 use std::ops::Deref;
 use std::path::Path;
 use std::str::FromStr;
@@ -151,6 +151,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_err()
                                        {
                                                print!("> ");
@@ -300,6 +301,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_ok()
                                        {
                                                println!("SUCCESS: connected to peer {}", pubkey);
@@ -452,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
        println!("]");
 }
 
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
        pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
        event_notifier: mpsc::Sender<()>,
 ) -> Result<(), ()> {
@@ -461,24 +463,29 @@ pub(crate) fn connect_peer_if_necessary(
                        return Ok(());
                }
        }
-       match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
-               Ok(stream) => {
-                       let peer_mgr = peer_manager.clone();
-                       let event_ntfns = event_notifier.clone();
-                       tokio::spawn(async move {
-                               lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
-                       });
+       match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), event_notifier, pubkey, peer_addr).await {
+               Some(conn_closed_fut) => {
+                       let mut closed_fut_box = Box::pin(conn_closed_fut);
                        let mut peer_connected = false;
                        while !peer_connected {
+                               match futures::poll!(&mut closed_fut_box) {
+                                       std::task::Poll::Ready(_) => {
+                                               println!("ERROR: Peer disconnected before we finished the handshake");
+                                               return Err(());
+                                       },
+                                       std::task::Poll::Pending => {}
+                               }
                                for node_pubkey in peer_manager.get_peer_node_ids() {
                                        if node_pubkey == pubkey {
                                                peer_connected = true;
                                        }
                                }
+                               // Avoid blocking the tokio context by sleeping a bit
+                               tokio::time::sleep(Duration::from_millis(10)).await;
                        }
                }
-               Err(e) => {
-                       println!("ERROR: failed to connect to peer: {:?}", e);
+               None => {
+                       println!("ERROR: failed to connect to peer");
                        return Err(());
                }
        }