Fix `Route` serialization round-trip
[rust-lightning] / lightning-background-processor / src / lib.rs
index 62c03b81dfe89288e726ae1ee89f383322ce68ae..dd4eea96040add82642e69a26ba0bb2675e9bb55 100644 (file)
@@ -27,7 +27,10 @@ use lightning::chain::chainmonitor::{ChainMonitor, Persist};
 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
 use lightning::events::{Event, PathFailure};
 #[cfg(feature = "std")]
-use lightning::events::{EventHandler, EventsProvider};
+use lightning::events::EventHandler;
+#[cfg(any(feature = "std", feature = "futures"))]
+use lightning::events::EventsProvider;
+
 use lightning::ln::channelmanager::ChannelManager;
 use lightning::ln::msgs::OnionMessageHandler;
 use lightning::ln::peer_handler::APeerManager;
@@ -113,7 +116,7 @@ const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
 
 #[cfg(not(test))]
-const SCORER_PERSIST_TIMER: u64 = 60 * 60;
+const SCORER_PERSIST_TIMER: u64 = 60 * 5;
 #[cfg(test)]
 const SCORER_PERSIST_TIMER: u64 = 1;
 
@@ -244,30 +247,30 @@ fn handle_network_graph_update<L: Deref>(
 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
 /// to persist.
 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
-       scorer: &'a S, event: &Event
+       scorer: &'a S, event: &Event, duration_since_epoch: Duration,
 ) -> bool {
        match event {
                Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_failed(path, *scid);
+                       score.payment_path_failed(path, *scid, duration_since_epoch);
                },
                Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
                        // Reached if the destination explicitly failed it back. We treat this as a successful probe
                        // because the payment made it all the way to the destination with sufficient liquidity.
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::PaymentPathSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.payment_path_successful(path);
+                       score.payment_path_successful(path, duration_since_epoch);
                },
                Event::ProbeSuccessful { path, .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_successful(path);
+                       score.probe_successful(path, duration_since_epoch);
                },
                Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
                        let mut score = scorer.write_lock();
-                       score.probe_failed(path, *scid);
+                       score.probe_failed(path, *scid, duration_since_epoch);
                },
                _ => return false,
        }
@@ -280,7 +283,7 @@ macro_rules! define_run_body {
                $channel_manager: ident, $process_channel_manager_events: expr,
                $peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
                $logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
-               $timer_elapsed: expr, $check_slow_await: expr
+               $timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
        ) => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
@@ -294,6 +297,7 @@ macro_rules! define_run_body {
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                let mut have_pruned = false;
+               let mut have_decayed_scorer = false;
 
                loop {
                        $process_channel_manager_events;
@@ -383,11 +387,10 @@ macro_rules! define_run_body {
                        if should_prune {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
-                                       #[cfg(feature = "std")] {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
                                                log_trace!($logger, "Pruning and persisting network graph.");
-                                               network_graph.remove_stale_channels_and_tracking();
-                                       }
-                                       #[cfg(not(feature = "std"))] {
+                                               network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
+                                       } else {
                                                log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                                                log_trace!($logger, "Persisting network graph.");
                                        }
@@ -402,9 +405,24 @@ macro_rules! define_run_body {
                                last_prune_call = $get_timer(prune_timer);
                        }
 
+                       if !have_decayed_scorer {
+                               if let Some(ref scorer) = $scorer {
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed on scorer at startup");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       }
+                               }
+                               have_decayed_scorer = true;
+                       }
+
                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
-                                       log_trace!($logger, "Persisting scorer");
+                                       if let Some(duration_since_epoch) = $time_fetch() {
+                                               log_trace!($logger, "Calling time_passed and persisting scorer");
+                                               scorer.write_lock().time_passed(duration_since_epoch);
+                                       } else {
+                                               log_trace!($logger, "Persisting scorer");
+                                       }
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
@@ -510,12 +528,16 @@ use core::task;
 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
 /// are hundreds or thousands of simultaneous process calls running.
 ///
+/// The `fetch_time` parameter should return the current wall clock time, if one is available. If
+/// no time is available, some features may be disabled, however the node will still operate fine.
+///
 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
 /// could setup `process_events_async` like this:
 /// ```
 /// # use lightning::io;
 /// # use std::sync::{Arc, RwLock};
 /// # use std::sync::atomic::{AtomicBool, Ordering};
+/// # use std::time::SystemTime;
 /// # use lightning_background_processor::{process_events_async, GossipSync};
 /// # struct MyStore {}
 /// # impl lightning::util::persist::KVStore for MyStore {
@@ -584,6 +606,7 @@ use core::task;
 ///                    Some(background_scorer),
 ///                    sleeper,
 ///                    mobile_interruptable_platform,
+///                    || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
 ///                    )
 ///                    .await
 ///                    .expect("Failed to process events");
@@ -620,11 +643,12 @@ pub async fn process_events_async<
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
-       Sleeper: Fn(Duration) -> SleepFuture
+       Sleeper: Fn(Duration) -> SleepFuture,
+       FetchTime: Fn() -> Option<Duration>,
 >(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
-       sleeper: Sleeper, mobile_interruptable_platform: bool,
+       sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
 ) -> Result<(), lightning::io::Error>
 where
        UL::Target: 'static + UtxoLookup,
@@ -648,15 +672,18 @@ where
                let scorer = &scorer;
                let logger = &logger;
                let persister = &persister;
+               let fetch_time = &fetch_time;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        if let Some(ref scorer) = scorer {
-                               if update_scorer(scorer, &event) {
-                                       log_trace!(logger, "Persisting scorer after update");
-                                       if let Err(e) = persister.persist_scorer(&scorer) {
-                                               log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                               if let Some(duration_since_epoch) = fetch_time() {
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
+                                               log_trace!(logger, "Persisting scorer after update");
+                                               if let Err(e) = persister.persist_scorer(&scorer) {
+                                                       log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
+                                               }
                                        }
                                }
                        }
@@ -667,7 +694,10 @@ where
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
-               peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
+               peer_manager,
+               for event in onion_message_handler_events(peer_manager) {
+                       handler(event).await
+               },
                gossip_sync, logger, scorer, should_break, {
                        let fut = Selector {
                                a: channel_manager.get_event_or_persistence_needed_future(),
@@ -688,29 +718,15 @@ where
                                task::Poll::Ready(exit) => { should_break = exit; true },
                                task::Poll::Pending => false,
                        }
-               }, mobile_interruptable_platform
+               }, mobile_interruptable_platform, fetch_time,
        )
 }
 
-#[cfg(feature = "futures")]
-async fn process_onion_message_handler_events_async<
-       EventHandlerFuture: core::future::Future<Output = ()>,
-       EventHandler: Fn(Event) -> EventHandlerFuture,
-       PM: 'static + Deref + Send + Sync,
->(
-       peer_manager: &PM, handler: EventHandler
-)
-where
-       PM::Target: APeerManager + Send + Sync,
-{
-       use lightning::events::EventsProvider;
-
-       let events = core::cell::RefCell::new(Vec::new());
-       peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
-
-       for event in events.into_inner() {
-               handler(event).await
-       }
+fn onion_message_handler_events<PM: 'static + Deref + Send + Sync>(
+       peer_manager: &PM
+) -> impl Iterator<Item=Event> where PM::Target: APeerManager + Send + Sync {
+       peer_manager.onion_message_handler().get_and_clear_connections_needed()
+               .into_iter().map(|(node_id, addresses)| Event::ConnectionNeeded { node_id, addresses })
 }
 
 #[cfg(feature = "std")]
