Add a scoring decay method to the `ScoreUpdate` trait
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::OnionMessageHandler;
34 use lightning::ln::peer_handler::APeerManager;
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
44
45 use core::ops::Deref;
46 use core::time::Duration;
47
48 #[cfg(feature = "std")]
49 use std::sync::Arc;
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
56
57 #[cfg(not(feature = "std"))]
58 use alloc::vec::Vec;
59
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread; setting it to `true` requests that the
	// thread exit its processing loop.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned processing thread, carrying the loop's final persistence result.
	// Wrapped in `Option` so it can be taken out when joining — presumably consumed by a
	// stop/join method defined elsewhere in this file (not visible here); confirm.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
92
// Seconds between calls to `ChannelManager::timer_tick_occurred` in the main loop (shortened
// under test so the suite can observe multiple ticks quickly; same pattern for the timers below).
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between `PeerManager::timer_tick_occurred` calls (peer liveness pings).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

// Seconds between `OnionMessageHandler::timer_tick_occurred` calls.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer decay-and-persist passes in the main loop.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 60 * 60;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Seconds after startup at which the *first* network-graph prune runs, ahead of the regular
// `NETWORK_PRUNE_TIMER` cadence, so short-lived nodes still prune at least once.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

// Seconds between `ChainMonitor::rebroadcast_pending_claims` calls.
#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;
130
131 #[cfg(feature = "futures")]
/// A trivial `const` stand-in for [`core::cmp::min`], which cannot currently be called in const
/// contexts. Returns the smaller of `a` and `b`.
const fn min_u64(a: u64, b: u64) -> u64 {
	match a <= b {
		true => a,
		false => b,
	}
}
#[cfg(feature = "futures")]
/// The smallest of the repeating timer intervals fed through `min_u64` below — an upper bound on
/// how long a single background-processing sleep may last without a timer firing late.
/// NOTE(review): `ONION_MESSAGE_HANDLER_TIMER` and `NETWORK_PRUNE_TIMER` are not included —
/// presumably because they can never be smaller than the included timers; confirm.
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
	min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
137
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Tells the background processor which gossip source (if any) backs the [`NetworkGraph`] it
/// should periodically prune and persist.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
>
where U::Target: UtxoLookup, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
154
155 impl<
156         P: Deref<Target = P2PGossipSync<G, U, L>>,
157         R: Deref<Target = RapidGossipSync<G, L>>,
158         G: Deref<Target = NetworkGraph<L>>,
159         U: Deref,
160         L: Deref,
161 > GossipSync<P, R, G, U, L>
162 where U::Target: UtxoLookup, L::Target: Logger {
163         fn network_graph(&self) -> Option<&G> {
164                 match self {
165                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
166                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
167                         GossipSync::None => None,
168                 }
169         }
170
171         fn prunable_network_graph(&self) -> Option<&G> {
172                 match self {
173                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
174                         GossipSync::Rapid(gossip_sync) => {
175                                 if gossip_sync.is_initial_sync_complete() {
176                                         Some(gossip_sync.network_graph())
177                                 } else {
178                                         None
179                                 }
180                         },
181                         GossipSync::None => None,
182                 }
183         }
184 }
185
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// The rapid-sync type parameter `R` is concretized to a reference type here since this variant
// never constructs a `RapidGossipSync`.
impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
where
	U::Target: UtxoLookup,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
198
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// The P2P-related type parameters are concretized to reference/`dyn` placeholders here since
// this variant never constructs a `P2PGossipSync`.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
		R,
		G,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
216
/// This is not exported to bindings users as the bindings concretize everything and have constructors for us
// All gossip-related type parameters are concretized to reference/`dyn` placeholders here since
// this variant constructs neither a `P2PGossipSync` nor a `RapidGossipSync`.
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn UtxoLookup + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}
234
235 fn handle_network_graph_update<L: Deref>(
236         network_graph: &NetworkGraph<L>, event: &Event
237 ) where L::Target: Logger {
238         if let Event::PaymentPathFailed {
239                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
240         {
241                 network_graph.handle_network_update(upd);
242         }
243 }
244
245 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
246 /// to persist.
247 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
248         scorer: &'a S, event: &Event, duration_since_epoch: Duration,
249 ) -> bool {
250         match event {
251                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
252                         let mut score = scorer.write_lock();
253                         score.payment_path_failed(path, *scid, duration_since_epoch);
254                 },
255                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
256                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
257                         // because the payment made it all the way to the destination with sufficient liquidity.
258                         let mut score = scorer.write_lock();
259                         score.probe_successful(path, duration_since_epoch);
260                 },
261                 Event::PaymentPathSuccessful { path, .. } => {
262                         let mut score = scorer.write_lock();
263                         score.payment_path_successful(path, duration_since_epoch);
264                 },
265                 Event::ProbeSuccessful { path, .. } => {
266                         let mut score = scorer.write_lock();
267                         score.probe_successful(path, duration_since_epoch);
268                 },
269                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
270                         let mut score = scorer.write_lock();
271                         score.probe_failed(path, *scid, duration_since_epoch);
272                 },
273                 _ => return false,
274         }
275         true
276 }
277
// Generates the body of the background-processing main loop. The same loop is shared between the
// blocking (`std`-thread) and `futures`-based processors; the variant-specific pieces — how to
// sleep (`$await`), how to stamp/check timers (`$get_timer`/`$timer_elapsed`), how to detect an
// exit request (`$loop_exit_check`), and how to read wall-clock time (`$time_fetch`, which may
// yield `None` in `no-std` contexts) — are passed in as expressions.
macro_rules! define_run_body {
	(
		$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
		$channel_manager: ident, $process_channel_manager_events: expr,
		$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
		$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
		$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
	) => { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();
		log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
		$chain_monitor.rebroadcast_pending_claims();

		// Start a timer for each periodic task so the loop below can tell when each is due.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
		// One-shot flags: the first graph prune and the startup scorer decay run once, on an
		// earlier schedule than their regular cadence.
		let mut have_pruned = false;
		let mut have_decayed_scorer = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;
			$process_onion_message_handler_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.as_ref().process_events();

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = None;
			if $check_slow_await { await_start = Some($get_timer(1)); }
			$await;
			let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			if $channel_manager.get_and_clear_needs_persistence() {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
				log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
				$peer_manager.onion_message_handler().timer_tick_occurred();
				last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.as_ref().disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.as_ref().timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
			// we prune after an initial sync completes.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
			let should_prune = match $gossip_sync {
				GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
				_ => prune_timer_elapsed,
			};
			if should_prune {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
					} else {
						// No wall-clock source available: persist without pruning.
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					have_pruned = true;
				}
				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				last_prune_call = $get_timer(prune_timer);
			}

			// Decay the scorer once near startup (only if a wall-clock source is available),
			// since time may have passed while we were offline. Runs at most once either way.
			if !have_decayed_scorer {
				if let Some(ref scorer) = $scorer {
					if let Some(duration_since_epoch) = $time_fetch() {
						scorer.write_lock().decay_liquidity_certainty(duration_since_epoch);
					}
				}
				have_decayed_scorer = true;
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Decaying and persisting scorer");
					if let Some(duration_since_epoch) = $time_fetch() {
						scorer.write_lock().decay_liquidity_certainty(duration_since_epoch);
					}
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}

			if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
				log_trace!($logger, "Rebroadcasting monitor's pending claims");
				$chain_monitor.rebroadcast_pending_claims();
				last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
453
#[cfg(feature = "futures")]
pub(crate) mod futures_util {
	use core::future::Future;
	use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
	use core::pin::Pin;
	use core::marker::Unpin;
	// Races three futures against each other, reporting which completed first. Polling order is
	// fixed (`a`, then `b`, then `c`), so if several are ready on the same poll, the earliest
	// field wins.
	pub(crate) struct Selector<
		A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
	> {
		pub a: A,
		pub b: B,
		pub c: C,
	}
	// Identifies which of the three raced futures finished first; `C` carries that future's
	// boolean output.
	pub(crate) enum SelectorOutput {
		A, B, C(bool),
	}

	impl<
		A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
	> Future for Selector<A, B, C> {
		type Output = SelectorOutput;
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
			// `Pin::new` is sound here since all three fields are required to be `Unpin`.
			match Pin::new(&mut self.a).poll(ctx) {
				Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
				Poll::Pending => {},
			}
			match Pin::new(&mut self.b).poll(ctx) {
				Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
				Poll::Pending => {},
			}
			match Pin::new(&mut self.c).poll(ctx) {
				Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
				Poll::Pending => {},
			}
			Poll::Pending
		}
	}

	// If we want to poll a future without an async context to figure out if it has completed or
	// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
	// but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
	fn dummy_waker_action(_: *const ()) { }

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
	// SAFETY: the data pointer is null and never dereferenced — `clone` returns an identical
	// no-op waker and the wake/drop entries do nothing, so every `RawWakerVTable` contract
	// holds trivially.
	pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
}
502 #[cfg(feature = "futures")]
503 use futures_util::{Selector, SelectorOutput, dummy_waker};
504 #[cfg(feature = "futures")]
505 use core::task;
506
507 /// Processes background events in a future.
508 ///
509 /// `sleeper` should return a future which completes in the given amount of time and returns a
510 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
511 /// future which outputs `true`, the loop will exit and this function's future will complete.
512 /// The `sleeper` future is free to return early after it has triggered the exit condition.
513 ///
514 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
515 ///
516 /// Requires the `futures` feature. Note that while this method is available without the `std`
517 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
518 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
519 /// manually instead.
520 ///
521 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
522 /// mobile device, where we may need to check for interruption of the application regularly. If you
523 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
524 /// are hundreds or thousands of simultaneous process calls running.
525 ///
526 /// The `fetch_time` parameter should return the current wall clock time, if one is available. If
527 /// no time is available, some features may be disabled, however the node will still operate fine.
528 ///
529 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
530 /// could setup `process_events_async` like this:
531 /// ```
532 /// # use lightning::io;
533 /// # use std::sync::{Arc, RwLock};
534 /// # use std::sync::atomic::{AtomicBool, Ordering};
535 /// # use std::time::SystemTime;
536 /// # use lightning_background_processor::{process_events_async, GossipSync};
537 /// # struct MyStore {}
538 /// # impl lightning::util::persist::KVStore for MyStore {
539 /// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
540 /// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
541 /// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
542 /// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
543 /// # }
544 /// # struct MyEventHandler {}
545 /// # impl MyEventHandler {
546 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
547 /// # }
548 /// # #[derive(Eq, PartialEq, Clone, Hash)]
549 /// # struct MySocketDescriptor {}
550 /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
551 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
552 /// #     fn disconnect_socket(&mut self) {}
553 /// # }
554 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
555 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
556 /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
557 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
558 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
559 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
560 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
561 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
562 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
563 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
564 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
565 /// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
566 ///
567 /// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
568 ///     let background_persister = Arc::clone(&my_persister);
569 ///     let background_event_handler = Arc::clone(&my_event_handler);
570 ///     let background_chain_mon = Arc::clone(&my_chain_monitor);
571 ///     let background_chan_man = Arc::clone(&my_channel_manager);
572 ///     let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
573 ///     let background_peer_man = Arc::clone(&my_peer_manager);
574 ///     let background_logger = Arc::clone(&my_logger);
575 ///     let background_scorer = Arc::clone(&my_scorer);
576 ///
577 ///     // Setup the sleeper.
578 ///     let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
579 ///
580 ///     let sleeper = move |d| {
581 ///             let mut receiver = stop_receiver.clone();
582 ///             Box::pin(async move {
583 ///                     tokio::select!{
584 ///                             _ = tokio::time::sleep(d) => false,
585 ///                             _ = receiver.changed() => true,
586 ///                     }
587 ///             })
588 ///     };
589 ///
590 ///     let mobile_interruptable_platform = false;
591 ///
592 ///     let handle = tokio::spawn(async move {
593 ///             process_events_async(
594 ///                     background_persister,
595 ///                     |e| background_event_handler.handle_event(e),
596 ///                     background_chain_mon,
597 ///                     background_chan_man,
598 ///                     background_gossip_sync,
599 ///                     background_peer_man,
600 ///                     background_logger,
601 ///                     Some(background_scorer),
602 ///                     sleeper,
603 ///                     mobile_interruptable_platform,
604 ///                     || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
605 ///                     )
606 ///                     .await
607 ///                     .expect("Failed to process events");
608 ///     });
609 ///
610 ///     // Stop the background processing.
611 ///     stop_sender.send(()).unwrap();
612 ///     handle.await.unwrap();
613 ///     # }
614 ///```
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	PM: 'static + Deref + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	PM::Target: APeerManager + Send + Sync,
{
	let mut should_break = false;
	// Decorate the caller's event handler: before each event is handed to the caller, we
	// (a) apply any network-graph updates implied by the event, and (b) feed
	// payment/probe results into the scorer, persisting it whenever `update_scorer`
	// reports that it changed.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let persister = &persister;
		let fetch_time = &fetch_time;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				// Scorer updates require the current time; if the caller's `fetch_time`
				// cannot provide one, the scorer update is skipped entirely.
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
			}
			event_handler(event).await;
		}
	};
	// Drive the shared background-processing loop. Between iterations we race the
	// channel manager's and chain monitor's wake futures against the caller-provided
	// sleeper; the sleeper's boolean output signals whether we should exit.
	define_run_body!(
		persister, chain_monitor,
		chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
		gossip_sync, logger, scorer, should_break, {
			let fut = Selector {
				a: channel_manager.get_event_or_persistence_needed_future(),
				b: chain_monitor.get_update_future(),
				c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
			};
			match fut.await {
				SelectorOutput::A|SelectorOutput::B => {},
				SelectorOutput::C(exit) => {
					should_break = exit;
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll the sleep future exactly once with a no-op waker; if it completed,
			// its boolean output tells us whether the caller requested shutdown.
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(fut).poll(&mut ctx) {
				task::Poll::Ready(exit) => { should_break = exit; true },
				task::Poll::Pending => false,
			}
		}, mobile_interruptable_platform, fetch_time,
	)
}
716
#[cfg(feature = "futures")]
async fn process_onion_message_handler_events_async<
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PM: 'static + Deref + Send + Sync,
>(
	peer_manager: &PM, handler: EventHandler
)
where
	PM::Target: APeerManager + Send + Sync,
{
	use lightning::events::EventsProvider;

	// `process_pending_events` takes a plain (non-async) `Fn` callback, so we cannot
	// `.await` the handler inside it. Instead, buffer every event through a `RefCell`
	// (interior mutability, since the callback is `Fn` rather than `FnMut`)...
	let queued_events = core::cell::RefCell::new(Vec::new());
	peer_manager.onion_message_handler().process_pending_events(
		&|event| queued_events.borrow_mut().push(event)
	);

	// ...then drive the possibly-async handler over the buffered events, in order.
	for event in queued_events.into_inner() {
		handler(event).await;
	}
}
737
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		PM::Target: APeerManager + Send + Sync,
	{
		// Shared flag used by `stop`/`drop` to ask the background thread to exit.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// As in `process_events_async`: before each event reaches the caller's
			// handler, apply any implied network-graph updates and feed payment/probe
			// results into the scorer, persisting the scorer when it changes.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event);
			};
			// Drive the shared background-processing loop, sleeping up to 100ms per
			// iteration (woken early by channel-manager or chain-monitor events) and
			// exiting once `stop_thread` is set.
			define_run_body!(
				persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				peer_manager,
				peer_manager.onion_message_handler().process_pending_events(&event_handler),
				gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
				{ Sleeper::from_two_futures(
					channel_manager.get_event_or_persistence_needed_future(),
					chain_monitor.get_update_future()
				).wait_timeout(Duration::from_millis(100)); },
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
				|| {
					use std::time::SystemTime;
					Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970"))
				},
			)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signal the background thread to exit, then join it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Join the background thread if it hasn't been joined already, propagating any
	// persistence error it exited with. Panics if the thread itself panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
908
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Stop and join the background thread on drop. Note this panics if the
		// thread panicked or exited with a persistence error; call `stop`/`join`
		// explicitly to handle such errors gracefully.
		self.stop_and_join_thread().unwrap();
	}
}
915
916 #[cfg(all(feature = "std", test))]
917 mod tests {
918         use bitcoin::blockdata::constants::{genesis_block, ChainHash};
919         use bitcoin::blockdata::locktime::absolute::LockTime;
920         use bitcoin::blockdata::transaction::{Transaction, TxOut};
921         use bitcoin::network::constants::Network;
922         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
923         use lightning::chain::{BestBlock, Confirm, chainmonitor};
924         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
925         use lightning::sign::{InMemorySigner, KeysManager};
926         use lightning::chain::transaction::OutPoint;
927         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
928         use lightning::{get_event_msg, get_event};
929         use lightning::ln::PaymentHash;
930         use lightning::ln::channelmanager;
931         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
932         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
933         use lightning::ln::functional_test_utils::*;
934         use lightning::ln::msgs::{ChannelMessageHandler, Init};
935         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
936         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
937         use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
938         use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
939         use lightning::util::config::UserConfig;
940         use lightning::util::ser::Writeable;
941         use lightning::util::test_utils;
942         use lightning::util::persist::{KVStore,
943                 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
944                 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
945                 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
946         use lightning_persister::fs_store::FilesystemStore;
947         use std::collections::VecDeque;
948         use std::{fs, env};
949         use std::path::PathBuf;
950         use std::sync::{Arc, Mutex};
951         use std::sync::mpsc::SyncSender;
952         use std::time::Duration;
953         use lightning_rapid_gossip_sync::RapidGossipSync;
954         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
955
	// Deadline tests allow for an expected event to arrive: five freshness-timer periods.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
957
958         #[derive(Clone, Hash, PartialEq, Eq)]
959         struct TestDescriptor{}
960         impl SocketDescriptor for TestDescriptor {
961                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
962                         0
963                 }
964
965                 fn disconnect_socket(&mut self) {}
966         }
967
	// Under `c_bindings` use LDK's multi-threaded lockable score wrapper; otherwise a
	// plain `Mutex` suffices for the scorer in these tests.
	#[cfg(c_bindings)]
	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
	#[cfg(not(c_bindings))]
	type LockingWrapper<T> = Mutex<T>;
972
	// Concrete `ChannelManager` instantiation used throughout these tests.
	type ChannelManager =
		channelmanager::ChannelManager<
			Arc<ChainMonitor>,
			Arc<test_utils::TestBroadcaster>,
			Arc<KeysManager>,
			Arc<KeysManager>,
			Arc<KeysManager>,
			Arc<test_utils::TestFeeEstimator>,
			Arc<DefaultRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<LockingWrapper<TestScorer>>,
				(),
				TestScorer>
			>,
			Arc<test_utils::TestLogger>>;

	// Concrete `ChainMonitor` instantiation, persisting monitors to a `FilesystemStore`.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;

	// Shorthands for the two gossip-sync flavors exercised by these tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
994
	// A test-harness node: the channel manager plus all the peripheral objects the
	// background processor needs to drive it.
	struct Node {
		node: Arc<ChannelManager>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
		chain_monitor: Arc<ChainMonitor>,
		// On-disk store; its data directory is deleted when the node is dropped.
		kv_store: Arc<FilesystemStore>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<LockingWrapper<TestScorer>>,
	}
1008
1009         impl Node {
1010                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1011                         GossipSync::P2P(self.p2p_gossip_sync.clone())
1012                 }
1013
1014                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1015                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
1016                 }
1017
1018                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1019                         GossipSync::None
1020                 }
1021         }
1022
1023         impl Drop for Node {
1024                 fn drop(&mut self) {
1025                         let data_dir = self.kv_store.get_data_dir();
1026                         match fs::remove_dir_all(data_dir.clone()) {
1027                                 Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
1028                                 _ => {}
1029                         }
1030                 }
1031         }
1032
	// A `KVStore`-backed persister for tests which can be configured to fail writes
	// of specific objects (manager, graph, scorer) and/or to signal each time the
	// network graph is written.
	struct Persister {
		// If set, network-graph writes fail with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, sent `()` on every network-graph write attempt.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// If set, channel-manager writes fail with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, scorer writes fail with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		kv_store: FilesystemStore,
	}
1040
1041         impl Persister {
1042                 fn new(data_dir: PathBuf) -> Self {
1043                         let kv_store = FilesystemStore::new(data_dir);
1044                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
1045                 }
1046
1047                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1048                         Self { graph_error: Some((error, message)), ..self }
1049                 }
1050
1051                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1052                         Self { graph_persistence_notifier: Some(sender), ..self }
1053                 }
1054
1055                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1056                         Self { manager_error: Some((error, message)), ..self }
1057                 }
1058
1059                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1060                         Self { scorer_error: Some((error, message)), ..self }
1061                 }
1062         }
1063
	impl KVStore for Persister {
		// Reads always pass straight through to the underlying store.
		fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
			self.kv_store.read(primary_namespace, secondary_namespace, key)
		}

		fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
			// Channel-manager writes: fail here if a manager error was injected.
			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == CHANNEL_MANAGER_PERSISTENCE_KEY
			{
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			// Network-graph writes: notify the test (if a sender is registered)
			// *before* any injected failure, so tests observe the write attempt
			// even when it is made to fail.
			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == NETWORK_GRAPH_PERSISTENCE_KEY
			{
				if let Some(sender) = &self.graph_persistence_notifier {
					match sender.send(()) {
						Ok(()) => {},
						Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
					}
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			// Scorer writes: fail here if a scorer error was injected.
			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == SCORER_PERSISTENCE_KEY
			{
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			// No injected failure applies; delegate to the real on-disk store.
			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
		}

		fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
			self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
		}

		fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
			self.kv_store.list(primary_namespace, secondary_namespace)
		}
	}
1115
	// A scorer which checks each score-update call against a FIFO queue of expected
	// results; `None` means no expectations are currently being enforced.
	struct TestScorer {
		event_expectations: Option<VecDeque<TestResult>>,
	}
1119
	// The score-update outcomes a `TestScorer` can be told to expect.
	#[derive(Debug)]
	enum TestResult {
		PaymentFailure { path: Path, short_channel_id: u64 },
		PaymentSuccess { path: Path },
		ProbeFailure { path: Path },
		ProbeSuccess { path: Path },
	}
1127
1128         impl TestScorer {
1129                 fn new() -> Self {
1130                         Self { event_expectations: None }
1131                 }
1132
1133                 fn expect(&mut self, expectation: TestResult) {
1134                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1135                 }
1136         }
1137
	// The persistence tests only care that the scorer gets written at all, not what the
	// bytes are, so serialize to nothing.
	impl lightning::util::ser::Writeable for TestScorer {
		fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
	}
1141
	// These tests never route payments, so channel penalty lookups should never happen;
	// panic if one does.
	impl ScoreLookUp for TestScorer {
		type ScoreParams = ();
		fn channel_penalty_msat(
			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
		) -> u64 { unimplemented!(); }
	}
1148
	// Each `ScoreUpdate` callback pops the next queued expectation (if any) and asserts
	// it matches both the callback that fired and its arguments. With no expectations
	// registered, callbacks are silently accepted.
	impl ScoreUpdate for TestScorer {
		// Must match a queued `TestResult::PaymentFailure` with the same path and SCID.
		fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
			if let Some(expectations) = &mut self.event_expectations {
				match expectations.pop_front().unwrap() {
					TestResult::PaymentFailure { path, short_channel_id } => {
						assert_eq!(actual_path, &path);
						assert_eq!(actual_short_channel_id, short_channel_id);
					},
					TestResult::PaymentSuccess { path } => {
						panic!("Unexpected successful payment path: {:?}", path)
					},
					TestResult::ProbeFailure { path } => {
						panic!("Unexpected probe failure: {:?}", path)
					},
					TestResult::ProbeSuccess { path } => {
						panic!("Unexpected probe success: {:?}", path)
					}
				}
			}
		}

		// Must match a queued `TestResult::PaymentSuccess` with the same path.
		fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
			if let Some(expectations) = &mut self.event_expectations {
				match expectations.pop_front().unwrap() {
					TestResult::PaymentFailure { path, .. } => {
						panic!("Unexpected payment path failure: {:?}", path)
					},
					TestResult::PaymentSuccess { path } => {
						assert_eq!(actual_path, &path);
					},
					TestResult::ProbeFailure { path } => {
						panic!("Unexpected probe failure: {:?}", path)
					},
					TestResult::ProbeSuccess { path } => {
						panic!("Unexpected probe success: {:?}", path)
					}
				}
			}
		}

		// Must match a queued `TestResult::ProbeFailure` with the same path.
		fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
			if let Some(expectations) = &mut self.event_expectations {
				match expectations.pop_front().unwrap() {
					TestResult::PaymentFailure { path, .. } => {
						panic!("Unexpected payment path failure: {:?}", path)
					},
					TestResult::PaymentSuccess { path } => {
						panic!("Unexpected payment path success: {:?}", path)
					},
					TestResult::ProbeFailure { path } => {
						assert_eq!(actual_path, &path);
					},
					TestResult::ProbeSuccess { path } => {
						panic!("Unexpected probe success: {:?}", path)
					}
				}
			}
		}
		// Must match a queued `TestResult::ProbeSuccess` with the same path.
		fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
			if let Some(expectations) = &mut self.event_expectations {
				match expectations.pop_front().unwrap() {
					TestResult::PaymentFailure { path, .. } => {
						panic!("Unexpected payment path failure: {:?}", path)
					},
					TestResult::PaymentSuccess { path } => {
						panic!("Unexpected payment path success: {:?}", path)
					},
					TestResult::ProbeFailure { path } => {
						panic!("Unexpected probe failure: {:?}", path)
					},
					TestResult::ProbeSuccess { path } => {
						assert_eq!(actual_path, &path);
					}
				}
			}
		}
		// TestScorer keeps no liquidity estimates, so the decay callback is a no-op; it
		// only exists to satisfy the trait.
		fn decay_liquidity_certainty(&mut self, _: Duration) {}
	}
1227
	// In `c_bindings` builds the combined `Score` supertrait must be implemented
	// explicitly; it has no methods of its own beyond `ScoreLookUp` + `ScoreUpdate`.
	#[cfg(c_bindings)]
	impl lightning::routing::scoring::Score for TestScorer {}
1230
1231         impl Drop for TestScorer {
1232                 fn drop(&mut self) {
1233                         if std::thread::panicking() {
1234                                 return;
1235                         }
1236
1237                         if let Some(event_expectations) = &self.event_expectations {
1238                                 if !event_expectations.is_empty() {
1239                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1240                                 }
1241                         }
1242                 }
1243         }
1244
1245         fn get_full_filepath(filepath: String, filename: String) -> String {
1246                 let mut path = PathBuf::from(filepath);
1247                 path.push(filename);
1248                 path.to_str().unwrap().to_string()
1249         }
1250
	// Builds `num_nodes` fully-wired test nodes (channel manager, chain monitor, gossip
	// syncs, peer manager, scorer, filesystem store) persisting under a per-test temp
	// directory, then marks every pair of nodes as peer-connected. Returns the persist
	// directory (so tests can inspect written files) along with the nodes.
	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
		let persist_temp_path = env::temp_dir().join(persist_dir);
		let persist_dir = persist_temp_path.to_string_lossy().to_string();
		let network = Network::Bitcoin;
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
			// Deterministic per-node seed keeps node ids stable across runs.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
			// Each node persists to its own subdirectory so tests can check files per node.
			let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
			let best_block = BestBlock::from_network(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
			let msg_handler = MessageHandler {
				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
				onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
			};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark each unordered pair of nodes as connected (once outbound, once inbound) so
		// channel messages can flow between them in the tests.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init {
					features: nodes[j].node.init_features(), networks: None, remote_network_address: None
				}, true).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init {
					features: nodes[i].node.init_features(), networks: None, remote_network_address: None
				}, false).unwrap();
			}
		}

		(persist_dir, nodes)
	}
