X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=541813c7e42c72f68568ec1c397fc531156d158c;hb=7ca37097274d5d2ed66e5f42e9948f71a344681f;hp=ea369382aee2a32ab82e3682c3a7fa2205fb8d74;hpb=57017dfc0b610ef6095cb69ef50bd13274d5a2c8;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index ea369382..541813c7 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -7,7 +7,7 @@ #![deny(private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] +#![cfg_attr(not(feature = "futures"), deny(unsafe_code))] #![cfg_attr(docsrs, feature(doc_auto_cfg))] @@ -26,6 +26,9 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider}; +use lightning::events::{Event, PathFailure}; +#[cfg(feature = "std")] +use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; @@ -33,7 +36,6 @@ use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; -use lightning::util::events::{Event, EventHandler, EventsProvider, PathFailure}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -50,8 +52,6 @@ use std::thread::{self, JoinHandle}; #[cfg(feature = "std")] use std::time::Instant; -#[cfg(feature = "futures")] -use futures_util::{select_biased, future::FutureExt, task}; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -78,7 +78,7 @@ use alloc::vec::Vec; /// unilateral chain closure fees are at risk. /// /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -/// [`Event`]: lightning::util::events::Event +/// [`Event`]: lightning::events::Event #[cfg(feature = "std")] #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] pub struct BackgroundProcessor { @@ -162,7 +162,7 @@ where U::Target: UtxoLookup, L::Target: Logger { } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl>, G: Deref>, U: Deref, L: Deref> GossipSync, G, U, L> where @@ -175,7 +175,7 @@ where } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl<'a, R: Deref>, G: Deref>, L: Deref> GossipSync< &P2PGossipSync, @@ -193,7 +193,7 @@ where } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl<'a, L: Deref> GossipSync< &P2PGossipSync<&'a NetworkGraph, &'a (dyn UtxoLookup + Send + Sync), L>, @@ -347,9 +347,9 @@ macro_rules! define_run_body { log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e) } - last_prune_call = $get_timer(NETWORK_PRUNE_TIMER); have_pruned = true; } + last_prune_call = $get_timer(NETWORK_PRUNE_TIMER); } if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { @@ -382,6 +382,50 @@ macro_rules! define_run_body { } } } +#[cfg(feature = "futures")] +pub(crate) mod futures_util { + use core::future::Future; + use core::task::{Poll, Waker, RawWaker, RawWakerVTable}; + use core::pin::Pin; + use core::marker::Unpin; + pub(crate) struct Selector + Unpin, B: Future + Unpin> { + pub a: A, + pub b: B, + } + pub(crate) enum SelectorOutput { + A, B(bool), + } + + impl + Unpin, B: Future + Unpin> Future for Selector { + type Output = SelectorOutput; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Pending => {}, + } + Poll::Pending + } + } + + // If we want to poll a future without an async context to figure out if it has completed or + // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values + // but sadly there's a good bit of boilerplate here. + fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) } + fn dummy_waker_action(_: *const ()) { } + + const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action); + pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } +} +#[cfg(feature = "futures")] +use futures_util::{Selector, SelectorOutput, dummy_waker}; +#[cfg(feature = "futures")] +use core::task; + /// Processes background events in a future. /// /// `sleeper` should return a future which completes in the given amount of time and returns a @@ -468,16 +512,20 @@ where chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { - select_biased! { - _ = channel_manager.get_persistable_update_future().fuse() => true, - exit = sleeper(Duration::from_millis(100)).fuse() => { + let fut = Selector { + a: channel_manager.get_persistable_update_future(), + b: sleeper(Duration::from_millis(100)), + }; + match fut.await { + SelectorOutput::A => true, + SelectorOutput::B(exit) => { should_break = exit; false } } }, |t| sleeper(Duration::from_secs(t)), |fut: &mut SleepFuture, _| { - let mut waker = task::noop_waker(); + let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); core::pin::Pin::new(fut).poll(&mut ctx).is_ready() }) @@ -661,7 +709,8 @@ mod tests { use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::keysinterface::{InMemorySigner, KeysManager}; use lightning::chain::transaction::OutPoint; - use lightning::get_event_msg; + use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; + use lightning::{get_event_msg, get_event}; use lightning::ln::PaymentHash; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; @@ -672,7 +721,6 @@ mod tests { use lightning::routing::router::{DefaultRouter, RouteHop}; use lightning::routing::scoring::{ChannelUsage, Score}; use lightning::util::config::UserConfig; - use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning::util::persist::KVStorePersister; @@ -953,7 +1001,7 @@ mod tests { let params = ChainParameters { network, best_block }; let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params)); let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())); - let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone())); + let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone())); let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}}; let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone())); let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer }; @@ -1010,7 +1058,10 @@ mod tests { ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{ $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); + get_event!($node_b, Event::ChannelPending); + $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); + get_event!($node_a, Event::ChannelPending); }} } @@ -1367,7 +1418,6 @@ mod tests { failure: PathFailure::OnPath { network_update: None }, path: path.clone(), short_channel_id: Some(scored_scid), - retry: None, }); let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) @@ -1387,7 +1437,6 @@ mod tests { failure: PathFailure::OnPath { network_update: None }, path: path.clone(), short_channel_id: None, - retry: None, }); let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE))