Merge pull request #2098 from tnull/2023-03-add-channel-pending-event
[rust-lightning] / lightning-background-processor / src / lib.rs
index ea369382aee2a32ab82e3682c3a7fa2205fb8d74..541813c7e42c72f68568ec1c397fc531156d158c 100644 (file)
@@ -7,7 +7,7 @@
 #![deny(private_intra_doc_links)]
 
 #![deny(missing_docs)]
-#![deny(unsafe_code)]
+#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
 
 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
 
@@ -26,6 +26,9 @@ use lightning::chain;
 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
+use lightning::events::{Event, PathFailure};
+#[cfg(feature = "std")]
+use lightning::events::{EventHandler, EventsProvider};
 use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
@@ -33,7 +36,6 @@ use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
 use lightning::routing::utxo::UtxoLookup;
 use lightning::routing::router::Router;
 use lightning::routing::scoring::{Score, WriteableScore};
-use lightning::util::events::{Event, EventHandler, EventsProvider, PathFailure};
 use lightning::util::logger::Logger;
 use lightning::util::persist::Persister;
 use lightning_rapid_gossip_sync::RapidGossipSync;
@@ -50,8 +52,6 @@ use std::thread::{self, JoinHandle};
 #[cfg(feature = "std")]
 use std::time::Instant;
 
-#[cfg(feature = "futures")]
-use futures_util::{select_biased, future::FutureExt, task};
 #[cfg(not(feature = "std"))]
 use alloc::vec::Vec;
 
@@ -78,7 +78,7 @@ use alloc::vec::Vec;
 /// unilateral chain closure fees are at risk.
 ///
 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
-/// [`Event`]: lightning::util::events::Event
+/// [`Event`]: lightning::events::Event
 #[cfg(feature = "std")]
 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
 pub struct BackgroundProcessor {
@@ -162,7 +162,7 @@ where U::Target: UtxoLookup, L::Target: Logger {
        }
 }
 
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
        GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
 where
@@ -175,7 +175,7 @@ where
        }
 }
 
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
        GossipSync<
                &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
@@ -193,7 +193,7 @@ where
        }
 }
 
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
 impl<'a, L: Deref>
        GossipSync<
                &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
@@ -347,9 +347,9 @@ macro_rules! define_run_body {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }
 
-                                       last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
                                        have_pruned = true;
                                }
+                               last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
                        }
 
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
@@ -382,6 +382,50 @@ macro_rules! define_run_body {
        } }
 }
 
+#[cfg(feature = "futures")]
+pub(crate) mod futures_util {
+       use core::future::Future;
+       use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
+       use core::pin::Pin;
+       use core::marker::Unpin;
+       pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
+               pub a: A,
+               pub b: B,
+       }
+       pub(crate) enum SelectorOutput {
+               A, B(bool),
+       }
+
+       impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
+               type Output = SelectorOutput;
+               fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
+                       match Pin::new(&mut self.a).poll(ctx) {
+                               Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
+                               Poll::Pending => {},
+                       }
+                       match Pin::new(&mut self.b).poll(ctx) {
+                               Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+                               Poll::Pending => {},
+                       }
+                       Poll::Pending
+               }
+       }
+
+       // If we want to poll a future without an async context to figure out if it has completed or
+       // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
+       // but sadly there's a good bit of boilerplate here.
+       fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
+       fn dummy_waker_action(_: *const ()) { }
+
+       const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+               dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
+       pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
+}
+#[cfg(feature = "futures")]
+use futures_util::{Selector, SelectorOutput, dummy_waker};
+#[cfg(feature = "futures")]
+use core::task;
+
 /// Processes background events in a future.
 ///
 /// `sleeper` should return a future which completes in the given amount of time and returns a
@@ -468,16 +512,20 @@ where
                chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
-                       select_biased! {
-                               _ = channel_manager.get_persistable_update_future().fuse() => true,
-                               exit = sleeper(Duration::from_millis(100)).fuse() => {
+                       let fut = Selector {
+                               a: channel_manager.get_persistable_update_future(),
+                               b: sleeper(Duration::from_millis(100)),
+                       };
+                       match fut.await {
+                               SelectorOutput::A => true,
+                               SelectorOutput::B(exit) => {
                                        should_break = exit;
                                        false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
                |fut: &mut SleepFuture, _| {
-                       let mut waker = task::noop_waker();
+                       let mut waker = dummy_waker();
                        let mut ctx = task::Context::from_waker(&mut waker);
                        core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
                })
@@ -661,7 +709,8 @@ mod tests {
        use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
        use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
        use lightning::chain::transaction::OutPoint;
-       use lightning::get_event_msg;
+       use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
+       use lightning::{get_event_msg, get_event};
        use lightning::ln::PaymentHash;
        use lightning::ln::channelmanager;
        use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
@@ -672,7 +721,6 @@ mod tests {
        use lightning::routing::router::{DefaultRouter, RouteHop};
        use lightning::routing::scoring::{ChannelUsage, Score};
        use lightning::util::config::UserConfig;
-       use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
        use lightning::util::ser::Writeable;
        use lightning::util::test_utils;
        use lightning::util::persist::KVStorePersister;
@@ -953,7 +1001,7 @@ mod tests {
                        let params = ChainParameters { network, best_block };
                        let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
                        let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
-                       let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
+                       let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
                        let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
                        let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
                        let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
@@ -1010,7 +1058,10 @@ mod tests {
                ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
                        $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
                        $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
+                       get_event!($node_b, Event::ChannelPending);
+
                        $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
+                       get_event!($node_a, Event::ChannelPending);
                }}
        }
 
@@ -1367,7 +1418,6 @@ mod tests {
                        failure: PathFailure::OnPath { network_update: None },
                        path: path.clone(),
                        short_channel_id: Some(scored_scid),
-                       retry: None,
                });
                let event = receiver
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
@@ -1387,7 +1437,6 @@ mod tests {
                        failure: PathFailure::OnPath { network_update: None },
                        path: path.clone(),
                        short_channel_id: None,
-                       retry: None,
                });
                let event = receiver
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))