1298
	// Drives the full channel-open handshake between `$node_a` and `$node_b` through the
	// point where both sides have seen `ChannelPending`, returning the funding
	// transaction. Consumes `$node_a`'s pending events, so it must run before a
	// `BackgroundProcessor` starts handling them.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			$node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			get_event!($node_b, Event::ChannelPending);
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
			get_event!($node_a, Event::ChannelPending);
			tx
		}}
	}
1313
	// Performs only the open_channel/accept_channel message exchange; stops just before
	// funding generation so the caller can decide how the resulting
	// `FundingGenerationReady` event is handled.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
1321
1322         macro_rules! handle_funding_generation_ready {
1323                 ($event: expr, $channel_value: expr) => {{
1324                         match $event {
1325                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1326                                         assert_eq!(channel_value_satoshis, $channel_value);
1327                                         assert_eq!(user_channel_id, 42);
1328
1329                                         let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
1330                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1331                                         }]};
1332                                         (temporary_channel_id, tx)
1333                                 },
1334                                 _ => panic!("Unexpected event"),
1335                         }
1336                 }}
1337         }
1338
	// Confirms `tx` in a new block on top of `node`'s current best block, then mines
	// `depth - 1` further blocks, notifying both the `ChannelManager` and the
	// `ChainMonitor`: the confirmation is reported in block 1 and the final best-block
	// update at block `depth`. NOTE(review): when `depth == 1` only the first match arm
	// fires, so no `best_block_updated` call is made in that case.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = create_dummy_header(prev_blockhash, height);
			// Every synthesized block (re-)includes `tx`, but it is only reported as
			// confirmed once, in the first block.
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	// Confirms `tx` with enough blocks on top (`ANTI_REORG_DELAY`) for it to be treated
	// as irreversibly confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
1362
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the file at `$filepath` matches the current serialization of
		// `$node` — persistence happens asynchronously on the background thread, so we
		// retry until the on-disk bytes catch up.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait until the manager no longer signals pending persistence/events.
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		// Skip the explicit stop while panicking: the background thread may itself have
		// panicked, and `stop` would then obscure the original failure.
		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
