const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
#[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
/// Updates scorer based on event and returns whether an update occurred so we can decide whether
/// to persist.
fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
- scorer: &'a S, event: &Event
+ scorer: &'a S, event: &Event, duration_since_epoch: Duration,
) -> bool {
match event {
Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
let mut score = scorer.write_lock();
- score.payment_path_failed(path, *scid);
+ score.payment_path_failed(path, *scid, duration_since_epoch);
},
Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
// Reached if the destination explicitly failed it back. We treat this as a successful probe
// because the payment made it all the way to the destination with sufficient liquidity.
let mut score = scorer.write_lock();
- score.probe_successful(path);
+ score.probe_successful(path, duration_since_epoch);
},
Event::PaymentPathSuccessful { path, .. } => {
let mut score = scorer.write_lock();
- score.payment_path_successful(path);
+ score.payment_path_successful(path, duration_since_epoch);
},
Event::ProbeSuccessful { path, .. } => {
let mut score = scorer.write_lock();
- score.probe_successful(path);
+ score.probe_successful(path, duration_since_epoch);
},
Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
let mut score = scorer.write_lock();
- score.probe_failed(path, *scid);
+ score.probe_failed(path, *scid, duration_since_epoch);
},
_ => return false,
}
$channel_manager: ident, $process_channel_manager_events: expr,
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
- $timer_elapsed: expr, $check_slow_await: expr
+ $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
) => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
let mut have_pruned = false;
+ let mut have_decayed_scorer = false;
loop {
$process_channel_manager_events;
if should_prune {
// The network graph must not be pruned while rapid sync completion is pending
if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
- #[cfg(feature = "std")] {
+ if let Some(duration_since_epoch) = $time_fetch() {
log_trace!($logger, "Pruning and persisting network graph.");
- network_graph.remove_stale_channels_and_tracking();
- }
- #[cfg(not(feature = "std"))] {
+ network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+ } else {
log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
log_trace!($logger, "Persisting network graph.");
}
last_prune_call = $get_timer(prune_timer);
}
+ if !have_decayed_scorer {
+ if let Some(ref scorer) = $scorer {
+ if let Some(duration_since_epoch) = $time_fetch() {
+ log_trace!($logger, "Calling time_passed on scorer at startup");
+ scorer.write_lock().time_passed(duration_since_epoch);
+ }
+ }
+ have_decayed_scorer = true;
+ }
+
if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
if let Some(ref scorer) = $scorer {
- log_trace!($logger, "Persisting scorer");
+ if let Some(duration_since_epoch) = $time_fetch() {
+ log_trace!($logger, "Calling time_passed and persisting scorer");
+ scorer.write_lock().time_passed(duration_since_epoch);
+ } else {
+ log_trace!($logger, "Persisting scorer");
+ }
if let Err(e) = $persister.persist_scorer(&scorer) {
log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
/// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
/// could setup `process_events_async` like this:
/// ```
/// # use lightning::io;
/// # use std::sync::{Arc, RwLock};
/// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
/// # use lightning_background_processor::{process_events_async, GossipSync};
/// # struct MyStore {}
/// # impl lightning::util::persist::KVStore for MyStore {
/// Some(background_scorer),
/// sleeper,
/// mobile_interruptable_platform,
+/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
/// )
/// .await
/// .expect("Failed to process events");
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
- Sleeper: Fn(Duration) -> SleepFuture
+ Sleeper: Fn(Duration) -> SleepFuture,
+ FetchTime: Fn() -> Option<Duration>,
>(
persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
- sleeper: Sleeper, mobile_interruptable_platform: bool,
+ sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
UL::Target: 'static + UtxoLookup,
let scorer = &scorer;
let logger = &logger;
let persister = &persister;
+ let fetch_time = &fetch_time;
async move {
if let Some(network_graph) = network_graph {
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
- if update_scorer(scorer, &event) {
- log_trace!(logger, "Persisting scorer after update");
- if let Err(e) = persister.persist_scorer(&scorer) {
- log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ if let Some(duration_since_epoch) = fetch_time() {
+ if update_scorer(scorer, &event, duration_since_epoch) {
+ log_trace!(logger, "Persisting scorer after update");
+ if let Err(e) = persister.persist_scorer(&scorer) {
+ log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+ }
}
}
}
task::Poll::Ready(exit) => { should_break = exit; true },
task::Poll::Pending => false,
}
- }, mobile_interruptable_platform
+ }, mobile_interruptable_platform, fetch_time,
)
}
handle_network_graph_update(network_graph, &event)
}
if let Some(ref scorer) = scorer {
- if update_scorer(scorer, &event) {
+ use std::time::SystemTime;
+ let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Time should be sometime after 1970");
+ if update_scorer(scorer, &event, duration_since_epoch) {
log_trace!(logger, "Persisting scorer after update");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)); },
- |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
+ |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+ || {
+ use std::time::SystemTime;
+ Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+ .expect("Time should be sometime after 1970"))
+ },
)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
impl ScoreUpdate for TestScorer {
- fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+ fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, short_channel_id } => {
}
}
- fn payment_path_successful(&mut self, actual_path: &Path) {
+ fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
- fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+ fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
}
- fn probe_successful(&mut self, actual_path: &Path) {
+ fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
if let Some(expectations) = &mut self.event_expectations {
match expectations.pop_front().unwrap() {
TestResult::PaymentFailure { path, .. } => {
}
}
}
+ fn time_passed(&mut self, _: Duration) {}
}
#[cfg(c_bindings)]
tokio::time::sleep(dur).await;
false // Never exit
})
- }, false,
+ }, false, || Some(Duration::ZERO),
);
match bp_future.await {
Ok(_) => panic!("Expected error persisting manager"),
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
- let expected_log = "Persisting scorer".to_string();
+ let expected_log = "Calling time_passed and persisting scorer".to_string();
if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
break
}
_ = exit_receiver.changed() => true,
}
})
- }, false,
+ }, false, || Some(Duration::from_secs(1696300000)),
);
let t1 = tokio::spawn(bp_future);
_ = exit_receiver.changed() => true,
}
})
- }, false,
+ }, false, || Some(Duration::ZERO),
);
let t1 = tokio::spawn(bp_future);
let t2 = tokio::spawn(async move {