use lightning::routing::scoring::{Score, WriteableScore};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
+#[cfg(feature = "std")]
+use lightning::util::wakers::Sleeper;
use lightning_rapid_gossip_sync::RapidGossipSync;
use core::ops::Deref;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
+#[cfg(feature = "futures")]
+/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
+const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
+#[cfg(feature = "futures")]
+const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
+ min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER));
+
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
P: Deref<Target = P2PGossipSync<G, U, L>>,
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
- $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
+ $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
+ $check_slow_await: expr)
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
- let mut await_start = $get_timer(1);
+ let mut await_start = None;
+ if $check_slow_await { await_start = Some($get_timer(1)); }
let updates_available = $await;
- let await_slow = $timer_elapsed(&mut await_start, 1);
+ let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
use core::pin::Pin;
use core::marker::Unpin;
- pub(crate) struct Selector<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> {
+ pub(crate) struct Selector<
+ A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+ > {
pub a: A,
pub b: B,
+ pub c: C,
}
pub(crate) enum SelectorOutput {
- A, B(bool),
+ A, B, C(bool),
}
- impl<A: Future<Output=()> + Unpin, B: Future<Output=bool> + Unpin> Future for Selector<A, B> {
+ impl<
+ A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
+ > Future for Selector<A, B, C> {
type Output = SelectorOutput;
fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
match Pin::new(&mut self.a).poll(ctx) {
Poll::Pending => {},
}
match Pin::new(&mut self.b).poll(ctx) {
- Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); },
+ Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
+ Poll::Pending => {},
+ }
+ match Pin::new(&mut self.c).poll(ctx) {
+ Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
Poll::Pending => {},
}
Poll::Pending
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
+///
+/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
+/// mobile device, where we may need to check for interruption of the application regularly. If you
+/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
+/// are hundreds or thousands of simultaneous process calls running.
#[cfg(feature = "futures")]
pub async fn process_events_async<
'a,
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
- sleeper: Sleeper,
+ sleeper: Sleeper, mobile_interruptable_platform: bool,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
gossip_sync, peer_manager, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_persistable_update_future(),
- b: sleeper(Duration::from_millis(100)),
+ b: chain_monitor.get_update_future(),
+ c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
};
match fut.await {
SelectorOutput::A => true,
- SelectorOutput::B(exit) => {
+ SelectorOutput::B => false,
+ SelectorOutput::C(exit) => {
should_break = exit;
false
}
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
- })
+ }, mobile_interruptable_platform)
}
#[cfg(feature = "std")]
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
- channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
- |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
+ Sleeper::from_two_futures(
+ channel_manager.get_persistable_update_future(),
+ chain_monitor.get_update_future()
+ ).wait_timeout(Duration::from_millis(100)),
+ |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}