Stop connecting outbound on shutdown in addition to stopping listen
[ldk-sample] / src / main.rs
index e962db83a2556c11f81a44f0f4fbc892660ea963..49fdcb81e8080183a1bcec4b1095c605a39da15c 100644 (file)
@@ -29,7 +29,7 @@ use lightning::routing::scoring::ProbabilisticScorer;
 use lightning::util::config::UserConfig;
 use lightning::util::events::{Event, PaymentPurpose};
 use lightning::util::ser::ReadableArgs;
-use lightning_background_processor::BackgroundProcessor;
+use lightning_background_processor::{BackgroundProcessor, Persister};
 use lightning_block_sync::init;
 use lightning_block_sync::poll;
 use lightning_block_sync::SpvClient;
@@ -109,6 +109,36 @@ pub(crate) type InvoicePayer<E> = payment::InvoicePayer<
 
 type Router = DefaultRouter<Arc<NetworkGraph>, Arc<FilesystemLogger>>;
 
+struct DataPersister {
+       data_dir: String,
+}
+
+impl
+       Persister<
+               InMemorySigner,
+               Arc<ChainMonitor>,
+               Arc<BitcoindClient>,
+               Arc<KeysManager>,
+               Arc<BitcoindClient>,
+               Arc<FilesystemLogger>,
+       > for DataPersister
+{
+       fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), std::io::Error> {
+               FilesystemPersister::persist_manager(self.data_dir.clone(), channel_manager)
+       }
+
+       fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), std::io::Error> {
+               if FilesystemPersister::persist_network_graph(self.data_dir.clone(), network_graph).is_err()
+               {
+                       // Persistence errors here are non-fatal as we can just fetch the routing graph
+                       // again later, but they may indicate a disk error which could be fatal elsewhere.
+                       eprintln!("Warning: Failed to persist network graph, check your disk and permissions");
+               }
+
+               Ok(())
+       }
+}
+
 async fn handle_ldk_events(
        channel_manager: Arc<ChannelManager>, bitcoind_client: Arc<BitcoindClient>,
        keys_manager: Arc<KeysManager>, inbound_payments: PaymentInfoStorage,
@@ -491,22 +521,6 @@ async fn start_ldk() {
                None::<Arc<dyn chain::Access + Send + Sync>>,
                logger.clone(),
        ));
-       let network_graph_persist = Arc::clone(&network_graph);
-       tokio::spawn(async move {
-               let mut interval = tokio::time::interval(Duration::from_secs(600));
-               loop {
-                       interval.tick().await;
-                       if disk::persist_network(Path::new(&network_graph_path), &network_graph_persist)
-                               .is_err()
-                       {
-                               // Persistence errors here are non-fatal as we can just fetch the routing graph
-                               // again later, but they may indicate a disk error which could be fatal elsewhere.
-                               eprintln!(
-                                       "Warning: Failed to persist network graph, check your disk and permissions"
-                               );
-                       }
-               }
-       });
 
        // Step 12: Initialize the PeerManager
        let channel_manager: Arc<ChannelManager> = Arc::new(channel_manager);
@@ -529,8 +543,8 @@ async fn start_ldk() {
 
        let peer_manager_connection_handler = peer_manager.clone();
        let listening_port = args.ldk_peer_listening_port;
-       let stop_listen = Arc::new(AtomicBool::new(false));
-       let stop_listen_ref = Arc::clone(&stop_listen);
+       let stop_listen_connect = Arc::new(AtomicBool::new(false));
+       let stop_listen = Arc::clone(&stop_listen_connect);
        tokio::spawn(async move {
                let listener = tokio::net::TcpListener::bind(format!("0.0.0.0:{}", listening_port))
                        .await
@@ -538,7 +552,7 @@ async fn start_ldk() {
                loop {
                        let peer_mgr = peer_manager_connection_handler.clone();
                        let tcp_stream = listener.accept().await.unwrap().0;
-                       if stop_listen_ref.load(Ordering::Acquire) {
+                       if stop_listen.load(Ordering::Acquire) {
                                return;
                        }
                        tokio::spawn(async move {
@@ -617,7 +631,11 @@ async fn start_ldk() {
        });
 
        // Step 17: Create InvoicePayer
-       let router = DefaultRouter::new(network_graph.clone(), logger.clone());
+       let router = DefaultRouter::new(
+               network_graph.clone(),
+               logger.clone(),
+               keys_manager.get_secure_random_bytes(),
+       );
        let invoice_payer = Arc::new(InvoicePayer::new(
                channel_manager.clone(),
                router,
@@ -627,14 +645,12 @@ async fn start_ldk() {
                payment::RetryAttempts(5),
        ));
 
-       // Step 18: Persist ChannelManager
-       let data_dir = ldk_data_dir.clone();
-       let persist_channel_manager_callback =
-               move |node: &ChannelManager| FilesystemPersister::persist_manager(data_dir.clone(), &*node);
+       // Step 18: Persist ChannelManager and NetworkGraph
+       let persister = DataPersister { data_dir: ldk_data_dir.clone() };
 
        // Step 19: Background Processing
        let background_processor = BackgroundProcessor::start(
-               persist_channel_manager_callback,
+               persister,
                invoice_payer.clone(),
                chain_monitor.clone(),
                channel_manager.clone(),
@@ -647,6 +663,7 @@ async fn start_ldk() {
        let connect_cm = Arc::clone(&channel_manager);
        let connect_pm = Arc::clone(&peer_manager);
        let peer_data_path = format!("{}/channel_peer_data", ldk_data_dir.clone());
+       let stop_connect = Arc::clone(&stop_listen_connect);
        tokio::spawn(async move {
                let mut interval = tokio::time::interval(Duration::from_secs(1));
                loop {
@@ -660,6 +677,9 @@ async fn start_ldk() {
                                                .map(|chan| chan.counterparty.node_id)
                                                .filter(|id| !peers.contains(id))
                                        {
+                                               if stop_connect.load(Ordering::Acquire) {
+                                                       return;
+                                               }
                                                for (pubkey, peer_addr) in info.iter() {
                                                        if *pubkey == node_id {
                                                                let _ = cli::do_connect_peer(
@@ -699,10 +719,11 @@ async fn start_ldk() {
 
        // Start the CLI.
        cli::poll_for_user_input(
-               invoice_payer.clone(),
-               peer_manager.clone(),
-               channel_manager.clone(),
-               keys_manager.clone(),
+               Arc::clone(&invoice_payer),
+               Arc::clone(&peer_manager),
+               Arc::clone(&channel_manager),
+               Arc::clone(&keys_manager),
+               Arc::clone(&network_graph),
                inbound_payments,
                outbound_payments,
                ldk_data_dir.clone(),
@@ -712,7 +733,7 @@ async fn start_ldk() {
 
        // Disconnect our peers and stop accepting new connections. This ensures we don't continue
        // updating our channel data after we've stopped the background processor.
-       stop_listen.store(true, Ordering::Release);
+       stop_listen_connect.store(true, Ordering::Release);
        peer_manager.disconnect_all_peers();
 
        // Stop the background processor.