Use AChannelManager in BackgroundProcessor
[rust-lightning] / lightning-background-processor / src / lib.rs
index cf37c5fd5a85750580f63a35685d0405d1246875..3736bd603e5bd65977bfc874919b5f9c61ee48bc 100644 (file)
@@ -24,19 +24,17 @@ extern crate lightning_rapid_gossip_sync;
 use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
-use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
 use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
 use lightning::events::EventHandler;
 #[cfg(any(feature = "std", feature = "futures"))]
 use lightning::events::EventsProvider;
 
-use lightning::ln::channelmanager::ChannelManager;
+use lightning::ln::channelmanager::AChannelManager;
 use lightning::ln::msgs::OnionMessageHandler;
 use lightning::ln::peer_handler::APeerManager;
 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::UtxoLookup;
-use lightning::routing::router::Router;
 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
@@ -81,6 +79,8 @@ use alloc::vec::Vec;
 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
 /// unilateral chain closure fees are at risk.
 ///
+/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
+/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
 /// [`Event`]: lightning::events::Event
 /// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
@@ -286,7 +286,7 @@ macro_rules! define_run_body {
                $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
        ) => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
-               $channel_manager.timer_tick_occurred();
+               $channel_manager.get_cm().timer_tick_occurred();
                log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
                $chain_monitor.rebroadcast_pending_claims();
 
@@ -336,14 +336,14 @@ macro_rules! define_run_body {
                                break;
                        }
 
