Connect to peers with tokio TcpStreams isntead of blocking tokio
[ldk-sample] / src / cli.rs
index c5dda5fc33e11f19cad785bb5e042855e7babf68..04b18f9815acc858ff92a0820227941bc7265598 100644 (file)
@@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice};
 use std::env;
 use std::io;
 use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::SocketAddr;
 use std::ops::Deref;
 use std::path::Path;
 use std::str::FromStr;
@@ -103,7 +103,9 @@ pub(crate) async fn poll_for_user_input(
        event_notifier: mpsc::Sender<()>, ldk_data_dir: String, logger: Arc<FilesystemLogger>,
        network: Network,
 ) {
-       println!("LDK startup successful. To view available commands: \"help\".\nLDK logs are available at <your-supplied-ldk-data-dir-path>/.ldk/logs");
+       println!("LDK startup successful. To view available commands: \"help\".");
+       println!("LDK logs are available at <your-supplied-ldk-data-dir-path>/.ldk/logs");
+       println!("Local Node ID is {}.", channel_manager.get_our_node_id());
        let stdin = io::stdin();
        print!("> ");
        io::stdout().flush().unwrap(); // Without flushing, the `>` doesn't print
@@ -149,6 +151,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_err()
                                        {
                                                print!("> ");
@@ -298,6 +301,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_ok()
                                        {
                                                println!("SUCCESS: connected to peer {}", pubkey);
@@ -450,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
        println!("]");
 }
 
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
        pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
        event_notifier: mpsc::Sender<()>,
 ) -> Result<(), ()> {
@@ -459,24 +463,29 @@ pub(crate) fn connect_peer_if_necessary(
                        return Ok(());
                }
        }
-       match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
-               Ok(stream) => {
-                       let peer_mgr = peer_manager.clone();
-                       let event_ntfns = event_notifier.clone();
-                       tokio::spawn(async move {
-                               lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
-                       });
+       match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), event_notifier, pubkey, peer_addr).await {
+               Some(conn_closed_fut) => {
+                       let mut closed_fut_box = Box::pin(conn_closed_fut);
                        let mut peer_connected = false;
                        while !peer_connected {
+                               match futures::poll!(&mut closed_fut_box) {
+                                       std::task::Poll::Ready(_) => {
+                                               println!("ERROR: Peer disconnected before we finished the handshake");
+                                               return Err(());
+                                       },
+                                       std::task::Poll::Pending => {}
+                               }
                                for node_pubkey in peer_manager.get_peer_node_ids() {
                                        if node_pubkey == pubkey {
                                                peer_connected = true;
                                        }
                                }
+                               // Avoid blocking the tokio context by sleeping a bit
+                               tokio::time::sleep(Duration::from_millis(10)).await;
                        }
                }
-               Err(e) => {
-                       println!("ERROR: failed to connect to peer: {:?}", e);
+               None => {
+                       println!("ERROR: failed to connect to peer");
                        return Err(());
                }
        }