Connect to peers with tokio TcpStreams isntead of blocking tokio
authorMatt Corallo <git@bluematt.me>
Tue, 4 May 2021 20:02:27 +0000 (20:02 +0000)
committerMatt Corallo <git@bluematt.me>
Tue, 4 May 2021 20:05:41 +0000 (20:05 +0000)
Cargo.lock
Cargo.toml
src/cli.rs

index 5bc89f35e721bb8fe60969bf91d8f2a6f4134e09..07ff824e678191cc8222d0bd90d6fadef3cb4d1b 100644 (file)
@@ -234,6 +234,7 @@ dependencies = [
  "bech32 0.7.2",
  "bitcoin",
  "bitcoin-bech32",
+ "futures",
  "hex",
  "lightning",
  "lightning-background-processor",
index a51252ec914c7790ddbf14277aaeea052daab7fb..1b3935d044dc6b082e7e34bd4c8221b27d3e052f 100644 (file)
@@ -21,6 +21,7 @@ bitcoin-bech32 = "0.7"
 bech32 = "0.7"
 hex = "0.3"
 
+futures = "0.3"
 time = "0.2"
 rand = "0.4"
 serde_json = { version = "1.0" }
index 32119aef9ea14baa6b68f12b527102f0bb08ea74..04b18f9815acc858ff92a0820227941bc7265598 100644 (file)
@@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice};
 use std::env;
 use std::io;
 use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::SocketAddr;
 use std::ops::Deref;
 use std::path::Path;
 use std::str::FromStr;
@@ -151,6 +151,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_err()
                                        {
                                                print!("> ");
@@ -300,6 +301,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_ok()
                                        {
                                                println!("SUCCESS: connected to peer {}", pubkey);
@@ -452,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
        println!("]");
 }
 
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
        pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
        event_notifier: mpsc::Sender<()>,
 ) -> Result<(), ()> {
@@ -461,24 +463,29 @@ pub(crate) fn connect_peer_if_necessary(
                        return Ok(());
                }
        }
-       match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
-               Ok(stream) => {
-                       let peer_mgr = peer_manager.clone();
-                       let event_ntfns = event_notifier.clone();
-                       tokio::spawn(async move {
-                               lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
-                       });
+       match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), event_notifier, pubkey, peer_addr).await {
+               Some(conn_closed_fut) => {
+                       let mut closed_fut_box = Box::pin(conn_closed_fut);
                        let mut peer_connected = false;
                        while !peer_connected {
+                               match futures::poll!(&mut closed_fut_box) {
+                                       std::task::Poll::Ready(_) => {
+                                               println!("ERROR: Peer disconnected before we finished the handshake");
+                                               return Err(());
+                                       },
+                                       std::task::Poll::Pending => {}
+                               }
                                for node_pubkey in peer_manager.get_peer_node_ids() {
                                        if node_pubkey == pubkey {
                                                peer_connected = true;
                                        }
                                }
+                               // Avoid blocking the tokio context by sleeping a bit
+                               tokio::time::sleep(Duration::from_millis(10)).await;
                        }
                }
-               Err(e) => {
-                       println!("ERROR: failed to connect to peer: {:?}", e);
+               None => {
+                       println!("ERROR: failed to connect to peer");
                        return Err(());
                }
        }