From: Matt Corallo Date: Tue, 4 May 2021 20:02:27 +0000 (+0000) Subject: Connect to peers with tokio TcpStreams isntead of blocking tokio X-Git-Url: http://git.bitcoin.ninja/?a=commitdiff_plain;h=bb94f28a1faa695ff5f0f66c05811a7cb739c101;p=ldk-sample Connect to peers with tokio TcpStreams isntead of blocking tokio --- diff --git a/Cargo.lock b/Cargo.lock index 5bc89f3..07ff824 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -234,6 +234,7 @@ dependencies = [ "bech32 0.7.2", "bitcoin", "bitcoin-bech32", + "futures", "hex", "lightning", "lightning-background-processor", diff --git a/Cargo.toml b/Cargo.toml index a51252e..1b3935d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ bitcoin-bech32 = "0.7" bech32 = "0.7" hex = "0.3" +futures = "0.3" time = "0.2" rand = "0.4" serde_json = { version = "1.0" } diff --git a/src/cli.rs b/src/cli.rs index 32119ae..04b18f9 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -18,7 +18,7 @@ use lightning_invoice::{utils, Currency, Invoice}; use std::env; use std::io; use std::io::{BufRead, Write}; -use std::net::{SocketAddr, TcpStream}; +use std::net::SocketAddr; use std::ops::Deref; use std::path::Path; use std::str::FromStr; @@ -151,6 +151,7 @@ pub(crate) async fn poll_for_user_input( peer_manager.clone(), event_notifier.clone(), ) + .await .is_err() { print!("> "); @@ -300,6 +301,7 @@ pub(crate) async fn poll_for_user_input( peer_manager.clone(), event_notifier.clone(), ) + .await .is_ok() { println!("SUCCESS: connected to peer {}", pubkey); @@ -452,7 +454,7 @@ fn list_payments(inbound_payments: PaymentInfoStorage, outbound_payments: Paymen println!("]"); } -pub(crate) fn connect_peer_if_necessary( +pub(crate) async fn connect_peer_if_necessary( pubkey: PublicKey, peer_addr: SocketAddr, peer_manager: Arc, event_notifier: mpsc::Sender<()>, ) -> Result<(), ()> { @@ -461,24 +463,29 @@ pub(crate) fn connect_peer_if_necessary( return Ok(()); } } - match TcpStream::connect_timeout(&peer_addr, Duration::from_secs(10)) { - Ok(stream) => { - let peer_mgr = peer_manager.clone(); - let event_ntfns = event_notifier.clone(); - tokio::spawn(async move { - lightning_net_tokio::setup_outbound(peer_mgr, event_ntfns, pubkey, stream).await; - }); + match lightning_net_tokio::connect_outbound(Arc::clone(&peer_manager), event_notifier, pubkey, peer_addr).await { + Some(conn_closed_fut) => { + let mut closed_fut_box = Box::pin(conn_closed_fut); let mut peer_connected = false; while !peer_connected { + match futures::poll!(&mut closed_fut_box) { + std::task::Poll::Ready(_) => { + println!("ERROR: Peer disconnected before we finished the handshake"); + return Err(()); + }, + std::task::Poll::Pending => {} + } for node_pubkey in peer_manager.get_peer_node_ids() { if node_pubkey == pubkey { peer_connected = true; } } + // Avoid blocking the tokio context by sleeping a bit + tokio::time::sleep(Duration::from_millis(10)).await; } } - Err(e) => { - println!("ERROR: failed to connect to peer: {:?}", e); + None => { + println!("ERROR: failed to connect to peer"); return Err(()); } }