X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=c30e5ad667652cba466e8243b5558e6d42115b3a;hb=fcf1282975fe8627efdd7657ba3007640daedce8;hp=f64f1e7651effd4f0ddf229c23b89ee40ea3bb7c;hpb=dd11cae6408ba01f7836507ba5c123b93cd0328a;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index f64f1e76..c30e5ad6 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -7,7 +7,7 @@ #![deny(private_intra_doc_links)] #![deny(missing_docs)] -#![deny(unsafe_code)] +#![cfg_attr(not(feature = "futures"), deny(unsafe_code))] #![cfg_attr(docsrs, feature(doc_auto_cfg))] @@ -26,6 +26,9 @@ use lightning::chain; use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider}; +use lightning::events::{Event, PathFailure}; +#[cfg(feature = "std")] +use lightning::events::{EventHandler, EventsProvider}; use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler}; use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor}; @@ -33,11 +36,10 @@ use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::utxo::UtxoLookup; use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; -use lightning::util::events::{Event, PathFailure}; -#[cfg(feature = "std")] -use lightning::util::events::{EventHandler, EventsProvider}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; +#[cfg(feature = "std")] +use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; use core::ops::Deref; @@ -52,8 +54,6 @@ use std::thread::{self, JoinHandle}; #[cfg(feature = "std")] use std::time::Instant; -#[cfg(feature = "futures")] -use futures_util::{select_biased, future::FutureExt, task}; #[cfg(not(feature = "std"))] use alloc::vec::Vec; @@ -80,7 +80,7 @@ use alloc::vec::Vec; /// unilateral chain closure fees are at risk. /// /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor -/// [`Event`]: lightning::util::events::Event +/// [`Event`]: lightning::events::Event #[cfg(feature = "std")] #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."] pub struct BackgroundProcessor { @@ -116,6 +116,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; #[cfg(test)] const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; +#[cfg(feature = "futures")] +/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement +const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } } +#[cfg(feature = "futures")] +const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER), + min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER)); + /// Either [`P2PGossipSync`] or [`RapidGossipSync`]. pub enum GossipSync< P: Deref>, @@ -164,7 +171,7 @@ where U::Target: UtxoLookup, L::Target: Logger { } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl>, G: Deref>, U: Deref, L: Deref> GossipSync, G, U, L> where @@ -177,7 +184,7 @@ where } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl<'a, R: Deref>, G: Deref>, L: Deref> GossipSync< &P2PGossipSync, @@ -195,7 +202,7 @@ where } } -/// (C-not exported) as the bindings concretize everything and have constructors for us +/// This is not exported to bindings users as the bindings concretize everything and have constructors for us impl<'a, L: Deref> GossipSync< &P2PGossipSync<&'a NetworkGraph, &'a (dyn UtxoLookup + Send + Sync), L>, @@ -258,7 +265,8 @@ macro_rules! define_run_body { ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr) + $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, + $check_slow_await: expr) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -288,9 +296,10 @@ macro_rules! define_run_body { // We wait up to 100ms, but track how long it takes to detect being put to sleep, // see `await_start`'s use below. - let mut await_start = $get_timer(1); + let mut await_start = None; + if $check_slow_await { await_start = Some($get_timer(1)); } let updates_available = $await; - let await_slow = $timer_elapsed(&mut await_start, 1); + let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; if updates_available { log_trace!($logger, "Persisting ChannelManager..."); @@ -384,6 +393,59 @@ macro_rules! define_run_body { } } } +#[cfg(feature = "futures")] +pub(crate) mod futures_util { + use core::future::Future; + use core::task::{Poll, Waker, RawWaker, RawWakerVTable}; + use core::pin::Pin; + use core::marker::Unpin; + pub(crate) struct Selector< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > { + pub a: A, + pub b: B, + pub c: C, + } + pub(crate) enum SelectorOutput { + A, B, C(bool), + } + + impl< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > Future for Selector { + type Output = SelectorOutput; + fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { + match Pin::new(&mut self.a).poll(ctx) { + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.b).poll(ctx) { + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.c).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); }, + Poll::Pending => {}, + } + Poll::Pending + } + } + + // If we want to poll a future without an async context to figure out if it has completed or + // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values + // but sadly there's a good bit of boilerplate here. + fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) } + fn dummy_waker_action(_: *const ()) { } + + const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new( + dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action); + pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } } +} +#[cfg(feature = "futures")] +use futures_util::{Selector, SelectorOutput, dummy_waker}; +#[cfg(feature = "futures")] +use core::task; + /// Processes background events in a future. /// /// `sleeper` should return a future which completes in the given amount of time and returns a @@ -396,6 +458,11 @@ macro_rules! define_run_body { /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`], /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly /// manually instead. +/// +/// The `mobile_interruptable_platform` flag should be set if we're currently running on a +/// mobile device, where we may need to check for interruption of the application regularly. If you +/// are unsure, you should set the flag, as the performance impact of it is minimal unless there +/// are hundreds or thousands of simultaneous process calls running. #[cfg(feature = "futures")] pub async fn process_events_async< 'a, @@ -431,7 +498,7 @@ pub async fn process_events_async< >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, - sleeper: Sleeper, + sleeper: Sleeper, mobile_interruptable_platform: bool, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -470,19 +537,25 @@ where chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await, channel_manager, channel_manager.process_pending_events_async(async_event_handler).await, gossip_sync, peer_manager, logger, scorer, should_break, { - select_biased! { - _ = channel_manager.get_persistable_update_future().fuse() => true, - exit = sleeper(Duration::from_millis(100)).fuse() => { + let fut = Selector { + a: channel_manager.get_persistable_update_future(), + b: chain_monitor.get_update_future(), + c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), + }; + match fut.await { + SelectorOutput::A => true, + SelectorOutput::B => false, + SelectorOutput::C(exit) => { should_break = exit; false } } }, |t| sleeper(Duration::from_secs(t)), |fut: &mut SleepFuture, _| { - let mut waker = task::noop_waker(); + let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); core::pin::Pin::new(fut).poll(&mut ctx).is_ready() - }) + }, mobile_interruptable_platform) } #[cfg(feature = "std")] @@ -597,8 +670,11 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - channel_manager.await_persistable_update_timeout(Duration::from_millis(100)), - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur) + Sleeper::from_two_futures( + channel_manager.get_persistable_update_future(), + chain_monitor.get_update_future() + ).wait_timeout(Duration::from_millis(100)), + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -663,7 +739,8 @@ mod tests { use lightning::chain::channelmonitor::ANTI_REORG_DELAY; use lightning::chain::keysinterface::{InMemorySigner, KeysManager}; use lightning::chain::transaction::OutPoint; - use lightning::get_event_msg; + use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; + use lightning::{get_event_msg, get_event}; use lightning::ln::PaymentHash; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; @@ -674,7 +751,6 @@ mod tests { use lightning::routing::router::{DefaultRouter, RouteHop}; use lightning::routing::scoring::{ChannelUsage, Score}; use lightning::util::config::UserConfig; - use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::util::ser::Writeable; use lightning::util::test_utils; use lightning::util::persist::KVStorePersister; @@ -1012,7 +1088,10 @@ mod tests { ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{ $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); + get_event!($node_b, Event::ChannelPending); + $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); + get_event!($node_a, Event::ChannelPending); }} } @@ -1107,7 +1186,9 @@ mod tests { let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string()); check_persisted_data!(nodes[0].scorer, filepath.clone()); - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1129,7 +1210,9 @@ mod tests { } } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1221,7 +1304,9 @@ mod tests { nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding); let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id()); - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); @@ -1247,7 +1332,9 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1266,7 +1353,9 @@ mod tests { } } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1369,7 +1458,6 @@ mod tests { failure: PathFailure::OnPath { network_update: None }, path: path.clone(), short_channel_id: Some(scored_scid), - retry: None, }); let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) @@ -1389,7 +1477,6 @@ mod tests { failure: PathFailure::OnPath { network_update: None }, path: path.clone(), short_channel_id: None, - retry: None, }); let event = receiver .recv_timeout(Duration::from_secs(EVENT_DEADLINE)) @@ -1442,6 +1529,8 @@ mod tests { _ => panic!("Unexpected event"), } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } }