From d29e2ba9494725c641ce3f92eb89f70620cb3370 Mon Sep 17 00:00:00 2001 From: Jeffrey Czyz Date: Sat, 23 Mar 2024 16:28:54 -0500 Subject: [PATCH] Use AChannelManager in BackgroundProcessor Replace instance of ChannelManager in BackgroundProcessor and in Persister with AChannelManager. This reduces the number of type parameters need in those types, which would need to be repeated in an async version of Persister. --- lightning-background-processor/src/lib.rs | 137 +++++++++++----------- lightning/src/util/persist.rs | 66 +++++------ 2 files changed, 100 insertions(+), 103 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index cf37c5fd..3736bd60 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -24,19 +24,17 @@ extern crate lightning_rapid_gossip_sync; use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; -use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] use lightning::events::EventHandler; #[cfg(any(feature = "std", feature = "futures"))] use lightning::events::EventsProvider; -use lightning::ln::channelmanager::ChannelManager; +use lightning::ln::channelmanager::AChannelManager; use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; -use lightning::routing::router::Router; use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; @@ -81,6 +79,8 @@ use alloc::vec::Vec; /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for /// unilateral chain closure fees are at risk. /// +/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager +/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor /// [`Event`]: lightning::events::Event /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred @@ -286,7 +286,7 @@ macro_rules! define_run_body { $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); log_trace!($logger, "Rebroadcasting monitor's pending claims on startup"); $chain_monitor.rebroadcast_pending_claims(); @@ -336,14 +336,14 @@ macro_rules! define_run_body { break; } - if $channel_manager.get_and_clear_needs_persistence() { + if $channel_manager.get_cm().get_and_clear_needs_persistence() { log_trace!($logger, "Persisting ChannelManager..."); - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; log_trace!($logger, "Done persisting ChannelManager."); } if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred"); - $channel_manager.timer_tick_occurred(); + $channel_manager.get_cm().timer_tick_occurred(); last_freshness_call = $get_timer(FRESHNESS_TIMER); } if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) { @@ -440,7 +440,7 @@ macro_rules! define_run_body { // After we exit, ensure we persist the ChannelManager one final time - this avoids // some races where users quit while channel updates were in-flight, with // ChannelMonitor update(s) persisted without a corresponding ChannelManager update. - $persister.persist_manager(&*$channel_manager)?; + $persister.persist_manager(&$channel_manager)?; // Persist Scorer on exit if let Some(ref scorer) = $scorer { @@ -539,45 +539,64 @@ use core::task; /// # use std::sync::atomic::{AtomicBool, Ordering}; /// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; -/// # struct MyStore {} -/// # impl lightning::util::persist::KVStore for MyStore { +/// # struct Logger {} +/// # impl lightning::util::logger::Logger for Logger { +/// # fn log(&self, _record: lightning::util::logger::Record) {} +/// # } +/// # struct Store {} +/// # impl lightning::util::persist::KVStore for Store { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) } /// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } -/// # struct MyEventHandler {} -/// # impl MyEventHandler { +/// # struct EventHandler {} +/// # impl EventHandler { /// # async fn handle_event(&self, _: lightning::events::Event) {} /// # } /// # #[derive(Eq, PartialEq, Clone, Hash)] -/// # struct MySocketDescriptor {} -/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor { +/// # struct SocketDescriptor {} +/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor { /// # fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 } /// # fn disconnect_socket(&mut self) {} /// # } -/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync; -/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync; -/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync; -/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync; -/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync; -/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync; -/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; -/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, MyLogger>; -/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph>; -/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync, Arc, Arc>; -/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; -/// # type MyScorer = RwLock, Arc>>; -/// -/// # async fn setup_background_processing(my_persister: Arc, my_event_handler: Arc, my_chain_monitor: Arc, my_channel_manager: Arc, my_gossip_sync: Arc, my_logger: Arc, my_scorer: Arc, my_peer_manager: Arc) { -/// let background_persister = Arc::clone(&my_persister); -/// let background_event_handler = Arc::clone(&my_event_handler); -/// let background_chain_mon = Arc::clone(&my_chain_monitor); -/// let background_chan_man = Arc::clone(&my_channel_manager); -/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync)); -/// let background_peer_man = Arc::clone(&my_peer_manager); -/// let background_logger = Arc::clone(&my_logger); -/// let background_scorer = Arc::clone(&my_scorer); +/// # type ChainMonitor = lightning::chain::chainmonitor::ChainMonitor, Arc, Arc, Arc, Arc>; +/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph>; +/// # type P2PGossipSync
    = lightning::routing::gossip::P2PGossipSync, Arc
      , Arc>; +/// # type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager, B, FE, Logger>; +/// # type Scorer = RwLock, Arc>>; +/// # type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager, B, FE, Arc
        , Logger>; +/// # +/// # struct Node< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # > { +/// # peer_manager: Arc>, +/// # event_handler: Arc, +/// # channel_manager: Arc>, +/// # chain_monitor: Arc>, +/// # gossip_sync: Arc>, +/// # persister: Arc, +/// # logger: Arc, +/// # scorer: Arc, +/// # } +/// # +/// # async fn setup_background_processing< +/// # B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static, +/// # F: lightning::chain::Filter + Send + Sync + 'static, +/// # FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static, +/// # UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static, +/// # >(node: Node) { +/// let background_persister = Arc::clone(&node.persister); +/// let background_event_handler = Arc::clone(&node.event_handler); +/// let background_chain_mon = Arc::clone(&node.chain_monitor); +/// let background_chan_man = Arc::clone(&node.channel_manager); +/// let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync)); +/// let background_peer_man = Arc::clone(&node.peer_manager); +/// let background_logger = Arc::clone(&node.logger); +/// let background_scorer = Arc::clone(&node.scorer); /// /// // Setup the sleeper. /// let (stop_sender, stop_receiver) = tokio::sync::watch::channel(()); @@ -622,21 +641,16 @@ pub async fn process_events_async< 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EventHandlerFuture: core::future::Future, EventHandler: Fn(Event) -> EventHandlerFuture, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -653,16 +667,12 @@ pub async fn process_events_async< where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, PM::Target: APeerManager + Send + Sync, { let mut should_break = false; @@ -693,11 +703,11 @@ where define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, - channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, + channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await, peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await, gossip_sync, logger, scorer, should_break, { let fut = Selector { - a: channel_manager.get_event_or_persistence_needed_future(), + a: channel_manager.get_cm().get_event_or_persistence_needed_future(), b: chain_monitor.get_update_future(), c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; @@ -788,20 +798,15 @@ impl BackgroundProcessor { 'a, UL: 'static + Deref + Send + Sync, CF: 'static + Deref + Send + Sync, - CW: 'static + Deref + Send + Sync, T: 'static + Deref + Send + Sync, - ES: 'static + Deref + Send + Sync, - NS: 'static + Deref + Send + Sync, - SP: 'static + Deref + Send + Sync, F: 'static + Deref + Send + Sync, - R: 'static + Deref + Send + Sync, G: 'static + Deref> + Send + Sync, L: 'static + Deref + Send + Sync, P: 'static + Deref + Send + Sync, EH: 'static + EventHandler + Send, PS: 'static + Deref + Send, - M: 'static + Deref::EcdsaSigner, CF, T, F, L, P>> + Send + Sync, - CM: 'static + Deref> + Send + Sync, + M: 'static + Deref::Signer, CF, T, F, L, P>> + Send + Sync, + CM: 'static + Deref + Send + Sync, PGS: 'static + Deref> + Send + Sync, RGS: 'static + Deref> + Send, PM: 'static + Deref + Send + Sync, @@ -814,16 +819,12 @@ impl BackgroundProcessor { where UL::Target: 'static + UtxoLookup, CF::Target: 'static + chain::Filter, - CW::Target: 'static + chain::Watch<::EcdsaSigner>, T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, L::Target: 'static + Logger, - P::Target: 'static + Persist<::EcdsaSigner>, - PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>, + P::Target: 'static + Persist<::Signer>, + PS::Target: 'static + Persister<'a, CM, L, SC>, + CM::Target: AChannelManager + Send + Sync, PM::Target: APeerManager + Send + Sync, { let stop_thread = Arc::new(AtomicBool::new(false)); @@ -849,12 +850,12 @@ impl BackgroundProcessor { }; define_run_body!( persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), - channel_manager, channel_manager.process_pending_events(&event_handler), + channel_manager, channel_manager.get_cm().process_pending_events(&event_handler), peer_manager, peer_manager.onion_message_handler().process_pending_events(&event_handler), gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire), { Sleeper::from_two_futures( - &channel_manager.get_event_or_persistence_needed_future(), + &channel_manager.get_cm().get_event_or_persistence_needed_future(), &chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 41a742ca..35f5b0c7 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -7,6 +7,8 @@ //! This module contains a simple key-value store trait [`KVStore`] that //! allows one to implement the persistence for [`ChannelManager`], [`NetworkGraph`], //! and [`ChannelMonitor`] all in one place. +//! +//! [`ChannelManager`]: crate::ln::channelmanager::ChannelManager use core::cmp; use core::convert::{TryFrom, TryInto}; @@ -21,11 +23,10 @@ use crate::prelude::*; use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use crate::chain::chainmonitor::{Persist, MonitorUpdateId}; -use crate::sign::{EntropySource, NodeSigner, ecdsa::WriteableEcdsaChannelSigner, SignerProvider}; +use crate::sign::{EntropySource, ecdsa::WriteableEcdsaChannelSigner, SignerProvider}; use crate::chain::transaction::OutPoint; use crate::chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, CLOSED_CHANNEL_UPDATE_ID}; -use crate::ln::channelmanager::ChannelManager; -use crate::routing::router::Router; +use crate::ln::channelmanager::AChannelManager; use crate::routing::gossip::NetworkGraph; use crate::routing::scoring::WriteableScore; use crate::util::logger::Logger; @@ -38,10 +39,16 @@ pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = "abcdefghijklmnopqrstuvwxyzABCD pub const KVSTORE_NAMESPACE_KEY_MAX_LEN: usize = 120; /// The primary namespace under which the [`ChannelManager`] will be persisted. +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub const CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE: &str = ""; /// The secondary namespace under which the [`ChannelManager`] will be persisted. +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub const CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE: &str = ""; /// The key under which the [`ChannelManager`] will be persisted. +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager pub const CHANNEL_MANAGER_PERSISTENCE_KEY: &str = "manager"; /// The primary namespace under which [`ChannelMonitor`]s will be persisted. @@ -131,18 +138,17 @@ pub trait KVStore { } /// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk. -pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> - where M::Target: 'static + chain::Watch<::EcdsaSigner>, - T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, - F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, - L::Target: 'static + Logger, +/// +/// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager +pub trait Persister<'a, CM: Deref, L: Deref, S: WriteableScore<'a>> +where + CM::Target: 'static + AChannelManager, + L::Target: 'static + Logger, { /// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed. - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error>; + /// + /// [`ChannelManager`]: crate::ln::channelmanager::ChannelManager + fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error>; /// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed. fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>; @@ -152,21 +158,16 @@ pub trait Persister<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: } -impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for A - where M::Target: 'static + chain::Watch<::EcdsaSigner>, - T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, - F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, - L::Target: 'static + Logger, +impl<'a, A: KVStore, CM: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, CM, L, S> for A +where + CM::Target: 'static + AChannelManager, + L::Target: 'static + Logger, { - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error> { + fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.encode()) + &channel_manager.get_cm().encode()) } fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { @@ -184,21 +185,16 @@ impl<'a, A: KVStore, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Der } } -impl<'a, M: Deref, T: Deref, ES: Deref, NS: Deref, SP: Deref, F: Deref, R: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, M, T, ES, NS, SP, F, R, L, S> for dyn KVStore + Send + Sync - where M::Target: 'static + chain::Watch<::EcdsaSigner>, - T::Target: 'static + BroadcasterInterface, - ES::Target: 'static + EntropySource, - NS::Target: 'static + NodeSigner, - SP::Target: 'static + SignerProvider, - F::Target: 'static + FeeEstimator, - R::Target: 'static + Router, - L::Target: 'static + Logger, +impl<'a, CM: Deref, L: Deref, S: WriteableScore<'a>> Persister<'a, CM, L, S> for dyn KVStore + Send + Sync +where + CM::Target: 'static + AChannelManager, + L::Target: 'static + Logger, { - fn persist_manager(&self, channel_manager: &ChannelManager) -> Result<(), io::Error> { + fn persist_manager(&self, channel_manager: &CM) -> Result<(), io::Error> { self.write(CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY, - &channel_manager.encode()) + &channel_manager.get_cm().encode()) } fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> { -- 2.30.2