X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=c30e5ad667652cba466e8243b5558e6d42115b3a;hb=fcf1282975fe8627efdd7657ba3007640daedce8;hp=884a7c2266420eb17f568f5da4cb65d513f6cae1;hpb=0e28bcb704446a376f0c9be449de03fbb22a431f;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 884a7c22..c30e5ad6 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -38,6 +38,8 @@ use lightning::routing::router::Router; use lightning::routing::scoring::{Score, WriteableScore}; use lightning::util::logger::Logger; use lightning::util::persist::Persister; +#[cfg(feature = "std")] +use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; use core::ops::Deref; @@ -114,6 +116,13 @@ const FIRST_NETWORK_PRUNE_TIMER: u64 = 60; #[cfg(test)] const FIRST_NETWORK_PRUNE_TIMER: u64 = 1; +#[cfg(feature = "futures")] +/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement +const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } } +#[cfg(feature = "futures")] +const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER), + min_u64(SCORER_PERSIST_TIMER, FIRST_NETWORK_PRUNE_TIMER)); + /// Either [`P2PGossipSync`] or [`RapidGossipSync`]. pub enum GossipSync< P: Deref>, @@ -256,7 +265,8 @@ macro_rules! define_run_body { ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr, $channel_manager: ident, $process_channel_manager_events: expr, $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident, - $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr) + $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr, + $check_slow_await: expr) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -286,9 +296,10 @@ macro_rules! define_run_body { // We wait up to 100ms, but track how long it takes to detect being put to sleep, // see `await_start`'s use below. - let mut await_start = $get_timer(1); + let mut await_start = None; + if $check_slow_await { await_start = Some($get_timer(1)); } let updates_available = $await; - let await_slow = $timer_elapsed(&mut await_start, 1); + let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false }; if updates_available { log_trace!($logger, "Persisting ChannelManager..."); @@ -388,15 +399,20 @@ pub(crate) mod futures_util { use core::task::{Poll, Waker, RawWaker, RawWakerVTable}; use core::pin::Pin; use core::marker::Unpin; - pub(crate) struct Selector + Unpin, B: Future + Unpin> { + pub(crate) struct Selector< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > { pub a: A, pub b: B, + pub c: C, } pub(crate) enum SelectorOutput { - A, B(bool), + A, B, C(bool), } - impl + Unpin, B: Future + Unpin> Future for Selector { + impl< + A: Future + Unpin, B: Future + Unpin, C: Future + Unpin + > Future for Selector { type Output = SelectorOutput; fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll { match Pin::new(&mut self.a).poll(ctx) { @@ -404,7 +420,11 @@ pub(crate) mod futures_util { Poll::Pending => {}, } match Pin::new(&mut self.b).poll(ctx) { - Poll::Ready(res) => { return Poll::Ready(SelectorOutput::B(res)); }, + Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); }, + Poll::Pending => {}, + } + match Pin::new(&mut self.c).poll(ctx) { + Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); }, Poll::Pending => {}, } Poll::Pending @@ -438,6 +458,11 @@ use core::task; /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`], /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly /// manually instead. +/// +/// The `mobile_interruptable_platform` flag should be set if we're currently running on a +/// mobile device, where we may need to check for interruption of the application regularly. If you +/// are unsure, you should set the flag, as the performance impact of it is minimal unless there +/// are hundreds or thousands of simultaneous process calls running. #[cfg(feature = "futures")] pub async fn process_events_async< 'a, @@ -473,7 +498,7 @@ pub async fn process_events_async< >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, - sleeper: Sleeper, + sleeper: Sleeper, mobile_interruptable_platform: bool, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -514,11 +539,13 @@ where gossip_sync, peer_manager, logger, scorer, should_break, { let fut = Selector { a: channel_manager.get_persistable_update_future(), - b: sleeper(Duration::from_millis(100)), + b: chain_monitor.get_update_future(), + c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }), }; match fut.await { SelectorOutput::A => true, - SelectorOutput::B(exit) => { + SelectorOutput::B => false, + SelectorOutput::C(exit) => { should_break = exit; false } @@ -528,7 +555,7 @@ where let mut waker = dummy_waker(); let mut ctx = task::Context::from_waker(&mut waker); core::pin::Pin::new(fut).poll(&mut ctx).is_ready() - }) + }, mobile_interruptable_platform) } #[cfg(feature = "std")] @@ -643,8 +670,11 @@ impl BackgroundProcessor { define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler), channel_manager, channel_manager.process_pending_events(&event_handler), gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire), - channel_manager.await_persistable_update_timeout(Duration::from_millis(100)), - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur) + Sleeper::from_two_futures( + channel_manager.get_persistable_update_future(), + chain_monitor.get_update_future() + ).wait_timeout(Duration::from_millis(100)), + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } } @@ -710,7 +740,7 @@ mod tests { use lightning::chain::keysinterface::{InMemorySigner, KeysManager}; use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; - use lightning::get_event_msg; + use lightning::{get_event_msg, get_event}; use lightning::ln::PaymentHash; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; @@ -1058,7 +1088,10 @@ mod tests { ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{ $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap(); $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id())); + get_event!($node_b, Event::ChannelPending); + $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id())); + get_event!($node_a, Event::ChannelPending); }} } @@ -1153,7 +1186,9 @@ mod tests { let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string()); check_persisted_data!(nodes[0].scorer, filepath.clone()); - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1175,7 +1210,9 @@ mod tests { } } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1267,7 +1304,9 @@ mod tests { nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding); let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id()); - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } // Set up a background event handler for SpendableOutputs events. let (sender, receiver) = std::sync::mpsc::sync_channel(1); @@ -1293,7 +1332,9 @@ mod tests { _ => panic!("Unexpected event: {:?}", event), } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1312,7 +1353,9 @@ mod tests { } } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } #[test] @@ -1486,6 +1529,8 @@ mod tests { _ => panic!("Unexpected event"), } - assert!(bg_processor.stop().is_ok()); + if !std::thread::panicking() { + bg_processor.stop().unwrap(); + } } }