use std::ops::Deref;
#[cfg(feature = "futures")]
-use futures_util::{select_biased, future::FutureExt};
+use futures_util::{select_biased, future::FutureExt, task};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
- $loop_exit_check: expr, $await: expr)
+ $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
=> { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
- let mut last_freshness_call = Instant::now();
- let mut last_ping_call = Instant::now();
- let mut last_prune_call = Instant::now();
- let mut last_scorer_persist_call = Instant::now();
+ let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
+ let mut last_ping_call = $get_timer(PING_TIMER);
+ let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
+ let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut have_pruned = false;
loop {
// We wait up to 100ms, but track how long it takes to detect being put to sleep,
// see `await_start`'s use below.
- let await_start = Instant::now();
+ let mut await_start = $get_timer(1);
let updates_available = $await;
- let await_time = await_start.elapsed();
+ let await_slow = $timer_elapsed(&mut await_start, 1);
if updates_available {
log_trace!($logger, "Persisting ChannelManager...");
log_trace!($logger, "Terminating background processor.");
break;
}
- if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
+ if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
$channel_manager.timer_tick_occurred();
- last_freshness_call = Instant::now();
+ last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
- if await_time > Duration::from_secs(1) {
+ if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
// Similarly, if we're on a desktop platform and the device has been asleep, we
// peers.
log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
$peer_manager.disconnect_all_peers();
- last_ping_call = Instant::now();
- } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
+ last_ping_call = $get_timer(PING_TIMER);
+ } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
$peer_manager.timer_tick_occurred();
- last_ping_call = Instant::now();
+ last_ping_call = $get_timer(PING_TIMER);
}
// Note that we want to run a graph prune once not long after startup before
// falling back to our usual hourly prunes. This avoids short-lived clients never
// pruning their network graph. We run once 60 seconds after startup before
// continuing our normal cadence.
- if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
+ if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
log_trace!($logger, "Pruning and persisting network graph.");
log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}
- last_prune_call = Instant::now();
+ last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
have_pruned = true;
}
}
- if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
+ if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
log_trace!($logger, "Persisting scorer");
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
- last_scorer_persist_call = Instant::now();
+ last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
}
}
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
- SleepFuture: core::future::Future<Output = bool>,
+ SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Sleeper: Fn(Duration) -> SleepFuture
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
false
}
}
+ }, |t| sleeper(Duration::from_secs(t)),
+ |fut: &mut SleepFuture, _| {
+ let mut waker = task::noop_waker();
+ let mut ctx = task::Context::from_waker(&mut waker);
+ core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
})
}
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
- channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
+ channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
+ |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
use lightning::chain::transaction::OutPoint;
use lightning::get_event_msg;
- use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
+ use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
use lightning::ln::features::ChannelFeatures;
use lightning::ln::msgs::{ChannelMessageHandler, Init};
use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
for i in 0..num_nodes {
for j in (i+1)..num_nodes {
- nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
- nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
+ nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
+ nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
}
}
macro_rules! begin_open_channel {
($node_a: expr, $node_b: expr, $channel_value: expr) => {{
$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
- $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
- $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
+ $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
+ $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
}}
}