]> git.bitcoin.ninja Git - rapid-gossip-sync-server/commitdiff
Don't spawn new single-threaded executors, stay inside tokio 2022-08-fix-executors
authorMatt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 02:18:30 +0000 (02:18 +0000)
committerMatt Corallo <git@bluematt.me>
Mon, 22 Aug 2022 02:33:20 +0000 (02:33 +0000)
Cargo.toml
src/tracking.rs
src/verifier.rs

index 003f373122e2898e917ea7d894b2da88bc3d8dcf..5926971fcb412e9424599728fc5d1c2da6fa99e9 100644 (file)
@@ -12,7 +12,6 @@ lightning = { version = "0.0.110" }
 lightning-block-sync = { version = "0.0.110", features=["rest-client"] }
 lightning-net-tokio = { version = "0.0.110" }
 chrono = "0.4"
-futures = "0.3"
 hex = "0.3"
 rand = "0.4"
 tokio = { version = "1.14.1", features = ["full"] }
index 9c249232be5d3f37c1f08c5257906040893212f5..93389a9fa3994706915497c9f199221408926d15 100644 (file)
@@ -4,7 +4,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
 
 use bitcoin::hashes::hex::ToHex;
 use bitcoin::secp256k1::{PublicKey, SecretKey};
-use futures::executor;
 use lightning;
 use lightning::ln::peer_handler::{
        ErroringMessageHandler, IgnoringMessageHandler, MessageHandler, PeerManager,
@@ -67,7 +66,7 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGos
        let mut connected_peer_count = 0;
 
        for current_peer in peers {
-               let initial_connection_succeeded = monitor_peer_connection(current_peer, Arc::clone(&arc_peer_handler));
+               let initial_connection_succeeded = connect_peer(current_peer, Arc::clone(&arc_peer_handler)).await;
                if initial_connection_succeeded {
                        connected_peer_count += 1;
                }
@@ -156,28 +155,33 @@ pub(crate) async fn download_gossip(persistence_sender: mpsc::Sender<DetectedGos
        });
 }
 
-fn monitor_peer_connection(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
-       let peer_manager_clone = Arc::clone(&peer_manager);
-       eprintln!("Connecting to peer {}@{}…", current_peer.0.to_hex(), current_peer.1.to_string());
-       let connection = executor::block_on(async move {
-               lightning_net_tokio::connect_outbound(
-                       peer_manager_clone,
-                       current_peer.0,
-                       current_peer.1,
-               ).await
-       });
-       let mut initial_connection_succeeded = false;
+async fn connect_peer(current_peer: (PublicKey, SocketAddr), peer_manager: GossipPeerManager) -> bool {
+       eprintln!("Connecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+       let connection = lightning_net_tokio::connect_outbound(
+               Arc::clone(&peer_manager),
+               current_peer.0,
+               current_peer.1,
+       ).await;
        if let Some(disconnection_future) = connection {
                eprintln!("Connected to peer {}@{}!", current_peer.0.to_hex(), current_peer.1.to_string());
-               initial_connection_succeeded = true;
-               let peer_manager_clone = Arc::clone(&peer_manager);
                tokio::spawn(async move {
                        disconnection_future.await;
-                       eprintln!("Disconnected from peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
-                       monitor_peer_connection(current_peer.clone(), peer_manager_clone);
+                       loop {
+                               eprintln!("Reconnecting to peer {}@{}...", current_peer.0.to_hex(), current_peer.1.to_string());
+                               if let Some(disconnection_future) = lightning_net_tokio::connect_outbound(
+                                       Arc::clone(&peer_manager),
+                                       current_peer.0,
+                                       current_peer.1,
+                               ).await {
+                                       disconnection_future.await;
+                               } else {
+                                       tokio::time::sleep(Duration::from_secs(10)).await;
+                               }
+                       }
                });
+               true
        } else {
-               eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string())
-       };
-       initial_connection_succeeded
+               eprintln!("Failed to connect to peer {}@{}", current_peer.0.to_hex(), current_peer.1.to_string());
+               false
+       }
 }
index 2446e1a938d426bce0f31d0867dea6b9139cf5a5..90f2f3c776fbf6724faec15a839ed2e85f39aadb 100644 (file)
@@ -4,7 +4,6 @@ use std::sync::Arc;
 use bitcoin::{BlockHash, TxOut};
 use bitcoin::blockdata::block::Block;
 use bitcoin::hashes::Hash;
-use futures::executor;
 use lightning::chain;
 use lightning::chain::AccessError;
 use lightning_block_sync::BlockSource;
@@ -29,7 +28,7 @@ impl ChainVerifier {
 
        fn retrieve_block(&self, block_height: u32) -> Result<Block, AccessError> {
                let rest_client = self.rest_client.clone();
-               executor::block_on(async move {
+               tokio::task::block_in_place(move || { tokio::runtime::Handle::current().block_on(async move {
                        let block_hash_result = rest_client.request_resource::<BinaryResponse, RestBinaryResponse>(&format!("blockhashbyheight/{}.bin", block_height)).await;
                        let block_hash: Vec<u8> = block_hash_result.map_err(|error| {
                                eprintln!("Could't find block hash at height {}: {}", block_height, error.to_string());
@@ -43,7 +42,7 @@ impl ChainVerifier {
                                AccessError::UnknownChain
                        })?;
                        Ok(block)
-               })
+               }) })
        }
 }