1434
1435         #[test]
1436         fn test_timer_tick_called() {
1437                 // Test that:
1438                 // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1439                 // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1440                 // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1441                 // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
1442                 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1443                 let data_dir = nodes[0].kv_store.get_data_dir();
1444                 let persister = Arc::new(Persister::new(data_dir));
1445                 let event_handler = |_: _| {};
1446                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1447                 loop {
1448                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1449                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1450                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1451                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1452                         let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
1453                         if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
1454                                 log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
1455                                 log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
1456                                 log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
1457                                 break
1458                         }
1459                 }
1460
1461                 if !std::thread::panicking() {
1462                         bg_processor.stop().unwrap();
1463                 }
1464         }
1465
1466         #[test]
1467         fn test_channel_manager_persist_error() {
1468                 // Test that if we encounter an error during manager persistence, the thread panics.
1469                 let (_, nodes) = create_nodes(2, "test_persist_error");
1470                 open_channel!(nodes[0], nodes[1], 100000);
1471
1472                 let data_dir = nodes[0].kv_store.get_data_dir();
1473                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1474                 let event_handler = |_: _| {};
1475                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1476                 match bg_processor.join() {
1477                         Ok(_) => panic!("Expected error persisting manager"),
1478                         Err(e) => {
1479                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1480                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1481                         },
1482                 }
1483         }
1484
	#[tokio::test]
	#[cfg(feature = "futures")]
	async fn test_channel_manager_persist_error_async() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].kv_store.get_data_dir();
		// Inject a manager-persistence failure; the async processor should resolve with it.
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));

		let bp_future = super::process_events_async(
			persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
			nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
			Some(nodes[0].scorer.clone()), move |dur: Duration| {
				Box::pin(async move {
					tokio::time::sleep(dur).await;
					false // Never exit
				})
			}, false, || Some(Duration::ZERO),
		);
		// The future should terminate (despite the sleeper never requesting exit) because
		// the injected persistence error aborts processing.
		match bp_future.await {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
1513
1514         #[test]
1515         fn test_network_graph_persist_error() {
1516                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1517                 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1518                 let data_dir = nodes[0].kv_store.get_data_dir();
1519                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1520                 let event_handler = |_: _| {};
1521                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1522
1523                 match bg_processor.stop() {
1524                         Ok(_) => panic!("Expected error persisting network graph"),
1525                         Err(e) => {
1526                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1527                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1528                         },
1529                 }
1530         }
1531
1532         #[test]
1533         fn test_scorer_persist_error() {
1534                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1535                 let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1536                 let data_dir = nodes[0].kv_store.get_data_dir();
1537                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1538                 let event_handler = |_: _| {};
1539                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1540
1541                 match bg_processor.stop() {
1542                         Ok(_) => panic!("Expected error persisting scorer"),
1543                         Err(e) => {
1544                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1545                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1546                         },
1547                 }
1548         }
1549
	#[test]
	fn test_background_event_handling() {
		// Exercises event delivery through the BackgroundProcessor: first that a
		// `FundingGenerationReady`/`ChannelPending` pair reaches a user-supplied handler,
		// then (with a fresh processor) that a force-close produces `SpendableOutputs`.
		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
		let channel_value = 100000;
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = funding_generation_recv
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
		nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
		get_event!(nodes[1], Event::ChannelPending);
		nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
		let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("ChannelPending not handled within deadline");

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		// Stop the first processor before installing the SpendableOutputs handler below.
		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Mine the commitment transaction deep enough for the to-self output to mature.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
1623
1624         #[test]
1625         fn test_scorer_persistence() {
1626                 let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1627                 let data_dir = nodes[0].kv_store.get_data_dir();
1628                 let persister = Arc::new(Persister::new(data_dir));
1629                 let event_handler = |_: _| {};
1630                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1631
1632                 loop {
1633                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1634                         let expected_log = "Decaying and persisting scorer".to_string();
1635                         if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
1636                                 break
1637                         }
1638                 }
1639
1640                 if !std::thread::panicking() {
1641                         bg_processor.stop().unwrap();
1642                 }
1643         }
1644
	// Shared body for the sync and async variants of the "don't prune until RGS completes" test.
	// Announces a channel that the rapid-gossip-sync data below does not contain, waits for the
	// background processor's timer loop to have run at least twice, applies the RGS snapshot, and
	// then expects the whole graph (including the stale channel) to be pruned.
	// `$receive` blocks until graph persistence is signalled (which follows pruning) and `$sleep`
	// yields between polls of the log (thread sleep or async sleep depending on the caller).
	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
		($nodes: expr, $receive: expr, $sleep: expr) => {
			let features = ChannelFeatures::empty();
			// Add a channel announcement absent from the RGS data, so it should be removed once
			// sync completes and pruning runs.
			$nodes[0].network_graph.add_channel_from_partial_announcement(
				42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
			).expect("Failed to update channel from partial announcement");
			let original_graph_description = $nodes[0].network_graph.to_string();
			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

			loop {
				$sleep;
				let log_entries = $nodes[0].logger.lines.lock().unwrap();
				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
				if *log_entries.get(&("lightning_background_processor", loop_counter))
					.unwrap_or(&0) > 1
				{
					// Wait until the loop has gone around at least twice.
					break
				}
			}

			// Serialized rapid-gossip-sync snapshot containing two channel announcements.
			let initialization_input = vec![
				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
				79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
				0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
				187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
				157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
				88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
				204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
				181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
				110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
				76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
				226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
				0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
				0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
			];
			$nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();

			// this should have added two channels and pruned the previous one.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

			$receive.expect("Network graph not pruned within deadline");

			// all channels should now be pruned
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
		}
	}