@@ -810,7 +826,10 @@ impl BackgroundProcessor {
                                        handle_network_graph_update(network_graph, &event)
                                }
                                if let Some(ref scorer) = scorer {
-                                       if update_scorer(scorer, &event) {
+                                       use std::time::SystemTime;
+                                       let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970");
+                                       if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
                                                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
@@ -823,13 +842,20 @@ impl BackgroundProcessor {
                                persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
                                channel_manager, channel_manager.process_pending_events(&event_handler),
                                peer_manager,
-                               peer_manager.onion_message_handler().process_pending_events(&event_handler),
+                               for event in onion_message_handler_events(&peer_manager) {
+                                       event_handler.handle_event(event);
+                               },
                                gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                { Sleeper::from_two_futures(
                                        channel_manager.get_event_or_persistence_needed_future(),
                                        chain_monitor.get_update_future()
                                ).wait_timeout(Duration::from_millis(100)); },
-                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
+                               |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
+                               || {
+                                       use std::time::SystemTime;
+                                       Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
+                                               .expect("Time should be sometime after 1970"))
+                               },
                        )
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
@@ -951,9 +977,8 @@ mod tests {
                        Arc<DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
-                               Arc<LockingWrapper<TestScorer>>,
-                               (),
-                               TestScorer>
+                               Arc<KeysManager>,
+                               Arc<LockingWrapper<TestScorer>>>
                        >,
                        Arc<test_utils::TestLogger>>;
 
@@ -1110,14 +1135,15 @@ mod tests {
        }
 
        impl ScoreLookUp for TestScorer {
+               #[cfg(not(c_bindings))]
                type ScoreParams = ();
                fn channel_penalty_msat(
-                       &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
+                       &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &lightning::routing::scoring::ProbabilisticScoringFeeParameters
                ) -> u64 { unimplemented!(); }
        }
 
        impl ScoreUpdate for TestScorer {
-               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64) {
+               fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, short_channel_id } => {
@@ -1137,7 +1163,7 @@ mod tests {
                        }
                }
 
-               fn payment_path_successful(&mut self, actual_path: &Path) {
+               fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1156,7 +1182,7 @@ mod tests {
                        }
                }
 
-               fn probe_failed(&mut self, actual_path: &Path, _: u64) {
+               fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1174,7 +1200,7 @@ mod tests {
                                }
                        }
                }
-               fn probe_successful(&mut self, actual_path: &Path) {
+               fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
                        if let Some(expectations) = &mut self.event_expectations {
                                match expectations.pop_front().unwrap() {
                                        TestResult::PaymentFailure { path, .. } => {
@@ -1192,6 +1218,7 @@ mod tests {
                                }
                        }
                }
+               fn time_passed(&mut self, _: Duration) {}
        }
 
        #[cfg(c_bindings)]
@@ -1229,12 +1256,12 @@ mod tests {
                        let genesis_block = genesis_block(network);
                        let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
                        let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
+                       let now = Duration::from_secs(genesis_block.header.time as u64);
                        let seed = [i as u8; 32];
-                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
+                       let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
+                       let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
                        let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
                        let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
-                       let now = Duration::from_secs(genesis_block.header.time as u64);
-                       let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
                        let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
                        let best_block = BestBlock::from_network(network);
                        let params = ChainParameters { network, best_block };
@@ -1469,7 +1496,7 @@ mod tests {
                                        tokio::time::sleep(dur).await;
                                        false // Never exit
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                match bp_future.await {
                        Ok(_) => panic!("Expected error persisting manager"),
@@ -1600,7 +1627,7 @@ mod tests {
 
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
-                       let expected_log = "Persisting scorer".to_string();
+                       let expected_log = "Calling time_passed and persisting scorer".to_string();
                        if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
                                break
                        }
@@ -1699,7 +1726,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::from_secs(1696300000)),
                );
 
                let t1 = tokio::spawn(bp_future);
@@ -1874,7 +1901,7 @@ mod tests {
                                                _ = exit_receiver.changed() => true,
                                        }
                                })
-                       }, false,
+                       }, false, || Some(Duration::ZERO),
                );
                let t1 = tokio::spawn(bp_future);
                let t2 = tokio::spawn(async move {