X-Git-Url: http://git.bitcoin.ninja/index.cgi?a=blobdiff_plain;f=lightning-background-processor%2Fsrc%2Flib.rs;h=1300a67e2a16af0de614c7dce4ab37c6913686d7;hb=refs%2Fheads%2F2024-02-fix-route-ser;hp=62c03b81dfe89288e726ae1ee89f383322ce68ae;hpb=9856fb67106316ca66ef278cdc0cac9a52fad763;p=rust-lightning diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 62c03b81..1300a67e 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -27,7 +27,10 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist}; use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; use lightning::events::{Event, PathFailure}; #[cfg(feature = "std")] -use lightning::events::{EventHandler, EventsProvider}; +use lightning::events::EventHandler; +#[cfg(any(feature = "std", feature = "futures"))] +use lightning::events::EventsProvider; + use lightning::ln::channelmanager::ChannelManager; use lightning::ln::msgs::OnionMessageHandler; use lightning::ln::peer_handler::APeerManager; @@ -113,7 +116,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1; const NETWORK_PRUNE_TIMER: u64 = 60 * 60; #[cfg(not(test))] -const SCORER_PERSIST_TIMER: u64 = 60 * 60; +const SCORER_PERSIST_TIMER: u64 = 60 * 5; #[cfg(test)] const SCORER_PERSIST_TIMER: u64 = 1; @@ -244,30 +247,30 @@ fn handle_network_graph_update( /// Updates scorer based on event and returns whether an update occurred so we can decide whether /// to persist. fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( - scorer: &'a S, event: &Event + scorer: &'a S, event: &Event, duration_since_epoch: Duration, ) -> bool { match event { Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => { let mut score = scorer.write_lock(); - score.payment_path_failed(path, *scid); + score.payment_path_failed(path, *scid, duration_since_epoch); }, Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => { // Reached if the destination explicitly failed it back. We treat this as a successful probe // because the payment made it all the way to the destination with sufficient liquidity. let mut score = scorer.write_lock(); - score.probe_successful(path); + score.probe_successful(path, duration_since_epoch); }, Event::PaymentPathSuccessful { path, .. } => { let mut score = scorer.write_lock(); - score.payment_path_successful(path); + score.payment_path_successful(path, duration_since_epoch); }, Event::ProbeSuccessful { path, .. } => { let mut score = scorer.write_lock(); - score.probe_successful(path); + score.probe_successful(path, duration_since_epoch); }, Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => { let mut score = scorer.write_lock(); - score.probe_failed(path, *scid); + score.probe_failed(path, *scid, duration_since_epoch); }, _ => return false, } @@ -280,7 +283,7 @@ macro_rules! define_run_body { $channel_manager: ident, $process_channel_manager_events: expr, $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident, $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr, - $timer_elapsed: expr, $check_slow_await: expr + $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr, ) => { { log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup"); $channel_manager.timer_tick_occurred(); @@ -294,6 +297,7 @@ macro_rules! define_run_body { let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER); let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER); let mut have_pruned = false; + let mut have_decayed_scorer = false; loop { $process_channel_manager_events; @@ -383,11 +387,10 @@ macro_rules! define_run_body { if should_prune { // The network graph must not be pruned while rapid sync completion is pending if let Some(network_graph) = $gossip_sync.prunable_network_graph() { - #[cfg(feature = "std")] { + if let Some(duration_since_epoch) = $time_fetch() { log_trace!($logger, "Pruning and persisting network graph."); - network_graph.remove_stale_channels_and_tracking(); - } - #[cfg(not(feature = "std"))] { + network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs()); + } else { log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time."); log_trace!($logger, "Persisting network graph."); } @@ -402,9 +405,24 @@ macro_rules! define_run_body { last_prune_call = $get_timer(prune_timer); } + if !have_decayed_scorer { + if let Some(ref scorer) = $scorer { + if let Some(duration_since_epoch) = $time_fetch() { + log_trace!($logger, "Calling time_passed on scorer at startup"); + scorer.write_lock().time_passed(duration_since_epoch); + } + } + have_decayed_scorer = true; + } + if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) { if let Some(ref scorer) = $scorer { - log_trace!($logger, "Persisting scorer"); + if let Some(duration_since_epoch) = $time_fetch() { + log_trace!($logger, "Calling time_passed and persisting scorer"); + scorer.write_lock().time_passed(duration_since_epoch); + } else { + log_trace!($logger, "Persisting scorer"); + } if let Err(e) = $persister.persist_scorer(&scorer) { log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) } @@ -510,12 +528,16 @@ use core::task; /// are unsure, you should set the flag, as the performance impact of it is minimal unless there /// are hundreds or thousands of simultaneous process calls running. /// +/// The `fetch_time` parameter should return the current wall clock time, if one is available. If +/// no time is available, some features may be disabled, however the node will still operate fine. +/// /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you /// could setup `process_events_async` like this: /// ``` /// # use lightning::io; /// # use std::sync::{Arc, RwLock}; /// # use std::sync::atomic::{AtomicBool, Ordering}; +/// # use std::time::SystemTime; /// # use lightning_background_processor::{process_events_async, GossipSync}; /// # struct MyStore {} /// # impl lightning::util::persist::KVStore for MyStore { @@ -584,6 +606,7 @@ use core::task; /// Some(background_scorer), /// sleeper, /// mobile_interruptable_platform, +/// || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap()) /// ) /// .await /// .expect("Failed to process events"); @@ -620,11 +643,12 @@ pub async fn process_events_async< S: 'static + Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, - Sleeper: Fn(Duration) -> SleepFuture + Sleeper: Fn(Duration) -> SleepFuture, + FetchTime: Fn() -> Option, >( persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM, gossip_sync: GossipSync, peer_manager: PM, logger: L, scorer: Option, - sleeper: Sleeper, mobile_interruptable_platform: bool, + sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where UL::Target: 'static + UtxoLookup, @@ -648,15 +672,18 @@ where let scorer = &scorer; let logger = &logger; let persister = &persister; + let fetch_time = &fetch_time; async move { if let Some(network_graph) = network_graph { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - if update_scorer(scorer, &event) { - log_trace!(logger, "Persisting scorer after update"); - if let Err(e) = persister.persist_scorer(&scorer) { - log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + if let Some(duration_since_epoch) = fetch_time() { + if update_scorer(scorer, &event, duration_since_epoch) { + log_trace!(logger, "Persisting scorer after update"); + if let Err(e) = persister.persist_scorer(&scorer) { + log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) + } } } } @@ -688,7 +715,7 @@ where task::Poll::Ready(exit) => { should_break = exit; true }, task::Poll::Pending => false, } - }, mobile_interruptable_platform + }, mobile_interruptable_platform, fetch_time, ) } @@ -703,8 +730,6 @@ async fn process_onion_message_handler_events_async< where PM::Target: APeerManager + Send + Sync, { - use lightning::events::EventsProvider; - let events = core::cell::RefCell::new(Vec::new()); peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e)); @@ -810,7 +835,10 @@ impl BackgroundProcessor { handle_network_graph_update(network_graph, &event) } if let Some(ref scorer) = scorer { - if update_scorer(scorer, &event) { + use std::time::SystemTime; + let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970"); + if update_scorer(scorer, &event, duration_since_epoch) { log_trace!(logger, "Persisting scorer after update"); if let Err(e) = persister.persist_scorer(&scorer) { log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e) @@ -829,7 +857,12 @@ impl BackgroundProcessor { channel_manager.get_event_or_persistence_needed_future(), chain_monitor.get_update_future() ).wait_timeout(Duration::from_millis(100)); }, - |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false + |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false, + || { + use std::time::SystemTime; + Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + .expect("Time should be sometime after 1970")) + }, ) }); Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) } @@ -896,7 +929,7 @@ mod tests { use lightning::chain::transaction::OutPoint; use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent}; use lightning::{get_event_msg, get_event}; - use lightning::ln::PaymentHash; + use lightning::ln::{PaymentHash, ChannelId}; use lightning::ln::channelmanager; use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId}; use lightning::ln::features::{ChannelFeatures, NodeFeatures}; @@ -951,6 +984,7 @@ mod tests { Arc>>, Arc, + Arc, Arc>, (), TestScorer> @@ -1117,7 +1151,7 @@ mod tests { } impl ScoreUpdate for TestScorer { - fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) { + fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, short_channel_id } => { @@ -1137,7 +1171,7 @@ mod tests { } } - fn payment_path_successful(&mut self, actual_path: &Path) { + fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1156,7 +1190,7 @@ mod tests { } } - fn probe_failed(&mut self, actual_path: &Path, _: u64) { + fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1174,7 +1208,7 @@ mod tests { } } } - fn probe_successful(&mut self, actual_path: &Path) { + fn probe_successful(&mut self, actual_path: &Path, _: Duration) { if let Some(expectations) = &mut self.event_expectations { match expectations.pop_front().unwrap() { TestResult::PaymentFailure { path, .. } => { @@ -1192,6 +1226,7 @@ mod tests { } } } + fn time_passed(&mut self, _: Duration) {} } #[cfg(c_bindings)] @@ -1229,12 +1264,12 @@ mod tests { let genesis_block = genesis_block(network); let network_graph = Arc::new(NetworkGraph::new(network, logger.clone())); let scorer = Arc::new(LockingWrapper::new(TestScorer::new())); + let now = Duration::from_secs(genesis_block.header.time as u64); let seed = [i as u8; 32]; - let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default())); + let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); + let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default())); let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin)); let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into())); - let now = Duration::from_secs(genesis_block.header.time as u64); - let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos())); let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone())); let best_block = BestBlock::from_network(network); let params = ChainParameters { network, best_block }; @@ -1380,7 +1415,7 @@ mod tests { } // Force-close the channel. - nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap(); + nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id()).unwrap(); // Check that the force-close updates are persisted. check_persisted_data!(nodes[0].node, filepath.clone()); @@ -1469,7 +1504,7 @@ mod tests { tokio::time::sleep(dur).await; false // Never exit }) - }, false, + }, false, || Some(Duration::ZERO), ); match bp_future.await { Ok(_) => panic!("Expected error persisting manager"), @@ -1600,7 +1635,7 @@ mod tests { loop { let log_entries = nodes[0].logger.lines.lock().unwrap(); - let expected_log = "Persisting scorer".to_string(); + let expected_log = "Calling time_passed and persisting scorer".to_string(); if log_entries.get(&("lightning_background_processor", expected_log)).is_some() { break } @@ -1699,7 +1734,7 @@ mod tests { _ = exit_receiver.changed() => true, } }) - }, false, + }, false, || Some(Duration::from_secs(1696300000)), ); let t1 = tokio::spawn(bp_future); @@ -1874,7 +1909,7 @@ mod tests { _ = exit_receiver.changed() => true, } }) - }, false, + }, false, || Some(Duration::ZERO), ); let t1 = tokio::spawn(bp_future); let t2 = tokio::spawn(async move {