1693
1694         #[test]
1695         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1696                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1697
1698                 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1699                 let data_dir = nodes[0].kv_store.get_data_dir();
1700                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1701
1702                 let event_handler = |_: _| {};
1703                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1704
1705                 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1706                         receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1707                         std::thread::sleep(Duration::from_millis(1)));
1708
1709                 background_processor.stop().unwrap();
1710         }
1711
	// Async counterpart of test_not_pruning_network_graph_until_graph_sync_completion: drives
	// `process_events_async` on a tokio runtime instead of the threaded BackgroundProcessor.
	#[tokio::test]
	#[cfg(feature = "futures")]
	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
		// Signalled by the persister once the (pruned) network graph has been persisted.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

		// `exit_sender` lets the test task tell the background future to shut down.
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let bp_future = super::process_events_async(
			persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
			nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
			Some(nodes[0].scorer.clone()), move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					// Sleeper: returns true (i.e. "exit requested") when the watch channel
					// fires before the requested duration elapses.
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			}, false, || Some(Duration::from_secs(1696300000)),
		);

		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
				// Poll for the persistence notification, giving up after ~5 prune timers.
				let mut i = 0;
				loop {
					tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
					if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
					assert!(i < 5);
					i += 1;
				}
			}, tokio::time::sleep(Duration::from_millis(1)).await);
			exit_sender.send(()).unwrap();
		});
		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}
1753
	// Shared body for the sync and async payment-path-scoring tests. Pushes payment/probe events
	// into the node and verifies (via TestScorer expectations registered with `.expect(...)`)
	// that the background processor routes each event into the scorer with the right semantics.
	// `$receive` blocks until the event handler has forwarded the next event.
	macro_rules! do_test_payment_path_scoring {
		($nodes: expr, $receive: expr) => {
			// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
			// that we update the scorer upon a payment path succeeding (note that the channel must be
			// public or else we won't score it).
			// A background event handler that forwards payment-path and probe events must be hooked
			// up to a running background processor for `$receive` to complete.
			let scored_scid = 4242;
			let secp_ctx = Secp256k1::new();
			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

			// A single-hop path over the channel we expect to be scored.
			let path = Path { hops: vec![RouteHop {
				pubkey: node_1_id,
				node_features: NodeFeatures::empty(),
				short_channel_id: scored_scid,
				channel_features: ChannelFeatures::empty(),
				fee_msat: 0,
				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
				maybe_announced_channel: true,
			}], blinded_tail: None };

			// A non-permanent failure at `scored_scid` should be scored as a payment failure there.
			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: false,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// Ensure we'll score payments that were explicitly failed back by the destination as
			// ProbeSuccess.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: true,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: None,
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A successful payment should be scored as a path success.
			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: None,
				path: path.clone(),
			});
			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
			match event {
				Event::PaymentPathSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A successful probe should be scored as a probe success.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path: path.clone(),
			});
			let event = $receive.expect("ProbeSuccessful not handled within deadline");
			match event {
				Event::ProbeSuccessful  { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A failed probe should be scored as a probe failure at the failing channel.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeFailed {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path,
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("ProbeFailure not handled within deadline");
			match event {
				Event::ProbeFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}
		}
	}
1846
1847         #[test]
1848         fn test_payment_path_scoring() {
1849                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1850                 let event_handler = move |event: Event| match event {
1851                         Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1852                         Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1853                         Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1854                         Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1855                         _ => panic!("Unexpected event: {:?}", event),
1856                 };
1857
1858                 let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1859                 let data_dir = nodes[0].kv_store.get_data_dir();
1860                 let persister = Arc::new(Persister::new(data_dir));
1861                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1862
1863                 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
1864
1865                 if !std::thread::panicking() {
1866                         bg_processor.stop().unwrap();
1867                 }
1868
1869                 let log_entries = nodes[0].logger.lines.lock().unwrap();
1870                 let expected_log = "Persisting scorer after update".to_string();
1871                 assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
1872         }
1873
	// Async counterpart of test_payment_path_scoring: drives `process_events_async` on a tokio
	// runtime and forwards scoring-relevant events over an async mpsc channel.
	#[tokio::test]
	#[cfg(feature = "futures")]
	async fn test_payment_path_scoring_async() {
		let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
		// Async event handler: forward every scoring-relevant event to the test task; any other
		// event is unexpected and fails the test.
		let event_handler = move |event: Event| {
			let sender_ref = sender.clone();
			async move {
				match event {
					Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
					Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
					Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
					Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
					_ => panic!("Unexpected event: {:?}", event),
				}
			}
		};

		let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));

		// `exit_sender` lets the test task tell the background future to shut down.
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

		let bp_future = super::process_events_async(
			persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
			nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
			Some(nodes[0].scorer.clone()), move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					// Sleeper: returns true ("exit requested") if the watch channel fires
					// before the requested duration elapses.
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			}, false, || Some(Duration::ZERO),
		);
		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			do_test_payment_path_scoring!(nodes, receiver.recv().await);
			exit_sender.send(()).unwrap();

			// One scorer persistence per scoring event handled by the macro above.
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer after update".to_string();
			assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
		});

		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}
1924 }