-                       if $channel_manager.get_and_clear_needs_persistence() {
+                       if $channel_manager.get_cm().get_and_clear_needs_persistence() {
                                log_trace!($logger, "Persisting ChannelManager...");
-                               $persister.persist_manager(&*$channel_manager)?;
+                               $persister.persist_manager(&$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
-                               $channel_manager.timer_tick_occurred();
+                               $channel_manager.get_cm().timer_tick_occurred();
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
                        if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
@@ -440,7 +440,7 @@ macro_rules! define_run_body {
                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
-               $persister.persist_manager(&*$channel_manager)?;
+               $persister.persist_manager(&$channel_manager)?;
 
                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
@@ -539,45 +539,64 @@ use core::task;
 /// # use std::sync::atomic::{AtomicBool, Ordering};
 /// # use std::time::SystemTime;
 /// # use lightning_background_processor::{process_events_async, GossipSync};
-/// # struct MyStore {}
-/// # impl lightning::util::persist::KVStore for MyStore {
+/// # struct Logger {}
+/// # impl lightning::util::logger::Logger for Logger {
+/// #     fn log(&self, _record: lightning::util::logger::Record) {}
+/// # }
+/// # struct Store {}
+/// # impl lightning::util::persist::KVStore for Store {
 /// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
 /// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
 /// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
 /// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
 /// # }
-/// # struct MyEventHandler {}
-/// # impl MyEventHandler {
+/// # struct EventHandler {}
+/// # impl EventHandler {
 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
 /// # }
 /// # #[derive(Eq, PartialEq, Clone, Hash)]
-/// # struct MySocketDescriptor {}
-/// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
+/// # struct SocketDescriptor {}
+/// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
 /// #     fn disconnect_socket(&mut self) {}
 /// # }
-/// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
-/// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
-/// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
-/// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
-/// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
-/// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
-/// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
-/// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
-/// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
-/// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
-/// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
-/// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
-///
-/// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
-///    let background_persister = Arc::clone(&my_persister);
-///    let background_event_handler = Arc::clone(&my_event_handler);
-///    let background_chain_mon = Arc::clone(&my_chain_monitor);
-///    let background_chan_man = Arc::clone(&my_channel_manager);
-///    let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
-///    let background_peer_man = Arc::clone(&my_peer_manager);
-///    let background_logger = Arc::clone(&my_logger);
-///    let background_scorer = Arc::clone(&my_scorer);
+/// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
+/// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
+/// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
+/// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
+/// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
+/// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
+/// #
+/// # struct Node<
+/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// #     F: lightning::chain::Filter + Send + Sync + 'static,
+/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # > {
+/// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
+/// #     event_handler: Arc<EventHandler>,
+/// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
+/// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
+/// #     gossip_sync: Arc<P2PGossipSync<UL>>,
+/// #     persister: Arc<Store>,
+/// #     logger: Arc<Logger>,
+/// #     scorer: Arc<Scorer>,
+/// # }
+/// #
+/// # async fn setup_background_processing<
+/// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
+/// #     F: lightning::chain::Filter + Send + Sync + 'static,
+/// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
+/// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
+/// # >(node: Node<B, F, FE, UL>) {
+///    let background_persister = Arc::clone(&node.persister);
+///    let background_event_handler = Arc::clone(&node.event_handler);
+///    let background_chain_mon = Arc::clone(&node.chain_monitor);
+///    let background_chan_man = Arc::clone(&node.channel_manager);
+///    let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
+///    let background_peer_man = Arc::clone(&node.peer_manager);
+///    let background_logger = Arc::clone(&node.logger);
+///    let background_scorer = Arc::clone(&node.scorer);
 ///
 ///    // Setup the sleeper.
 ///    let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
@@ -622,21 +641,16 @@ pub async fn process_events_async<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
-       CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
-       ES: 'static + Deref + Send + Sync,
-       NS: 'static + Deref + Send + Sync,
-       SP: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
-       R: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
-       M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
-       CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+       M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+       CM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        PM: 'static + Deref + Send + Sync,
@@ -653,16 +667,12 @@ pub async fn process_events_async<
 where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
-       CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
        T::Target: 'static + BroadcasterInterface,
-       ES::Target: 'static + EntropySource,
-       NS::Target: 'static + NodeSigner,
-       SP::Target: 'static + SignerProvider,
        F::Target: 'static + FeeEstimator,
-       R::Target: 'static + Router,
        L::Target: 'static + Logger,
-       P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
-       PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+       P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+       PS::Target: 'static + Persister<'a, CM, L, SC>,
+       CM::Target: AChannelManager + Send + Sync,
        PM::Target: APeerManager + Send + Sync,
 {
        let mut should_break = false;
@@ -693,11 +703,11 @@ where
        define_run_body!(
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
-               channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
+               channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
                peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
                gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
-                               a: channel_manager.get_event_or_persistence_needed_future(),
+                               a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
@@ -788,20 +798,15 @@ impl BackgroundProcessor {
                'a,
                UL: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
-               CW: 'static + Deref + Send + Sync,
                T: 'static + Deref + Send + Sync,
-               ES: 'static + Deref + Send + Sync,
-               NS: 'static + Deref + Send + Sync,
-               SP: 'static + Deref + Send + Sync,
                F: 'static + Deref + Send + Sync,
-               R: 'static + Deref + Send + Sync,
                G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
                L: 'static + Deref + Send + Sync,
                P: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
-               M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
-               CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
+               M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
+               CM: 'static + Deref + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                PM: 'static + Deref + Send + Sync,
@@ -814,16 +819,12 @@ impl BackgroundProcessor {
        where
                UL::Target: 'static + UtxoLookup,
                CF::Target: 'static + chain::Filter,
-               CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
                T::Target: 'static + BroadcasterInterface,
-               ES::Target: 'static + EntropySource,
-               NS::Target: 'static + NodeSigner,
-               SP::Target: 'static + SignerProvider,
                F::Target: 'static + FeeEstimator,
-               R::Target: 'static + Router,
                L::Target: 'static + Logger,
-               P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
-               PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
+               P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
+               PS::Target: 'static + Persister<'a, CM, L, SC>,
+               CM::Target: AChannelManager + Send + Sync,
                PM::Target: APeerManager + Send + Sync,
        {
                let stop_thread = Arc::new(AtomicBool::new(false));
@@ -849,12 +850,12 @@ impl BackgroundProcessor {
                        };
                        define_run_body!(
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
-                               channel_manager, channel_manager.process_pending_events(&event_handler),
+                               channel_manager, channel_manager.get_cm().process_pending_events(&event_handler),
                                peer_manager,
                                peer_manager.onion_message_handler().process_pending_events(&event_handler),
                                gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
-                                       &channel_manager.get_event_or_persistence_needed_future(),
+                                       &channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                        &chain_monitor.get_update_future()
                                ).wait_timeout(Duration::from_millis(100)); },
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,