Merge pull request #6 from TheBlueMatt/main
authorJeffrey Czyz <jkczyz@gmail.com>
Tue, 4 May 2021 23:37:54 +0000 (16:37 -0700)
committerGitHub <noreply@github.com>
Tue, 4 May 2021 23:37:54 +0000 (16:37 -0700)
Make connections non-blocking

Cargo.lock
Cargo.toml
src/cli.rs
src/main.rs

index 5bc89f35e721bb8fe60969bf91d8f2a6f4134e09..07ff824e678191cc8222d0bd90d6fadef3cb4d1b 100644 (file)
@@ -234,6 +234,7 @@ dependencies = [
  "bech32 0.7.2",
  "bitcoin",
  "bitcoin-bech32",
+ "futures",
  "hex",
  "lightning",
  "lightning-background-processor",
index a51252ec914c7790ddbf14277aaeea052daab7fb..1b3935d044dc6b082e7e34bd4c8221b27d3e052f 100644 (file)
@@ -21,6 +21,7 @@ bitcoin-bech32 = "0.7"
 bech32 = "0.7"
 hex = "0.3"
 
+futures = "0.3"
 time = "0.2"
 rand = "0.4"
 serde_json = { version = "1.0" }
index 32119aef9ea14baa6b68f12b527102f0bb08ea74..d2d5a218a3e123299aac3fecf33d1ecd17ea39a6 100644 (file)
@@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice};
 use std::env;
 use std::io;
 use std::io::{BufRead, Write};
-use std::net::{SocketAddr, TcpStream};
+use std::net::{SocketAddr, ToSocketAddrs};
 use std::ops::Deref;
 use std::path::Path;
 use std::str::FromStr;
@@ -151,6 +151,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_err()
                                        {
                                                print!("> ");
@@ -300,6 +301,7 @@ pub(crate) async fn poll_for_user_input(
                                                peer_manager.clone(),
                                                event_notifier.clone(),
                                        )
+                                       .await
                                        .is_ok()
                                        {
                                                println!("SUCCESS: connected to peer {}", pubkey);
@@ -452,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen
        println!("]");
 }
 
-pub(crate) fn connect_peer_if_necessary(
+pub(crate) async fn connect_peer_if_necessary(
        pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc<PeerManager>,
        event_notifier: mpsc::Sender<()>,
 ) -> Result<(), ()> {
@@ -461,24 +463,36 @@ pub(crate) fn connect_peer_if_necessary(
                        return Ok(());
                }
        }
-       match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) {
-               Ok(stream) => {
-                       let peer_mgr = peer_manager.clone();
-                       let event_ntfns = event_notifier.clone();
-                       tokio::spawn(async move {
-                               lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await;
-                       });
+       match lightning_net_tokio::connect_outbound(
+               Arc::clone(&peer_manager),
+               event_notifier,
+               pubkey,
+               peer_addr,
+       )
+       .await
+       {
+               Some(conn_closed_fut) => {
+                       let mut closed_fut_box = Box::pin(conn_closed_fut);
                        let mut peer_connected = false;
                        while !peer_connected {
+                               match futures::poll!(&mut closed_fut_box) {
+                                       std::task::Poll::Ready(_) => {
+                                               println!("ERROR: Peer disconnected before we finished the handshake");
+                                               return Err(());
+                                       }
+                                       std::task::Poll::Pending => {}
+                               }
                                for node_pubkey in peer_manager.get_peer_node_ids() {
                                        if node_pubkey == pubkey {
                                                peer_connected = true;
                                        }
                                }
+                               // Avoid blocking the tokio context by sleeping a bit
+                               tokio::time::sleep(Duration::from_millis(10)).await;
                        }
                }
-               Err(e) => {
-                       println!("ERROR: failed to connect to peer: {:?}", e);
+               None => {
+                       println!("ERROR: failed to connect to peer");
                        return Err(());
                }
        }
@@ -625,12 +639,12 @@ pub(crate) fn parse_peer_info(
                return Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "ERROR: incorrectly formatted peer
-        info. Should be formatted as: `pubkey@host:port`",
+               info. Should be formatted as: `pubkey@host:port`",
                ));
        }
 
-       let peer_addr: Result<SocketAddr, _> = peer_addr_str.unwrap().parse();
-       if peer_addr.is_err() {
+       let peer_addr = peer_addr_str.unwrap().to_socket_addrs().map(|mut r| r.next());
+       if peer_addr.is_err() || peer_addr.as_ref().unwrap().is_none() {
                return Err(std::io::Error::new(
                        std::io::ErrorKind::Other,
                        "ERROR: couldn't parse pubkey@host:port into a socket address",
@@ -645,5 +659,5 @@ pub(crate) fn parse_peer_info(
                ));
        }
 
-       Ok((pubkey.unwrap(), peer_addr.unwrap()))
+       Ok((pubkey.unwrap(), peer_addr.unwrap().unwrap()))
 }
index 33e5eb5733d30c151bb97adf1260709d593f1b57..48048b4f493bc8fe40e0f4f84a67c9abba0182c4 100644 (file)
@@ -482,14 +482,20 @@ async fn start_ldk() {
        let event_notifier = event_ntfn_sender.clone();
        let listening_port = args.ldk_peer_listening_port;
        tokio::spawn(async move {
-               let listener = std::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port)).unwrap();
+               let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port))
+                       .await
+                       .expect("Failed to bind to listen port - is something else already listening on it?");
                loop {
                        let peer_mgr = peer_manager_connection_handler.clone();
                        let notifier = event_notifier.clone();
-                       let tcp_stream = listener.accept().unwrap().0;
+                       let tcp_stream = listener.accept().await.unwrap().0;
                        tokio::spawn(async move {
-                               lightning_net_tokio::setup_inbound(peer_mgr.clone(), notifier.clone(), tcp_stream)
-                                       .await;
+                               lightning_net_tokio::setup_inbound(
+                                       peer_mgr.clone(),
+                                       notifier.clone(),
+                                       tcp_stream.into_std().unwrap(),
+                               )
+                               .await;
                        });
                }
        });