#![deny(private_intra_doc_links)]
#![deny(missing_docs)]
-#![deny(unsafe_code)]
+#![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
+use lightning::events::{Event, PathFailure};
+#[cfg(feature = "std")]
+use lightning::events::{EventHandler, EventsProvider};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::utxo::UtxoLookup;
use lightning::routing::router::Router;
use lightning::routing::scoring::{Score, WriteableScore};
-use lightning::util::events::{Event, PathFailure};
-#[cfg(feature = "std")]
-use lightning::util::events::{EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
+#[cfg(feature = "std")]
+use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;
use core::ops::Deref;
#[cfg(feature = "std")]
use std::time::Instant;
-#[cfg(feature = "futures")]
-use futures_util::{select_biased, future::FutureExt, task};
#[cfg(not(feature = "std"))]
use alloc::vec::Vec;
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
-/// [`Event`]: lightning::util::events::Event
+/// [`Event`]: lightning::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
+#[cfg(feature = "futures")]
+/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
+const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
+#[cfg(feature = "futures")]
+const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
+ min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));
+
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, U, L>>,
}
}
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
}
}
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
GossipSync<
&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
}
}
-/// (C-not exported) as the bindings concretize everything and have constructors for us
+/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
GossipSync<
&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
- $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
+ $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
+ $check_slow_await: expr)
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
- let mut await_start = $get_timer(1);
+ let mut await_start = None;
+ if $check_slow_await { await_start = Some($get_timer(1)); }
let updates_available = $await;
- let await_slow = $timer_elapsed(&mut await_start, 1);
+ let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
} }
}
+#[cfg(feature = "futures")]
+pub(crate) mod futures_util {
+ use core::future::Future;
+ use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
+ use core::pin::Pin;
+ use core::marker::Unpin;
+ pub(crate) struct Selector<
+ A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+ > {
+ pub a: A,
+ pub b: B,
+ pub c: C,
+ }
+ pub(crate) enum SelectorOutput {
+ A, B, C(bool),
+ }
+
+ impl<
+ A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+ > Future for Selector<A, B, C> {
+ type Output = SelectorOutput;
+ fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
+ match Pin::new(&mut self.a).poll(ctx) {
+ Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.b).poll(ctx) {
+ Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.c).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
+ Poll::Pending => {},
+ }
+ Poll::Pending
+ }
+ }
+
+ // If we want to poll a future without an async context to figure out if it has completed or
+ // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
+ // but sadly there's a good bit of boilerplate here.
+ fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
+ fn dummy_waker_action(_: *const ()) { }
+
+ const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
+ dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
+ pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
+}
+#[cfg(feature = "futures")]
+use futures_util::{Selector, SelectorOutput, dummy_waker};
+#[cfg(feature = "futures")]
+use core::task;
+
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
+///
+/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
+/// mobile device, where we may need to check for interruption of the application regularly. If you
+/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
+/// are hundreds or thousands of simultaneous process calls running.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
- sleeper: Sleeper,
+ sleeper: Sleeper, mobile_interruptable_platform: bool,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
- select_biased! {
- _ = channel_manager.get_persistable_update_future().fuse() => true,
- exit = sleeper(Duration::from_millis(100)).fuse() => {
+ let fut = Selector {
+ a: channel_manager.get_persistable_update_future(),
+ b: chain_monitor.get_update_future(),
+ c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
+ };
+ match fut.await {
+ SelectorOutput::A => true,
+ SelectorOutput::B => false,
+ SelectorOutput::C(exit) => {
should_break = exit;
false
}
}
}, |t| sleeper(Duration::from_secs(t)),
|fut: &mut SleepFuture, _| {
- let mut waker = task::noop_waker();
+ let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
- })
+ }, mobile_interruptable_platform)
}
#[cfg(feature = "std")]
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
- channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
- |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
+ Sleeper::from_two_futures(
+ channel_manager.get_persistable_update_future(),
+ chain_monitor.get_update_future()
+ ).wait_timeout(Duration::from_millis(100)),
+ |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
use lightning::chain::transaction::OutPoint;
- use lightning::get_event_msg;
+ use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
+ use lightning::{get_event_msg, get_event};
use lightning::ln::PaymentHash;
use lightning::ln::channelmanager;
use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
use lightning::routing::router::{DefaultRouter, RouteHop};
use lightning::routing::scoring::{ChannelUsage, Score};
use lightning::util::config::UserConfig;
- use lightning::util::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
use lightning::util::ser::Writeable;
use lightning::util::test_utils;
use lightning::util::persist::KVStorePersister;
($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
+ get_event!($node_b, Event::ChannelPending);
+
$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
+ get_event!($node_a, Event::ChannelPending);
}}
}
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: Some(scored_scid),
- retry: None,
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
failure: PathFailure::OnPath { network_update: None },
path: path.clone(),
short_channel_id: None,
- retry: None,
});
let event = receiver
.recv_timeout(Duration::from_secs(EVENT_DEADLINE))