Pass the current time through `ScoreUpdate` methods
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(rustdoc::broken_intra_doc_links)]
6 #![deny(rustdoc::private_intra_doc_links)]
7
8 #![deny(missing_docs)]
9 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
10
11 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
12
13 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
14
15 #[cfg(any(test, feature = "std"))]
16 extern crate core;
17
18 #[cfg(not(feature = "std"))]
19 extern crate alloc;
20
21 #[macro_use] extern crate lightning;
22 extern crate lightning_rapid_gossip_sync;
23
24 use lightning::chain;
25 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
26 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
27 use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
28 use lightning::events::{Event, PathFailure};
29 #[cfg(feature = "std")]
30 use lightning::events::{EventHandler, EventsProvider};
31 use lightning::ln::channelmanager::ChannelManager;
32 use lightning::ln::msgs::OnionMessageHandler;
33 use lightning::ln::peer_handler::APeerManager;
34 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
35 use lightning::routing::utxo::UtxoLookup;
36 use lightning::routing::router::Router;
37 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
38 use lightning::util::logger::Logger;
39 use lightning::util::persist::Persister;
40 #[cfg(feature = "std")]
41 use lightning::util::wakers::Sleeper;
42 use lightning_rapid_gossip_sync::RapidGossipSync;
43
44 use core::ops::Deref;
45 use core::time::Duration;
46
47 #[cfg(feature = "std")]
48 use std::sync::Arc;
49 #[cfg(feature = "std")]
50 use core::sync::atomic::{AtomicBool, Ordering};
51 #[cfg(feature = "std")]
52 use std::thread::{self, JoinHandle};
53 #[cfg(feature = "std")]
54 use std::time::Instant;
55
56 #[cfg(not(feature = "std"))]
57 use alloc::vec::Vec;
58
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared shutdown flag; presumably read by the background thread's loop-exit check and set
	// to `true` to request termination — confirm against `stop`/`Drop` impls elsewhere in file.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned background thread, carrying any `std::io::Error` from persistence.
	// Wrapped in `Option` so the handle can be taken out when the thread is joined.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
91
// Timer intervals, in seconds, used by `define_run_body!` below. Test builds shorten every
// interval to one second so integration tests can exercise each periodic task quickly.

// How often `ChannelManager::timer_tick_occurred` is invoked.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

// How often `OnionMessageHandler::timer_tick_occurred` is invoked.
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// How often the scorer is re-persisted (it is also persisted on shutdown).
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 60 * 60;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the *first* graph prune after startup, so short-lived clients still prune.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

// How often `ChainMonitor::rebroadcast_pending_claims` is invoked.
#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;

#[cfg(feature = "futures")]
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
// The shortest of all the intervals above — the longest the async sleeper may be asked to wait.
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
	min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
136
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// Type parameters: `P`/`R` deref to the two concrete sync implementations, `G` to the
// `NetworkGraph` both maintain, `U` to a `UtxoLookup` and `L` to a `Logger`.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, U, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	U: Deref,
	L: Deref,
>
where U::Target: UtxoLookup, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
153
154 impl<
155         P: Deref<Target = P2PGossipSync<G, U, L>>,
156         R: Deref<Target = RapidGossipSync<G, L>>,
157         G: Deref<Target = NetworkGraph<L>>,
158         U: Deref,
159         L: Deref,
160 > GossipSync<P, R, G, U, L>
161 where U::Target: UtxoLookup, L::Target: Logger {
162         fn network_graph(&self) -> Option<&G> {
163                 match self {
164                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
165                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
166                         GossipSync::None => None,
167                 }
168         }
169
170         fn prunable_network_graph(&self) -> Option<&G> {
171                 match self {
172                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
173                         GossipSync::Rapid(gossip_sync) => {
174                                 if gossip_sync.is_initial_sync_complete() {
175                                         Some(gossip_sync.network_graph())
176                                 } else {
177                                         None
178                                 }
179                         },
180                         GossipSync::None => None,
181                 }
182         }
183 }
184
185 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
186 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
187         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
188 where
189         U::Target: UtxoLookup,
190         L::Target: Logger,
191 {
192         /// Initializes a new [`GossipSync::P2P`] variant.
193         pub fn p2p(gossip_sync: P) -> Self {
194                 GossipSync::P2P(gossip_sync)
195         }
196 }
197
198 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
199 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
200         GossipSync<
201                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
202                 R,
203                 G,
204                 &'a (dyn UtxoLookup + Send + Sync),
205                 L,
206         >
207 where
208         L::Target: Logger,
209 {
210         /// Initializes a new [`GossipSync::Rapid`] variant.
211         pub fn rapid(gossip_sync: R) -> Self {
212                 GossipSync::Rapid(gossip_sync)
213         }
214 }
215
216 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
217 impl<'a, L: Deref>
218         GossipSync<
219                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
220                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
221                 &'a NetworkGraph<L>,
222                 &'a (dyn UtxoLookup + Send + Sync),
223                 L,
224         >
225 where
226         L::Target: Logger,
227 {
228         /// Initializes a new [`GossipSync::None`] variant.
229         pub fn none() -> Self {
230                 GossipSync::None
231         }
232 }
233
234 fn handle_network_graph_update<L: Deref>(
235         network_graph: &NetworkGraph<L>, event: &Event
236 ) where L::Target: Logger {
237         if let Event::PaymentPathFailed {
238                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
239         {
240                 network_graph.handle_network_update(upd);
241         }
242 }
243
244 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
245 /// to persist.
246 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
247         scorer: &'a S, event: &Event, duration_since_epoch: Duration,
248 ) -> bool {
249         match event {
250                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
251                         let mut score = scorer.write_lock();
252                         score.payment_path_failed(path, *scid, duration_since_epoch);
253                 },
254                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
255                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
256                         // because the payment made it all the way to the destination with sufficient liquidity.
257                         let mut score = scorer.write_lock();
258                         score.probe_successful(path, duration_since_epoch);
259                 },
260                 Event::PaymentPathSuccessful { path, .. } => {
261                         let mut score = scorer.write_lock();
262                         score.payment_path_successful(path, duration_since_epoch);
263                 },
264                 Event::ProbeSuccessful { path, .. } => {
265                         let mut score = scorer.write_lock();
266                         score.probe_successful(path, duration_since_epoch);
267                 },
268                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
269                         let mut score = scorer.write_lock();
270                         score.probe_failed(path, *scid, duration_since_epoch);
271                 },
272                 _ => return false,
273         }
274         true
275 }
276
// Shared main-loop skeleton used by both the `std`-threaded `BackgroundProcessor` and the
// `futures`-based `process_events_async`. The caller supplies the environment-specific pieces:
// how to detect shutdown (`$loop_exit_check`), how to sleep (`$await`), how to create and check
// timers (`$get_timer`/`$timer_elapsed`), whether a slow wakeup should be treated as the machine
// having slept (`$check_slow_await`), and how to fetch wall-clock time (`$time_fetch`, returning
// an `Option<Duration>` since the epoch).
macro_rules! define_run_body {
	(
		$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
		$channel_manager: ident, $process_channel_manager_events: expr,
		$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
		$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
		$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
	) => { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();
		log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
		$chain_monitor.rebroadcast_pending_claims();

		// One timer per periodic task, each reset whenever its task runs.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;
			$process_onion_message_handler_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.as_ref().process_events();

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = None;
			if $check_slow_await { await_start = Some($get_timer(1)); }
			$await;
			let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			if $channel_manager.get_and_clear_needs_persistence() {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
				log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
				$peer_manager.onion_message_handler().timer_tick_occurred();
				last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.as_ref().disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.as_ref().timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence. For RGS, since 60 seconds is likely too long,
			// we prune after an initial sync completes.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
			let should_prune = match $gossip_sync {
				GossipSync::Rapid(_) => !have_pruned || prune_timer_elapsed,
				_ => prune_timer_elapsed,
			};
			if should_prune {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					// Pruning requires wall-clock time; without it (`$time_fetch` returned
					// `None`, e.g. no-std) we still persist but skip the prune.
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
					} else {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					have_pruned = true;
				}
				let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				last_prune_call = $get_timer(prune_timer);
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}

			if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
				log_trace!($logger, "Rebroadcasting monitor's pending claims");
				$chain_monitor.rebroadcast_pending_claims();
				last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
439
440 #[cfg(feature = "futures")]
pub(crate) mod futures_util {
	use core::future::Future;
	use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
	use core::pin::Pin;
	use core::marker::Unpin;

	/// Races three heterogeneous futures, resolving as soon as any one of them does.
	pub(crate) struct Selector<
		A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
	> {
		pub a: A,
		pub b: B,
		pub c: C,
	}

	/// Which of the three [`Selector`] futures completed (ties broken in `a`, `b`, `c` order).
	pub(crate) enum SelectorOutput {
		A, B, C(bool),
	}

	impl<
		A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
	> Future for Selector<A, B, C> {
		type Output = SelectorOutput;
		fn poll(mut self: Pin<&mut Self>, cx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
			// Poll in `a`, `b`, `c` order and return at the first future that is ready.
			if let Poll::Ready(()) = Pin::new(&mut self.a).poll(cx) {
				return Poll::Ready(SelectorOutput::A);
			}
			if let Poll::Ready(()) = Pin::new(&mut self.b).poll(cx) {
				return Poll::Ready(SelectorOutput::B);
			}
			if let Poll::Ready(res) = Pin::new(&mut self.c).poll(cx) {
				return Poll::Ready(SelectorOutput::C(res));
			}
			Poll::Pending
		}
	}

	// If we want to poll a future without an async context to figure out if it has completed or
	// not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
	// but sadly there's a good bit of boilerplate here.
	fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
	fn dummy_waker_action(_: *const ()) { }

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
	/// Builds a [`Waker`] which silently drops all wake-ups, for one-shot manual polling.
	pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
}
488 #[cfg(feature = "futures")]
489 use futures_util::{Selector, SelectorOutput, dummy_waker};
490 #[cfg(feature = "futures")]
491 use core::task;
492
493 /// Processes background events in a future.
494 ///
495 /// `sleeper` should return a future which completes in the given amount of time and returns a
496 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
497 /// future which outputs `true`, the loop will exit and this function's future will complete.
498 /// The `sleeper` future is free to return early after it has triggered the exit condition.
499 ///
500 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
501 ///
502 /// Requires the `futures` feature. Note that while this method is available without the `std`
503 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
504 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
505 /// manually instead.
506 ///
507 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
508 /// mobile device, where we may need to check for interruption of the application regularly. If you
509 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
510 /// are hundreds or thousands of simultaneous process calls running.
511 ///
512 /// The `fetch_time` parameter should return the current wall clock time, if one is available. If
513 /// no time is available, some features may be disabled, however the node will still operate fine.
514 ///
515 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
516 /// could setup `process_events_async` like this:
517 /// ```
518 /// # use lightning::io;
519 /// # use std::sync::{Arc, RwLock};
520 /// # use std::sync::atomic::{AtomicBool, Ordering};
521 /// # use std::time::SystemTime;
522 /// # use lightning_background_processor::{process_events_async, GossipSync};
523 /// # struct MyStore {}
524 /// # impl lightning::util::persist::KVStore for MyStore {
525 /// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
526 /// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
527 /// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
528 /// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
529 /// # }
530 /// # struct MyEventHandler {}
531 /// # impl MyEventHandler {
532 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
533 /// # }
534 /// # #[derive(Eq, PartialEq, Clone, Hash)]
535 /// # struct MySocketDescriptor {}
536 /// # impl lightning::ln::peer_handler::SocketDescriptor for MySocketDescriptor {
537 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
538 /// #     fn disconnect_socket(&mut self) {}
539 /// # }
540 /// # type MyBroadcaster = dyn lightning::chain::chaininterface::BroadcasterInterface + Send + Sync;
541 /// # type MyFeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator + Send + Sync;
542 /// # type MyNodeSigner = dyn lightning::sign::NodeSigner + Send + Sync;
543 /// # type MyUtxoLookup = dyn lightning::routing::utxo::UtxoLookup + Send + Sync;
544 /// # type MyFilter = dyn lightning::chain::Filter + Send + Sync;
545 /// # type MyLogger = dyn lightning::util::logger::Logger + Send + Sync;
546 /// # type MyChainMonitor = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<MyFilter>, Arc<MyBroadcaster>, Arc<MyFeeEstimator>, Arc<MyLogger>, Arc<MyStore>>;
547 /// # type MyPeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<MySocketDescriptor, MyChainMonitor, MyBroadcaster, MyFeeEstimator, Arc<MyUtxoLookup>, MyLogger>;
548 /// # type MyNetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<MyLogger>>;
549 /// # type MyGossipSync = lightning::routing::gossip::P2PGossipSync<Arc<MyNetworkGraph>, Arc<MyUtxoLookup>, Arc<MyLogger>>;
550 /// # type MyChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<MyChainMonitor, MyBroadcaster, MyFeeEstimator, MyLogger>;
551 /// # type MyScorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<MyNetworkGraph>, Arc<MyLogger>>>;
552 ///
553 /// # async fn setup_background_processing(my_persister: Arc<MyStore>, my_event_handler: Arc<MyEventHandler>, my_chain_monitor: Arc<MyChainMonitor>, my_channel_manager: Arc<MyChannelManager>, my_gossip_sync: Arc<MyGossipSync>, my_logger: Arc<MyLogger>, my_scorer: Arc<MyScorer>, my_peer_manager: Arc<MyPeerManager>) {
554 ///     let background_persister = Arc::clone(&my_persister);
555 ///     let background_event_handler = Arc::clone(&my_event_handler);
556 ///     let background_chain_mon = Arc::clone(&my_chain_monitor);
557 ///     let background_chan_man = Arc::clone(&my_channel_manager);
558 ///     let background_gossip_sync = GossipSync::p2p(Arc::clone(&my_gossip_sync));
559 ///     let background_peer_man = Arc::clone(&my_peer_manager);
560 ///     let background_logger = Arc::clone(&my_logger);
561 ///     let background_scorer = Arc::clone(&my_scorer);
562 ///
563 ///     // Setup the sleeper.
564 ///     let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
565 ///
566 ///     let sleeper = move |d| {
567 ///             let mut receiver = stop_receiver.clone();
568 ///             Box::pin(async move {
569 ///                     tokio::select!{
570 ///                             _ = tokio::time::sleep(d) => false,
571 ///                             _ = receiver.changed() => true,
572 ///                     }
573 ///             })
574 ///     };
575 ///
576 ///     let mobile_interruptable_platform = false;
577 ///
578 ///     let handle = tokio::spawn(async move {
579 ///             process_events_async(
580 ///                     background_persister,
581 ///                     |e| background_event_handler.handle_event(e),
582 ///                     background_chain_mon,
583 ///                     background_chan_man,
584 ///                     background_gossip_sync,
585 ///                     background_peer_man,
586 ///                     background_logger,
587 ///                     Some(background_scorer),
588 ///                     sleeper,
589 ///                     mobile_interruptable_platform,
590 ///                     || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
591 ///                     )
592 ///                     .await
593 ///                     .expect("Failed to process events");
594 ///     });
595 ///
596 ///     // Stop the background processing.
597 ///     stop_sender.send(()).unwrap();
598 ///     handle.await.unwrap();
599 ///     # }
600 ///```
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	PM: 'static + Deref + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture,
	FetchTime: Fn() -> Option<Duration>,
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	PM::Target: APeerManager + Send + Sync,
{
	let mut should_break = false;
	// Decorate the user-provided `event_handler`: first apply any network graph update the
	// event implies, then (when a scorer and a current time are both available) feed the event
	// into the scorer, persisting the scorer if it changed, and finally defer to the user's
	// handler.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		let logger = &logger;
		let persister = &persister;
		let fetch_time = &fetch_time;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				// Scorer updates need the current time; if `fetch_time` cannot provide one the
				// scorer update is skipped but the user's handler still runs.
				if let Some(duration_since_epoch) = fetch_time() {
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
			}
			event_handler(event).await;
		}
	};
	// The shared processing loop, specialized for async operation: it wakes on channel manager
	// or chain monitor futures (Selector outputs A/B) or when the caller's sleeper fires
	// (output C), and exits when the sleeper resolves to `true`.
	define_run_body!(
		persister, chain_monitor,
		chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
		gossip_sync, logger, scorer, should_break, {
			let fut = Selector {
				a: channel_manager.get_event_or_persistence_needed_future(),
				b: chain_monitor.get_update_future(),
				c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
			};
			match fut.await {
				SelectorOutput::A|SelectorOutput::B => {},
				SelectorOutput::C(exit) => {
					should_break = exit;
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// Poll the sleep future exactly once with a no-op waker: reports whether it
			// completed (and whether we should exit) without ever blocking this task.
			let mut waker = dummy_waker();
			let mut ctx = task::Context::from_waker(&mut waker);
			match core::pin::Pin::new(fut).poll(&mut ctx) {
				task::Poll::Ready(exit) => { should_break = exit; true },
				task::Poll::Pending => false,
			}
		}, mobile_interruptable_platform, fetch_time,
	)
}
702
#[cfg(feature = "futures")]
async fn process_onion_message_handler_events_async<
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PM: 'static + Deref + Send + Sync,
>(
	peer_manager: &PM, handler: EventHandler
)
where
	PM::Target: APeerManager + Send + Sync,
{
	use lightning::events::EventsProvider;

	// `process_pending_events` takes a synchronous closure, so we cannot await inside it.
	// Instead, buffer the events via interior mutability, then replay them through the async
	// handler one at a time.
	let collected = core::cell::RefCell::new(Vec::new());
	peer_manager.onion_message_handler().process_pending_events(&|ev| collected.borrow_mut().push(ev));

	let mut pending = collected.into_inner();
	for ev in pending.drain(..) {
		handler(ev).await;
	}
}
723
#[cfg(feature = "std")]
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		UL: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::EcdsaSigner, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		PM: 'static + Deref + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: for <'b> WriteableScore<'b>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		UL::Target: 'static + UtxoLookup,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::EcdsaSigner>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
		PM::Target: APeerManager + Send + Sync,
	{
		// Shared flag used by `stop`/`drop` to ask the background thread to exit.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Decorate the user-provided event handler: first apply any network graph update
			// the event implies, then feed the event into the scorer using the current
			// wall-clock time (persisting the scorer if it changed), and finally defer to the
			// user's handler. This mirrors `async_event_handler` in `process_events_async`.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				if let Some(ref scorer) = scorer {
					use std::time::SystemTime;
					let duration_since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970");
					if update_scorer(scorer, &event, duration_since_epoch) {
						log_trace!(logger, "Persisting scorer after update");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
				}
				event_handler.handle_event(event);
			};
			// The shared processing loop, specialized for the threaded case: between
			// iterations it blocks (up to 100ms) on the channel manager / chain monitor
			// wakers, and exits once `stop_thread` is set.
			define_run_body!(
				persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				peer_manager,
				peer_manager.onion_message_handler().process_pending_events(&event_handler),
				gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
				{ Sleeper::from_two_futures(
					channel_manager.get_event_or_persistence_needed_future(),
					chain_monitor.get_update_future()
				).wait_timeout(Duration::from_millis(100)); },
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
				|| {
					use std::time::SystemTime;
					Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
						.expect("Time should be sometime after 1970"))
				},
			)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	/// Signals the background thread to exit, then joins it, propagating its result.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	/// Joins the background thread if it has not been joined already, returning its result.
	/// Returns `Ok(())` (and is a no-op) if the thread was already joined. Panics if the
	/// thread itself panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
894
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Stop and join the background thread when the handle is dropped. This panics if the
		// thread panicked or returned a persistence error; callers who want to handle such
		// errors should use `stop` or `join` explicitly instead of relying on drop.
		self.stop_and_join_thread().unwrap();
	}
}
901
902 #[cfg(all(feature = "std", test))]
903 mod tests {
904         use bitcoin::blockdata::constants::{genesis_block, ChainHash};
905         use bitcoin::blockdata::locktime::absolute::LockTime;
906         use bitcoin::blockdata::transaction::{Transaction, TxOut};
907         use bitcoin::network::constants::Network;
908         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
909         use lightning::chain::{BestBlock, Confirm, chainmonitor};
910         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
911         use lightning::sign::{InMemorySigner, KeysManager};
912         use lightning::chain::transaction::OutPoint;
913         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
914         use lightning::{get_event_msg, get_event};
915         use lightning::ln::PaymentHash;
916         use lightning::ln::channelmanager;
917         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
918         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
919         use lightning::ln::functional_test_utils::*;
920         use lightning::ln::msgs::{ChannelMessageHandler, Init};
921         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
922         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
923         use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
924         use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
925         use lightning::util::config::UserConfig;
926         use lightning::util::ser::Writeable;
927         use lightning::util::test_utils;
928         use lightning::util::persist::{KVStore,
929                 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
930                 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
931                 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
932         use lightning_persister::fs_store::FilesystemStore;
933         use std::collections::VecDeque;
934         use std::{fs, env};
935         use std::path::PathBuf;
936         use std::sync::{Arc, Mutex};
937         use std::sync::mpsc::SyncSender;
938         use std::time::Duration;
939         use lightning_rapid_gossip_sync::RapidGossipSync;
940         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
941
	// Maximum time the tests below will wait for an expected event before giving up.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	// Minimal no-op `SocketDescriptor`, only needed to satisfy `PeerManager`'s type
	// parameters; no test actually pushes bytes through a socket.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Reports zero bytes accepted, i.e. the "socket" never makes write progress.
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}
953
	// The lock the test scorer is wrapped in: the `c_bindings` build exercises LDK's own
	// `MultiThreadedLockableScore`, while the default build uses a plain `Mutex`.
	#[cfg(c_bindings)]
	type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
	#[cfg(not(c_bindings))]
	type LockingWrapper<T> = Mutex<T>;
958
	// Concrete instantiations of LDK's generic types for these tests, built on `test_utils`
	// stubs, a `FilesystemStore` for persistence, and the `TestScorer` below.
	type ChannelManager =
		channelmanager::ChannelManager<
			Arc<ChainMonitor>,
			Arc<test_utils::TestBroadcaster>,
			Arc<KeysManager>,
			Arc<KeysManager>,
			Arc<KeysManager>,
			Arc<test_utils::TestFeeEstimator>,
			Arc<DefaultRouter<
				Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
				Arc<test_utils::TestLogger>,
				Arc<LockingWrapper<TestScorer>>,
				(),
				TestScorer>
			>,
			Arc<test_utils::TestLogger>>;

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;

	// Shorthands for the two gossip sync flavors exercised by the tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
980
	// One test node: a channel manager plus every collaborator the background processor needs
	// (gossip syncs, peer manager, chain monitor, store, scorer, ...).
	struct Node {
		node: Arc<ChannelManager>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
		chain_monitor: Arc<ChainMonitor>,
		// On-disk store; its directory is removed when the `Node` is dropped.
		kv_store: Arc<FilesystemStore>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<LockingWrapper<TestScorer>>,
	}
994
	impl Node {
		// Wraps this node's P2P gossip sync in the `GossipSync` enum the processor expects.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		// As above, but for the rapid gossip sync variant.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		// A `GossipSync::None`, for tests that run the processor without any gossip sync.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
1008
1009         impl Drop for Node {
1010                 fn drop(&mut self) {
1011                         let data_dir = self.kv_store.get_data_dir();
1012                         match fs::remove_dir_all(data_dir.clone()) {
1013                                 Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
1014                                 _ => {}
1015                         }
1016                 }
1017         }
1018
	// A `KVStore` wrapper around `FilesystemStore` that can inject write failures for the
	// channel manager, network graph, or scorer keys, and optionally signals a channel each
	// time the network graph is written, letting tests observe persistence.
	struct Persister {
		// When set, writes of the network graph fail with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, notified on every network graph write (before any injected error).
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, writes of the channel manager fail with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, writes of the scorer fail with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		kv_store: FilesystemStore,
	}
1026
1027         impl Persister {
1028                 fn new(data_dir: PathBuf) -> Self {
1029                         let kv_store = FilesystemStore::new(data_dir);
1030                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
1031                 }
1032
1033                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1034                         Self { graph_error: Some((error, message)), ..self }
1035                 }
1036
1037                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1038                         Self { graph_persistence_notifier: Some(sender), ..self }
1039                 }
1040
1041                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1042                         Self { manager_error: Some((error, message)), ..self }
1043                 }
1044
1045                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1046                         Self { scorer_error: Some((error, message)), ..self }
1047                 }
1048         }
1049
	impl KVStore for Persister {
		// Reads pass straight through to the backing store.
		fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
			self.kv_store.read(primary_namespace, secondary_namespace, key)
		}

		// Writes check whether the target key is one of the three well-known persistence keys
		// and, if a matching error was injected, fail with it; network graph writes also
		// notify `graph_persistence_notifier` first. Otherwise the write falls through to the
		// backing store.
		fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
			if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == CHANNEL_MANAGER_PERSISTENCE_KEY
			{
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == NETWORK_GRAPH_PERSISTENCE_KEY
			{
				if let Some(sender) = &self.graph_persistence_notifier {
					// A send failure only means the test's receiver is gone; don't fail the
					// write for that.
					match sender.send(()) {
						Ok(()) => {},
						Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
					}
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
				secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
				key == SCORER_PERSISTENCE_KEY
			{
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
		}

		// Removals and listings pass straight through to the backing store.
		fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
			self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
		}

		fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
			self.kv_store.list(primary_namespace, secondary_namespace)
		}
	}
1101
	// A scorer that asserts score updates arrive in exactly the order registered via
	// `expect`; when no expectations are registered, updates are silently accepted.
	struct TestScorer {
		event_expectations: Option<VecDeque<TestResult>>,
	}
1105
	// The score-update events a `TestScorer` can be told to expect next.
	#[derive(Debug)]
	enum TestResult {
		PaymentFailure { path: Path, short_channel_id: u64 },
		PaymentSuccess { path: Path },
		ProbeFailure { path: Path },
		ProbeSuccess { path: Path },
	}
1113
1114         impl TestScorer {
1115                 fn new() -> Self {
1116                         Self { event_expectations: None }
1117                 }
1118
1119                 fn expect(&mut self, expectation: TestResult) {
1120                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1121                 }
1122         }
1123
	// Scorer persistence is a no-op in these tests: nothing is serialized, and the bytes
	// written are never read back.
	impl lightning::util::ser::Writeable for TestScorer {
		fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
	}
1127
	impl ScoreLookUp for TestScorer {
		type ScoreParams = ();
		// These tests never route payments, so penalty lookups must never be reached.
		fn channel_penalty_msat(
			&self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
		) -> u64 { unimplemented!(); }
	}
1134
1135         impl ScoreUpdate for TestScorer {
		// If expectations are registered, the next queued one must be a `PaymentFailure`
		// matching this path and channel; any other queued variant fails the test. With no
		// expectations, the update is ignored. The `Duration` (current time, per the
		// `ScoreUpdate` API) is unused by this test scorer.
		fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
			if let Some(expectations) = &mut self.event_expectations {
				match expectations.pop_front().unwrap() {
					TestResult::PaymentFailure { path, short_channel_id } => {
						assert_eq!(actual_path, &path);
						assert_eq!(actual_short_channel_id, short_channel_id);
					},
					TestResult::PaymentSuccess { path } => {
						panic!("Unexpected successful payment path: {:?}", path)
					},
					TestResult::ProbeFailure { path } => {
						panic!("Unexpected probe failure: {:?}", path)
					},
					TestResult::ProbeSuccess { path } => {
						panic!("Unexpected probe success: {:?}", path)
					}
				}
			}
		}
1155
1156                 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
1157                         if let Some(expectations) = &mut self.event_expectations {
1158                                 match expectations.pop_front().unwrap() {
1159                                         TestResult::PaymentFailure { path, .. } => {
1160                                                 panic!("Unexpected payment path failure: {:?}", path)
1161                                         },
1162                                         TestResult::PaymentSuccess { path } => {
1163                                                 assert_eq!(actual_path, &path);
1164                                         },
1165                                         TestResult::ProbeFailure { path } => {
1166                                                 panic!("Unexpected probe failure: {:?}", path)
1167                                         },
1168                                         TestResult::ProbeSuccess { path } => {
1169                                                 panic!("Unexpected probe success: {:?}", path)
1170                                         }
1171                                 }
1172                         }
1173                 }
1174
1175                 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
1176                         if let Some(expectations) = &mut self.event_expectations {
1177                                 match expectations.pop_front().unwrap() {
1178                                         TestResult::PaymentFailure { path, .. } => {
1179                                                 panic!("Unexpected payment path failure: {:?}", path)
1180                                         },
1181                                         TestResult::PaymentSuccess { path } => {
1182                                                 panic!("Unexpected payment path success: {:?}", path)
1183                                         },
1184                                         TestResult::ProbeFailure { path } => {
1185                                                 assert_eq!(actual_path, &path);
1186                                         },
1187                                         TestResult::ProbeSuccess { path } => {
1188                                                 panic!("Unexpected probe success: {:?}", path)
1189                                         }
1190                                 }
1191                         }
1192                 }
1193                 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
1194                         if let Some(expectations) = &mut self.event_expectations {
1195                                 match expectations.pop_front().unwrap() {
1196                                         TestResult::PaymentFailure { path, .. } => {
1197                                                 panic!("Unexpected payment path failure: {:?}", path)
1198                                         },
1199                                         TestResult::PaymentSuccess { path } => {
1200                                                 panic!("Unexpected payment path success: {:?}", path)
1201                                         },
1202                                         TestResult::ProbeFailure { path } => {
1203                                                 panic!("Unexpected probe failure: {:?}", path)
1204                                         },
1205                                         TestResult::ProbeSuccess { path } => {
1206                                                 assert_eq!(actual_path, &path);
1207                                         }
1208                                 }
1209                         }
1210                 }
1211         }
1212
	// NOTE(review): in `c_bindings` builds the combined `Score` trait appears to
	// need an explicit impl here (it is presumably blanket-implemented otherwise)
	// — confirm against the scoring module's trait definitions.
	#[cfg(c_bindings)]
	impl lightning::routing::scoring::Score for TestScorer {}
1215
1216         impl Drop for TestScorer {
1217                 fn drop(&mut self) {
1218                         if std::thread::panicking() {
1219                                 return;
1220                         }
1221
1222                         if let Some(event_expectations) = &self.event_expectations {
1223                                 if !event_expectations.is_empty() {
1224                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1225                                 }
1226                         }
1227                 }
1228         }
1229
1230         fn get_full_filepath(filepath: String, filename: String) -> String {
1231                 let mut path = PathBuf::from(filepath);
1232                 path.push(filename);
1233                 path.to_str().unwrap().to_string()
1234         }
1235
	/// Builds `num_nodes` fully-wired test nodes, each persisting to its own
	/// numbered subdirectory under a per-test temp dir, then connects every pair
	/// of nodes. Returns the persistence directory path along with the nodes.
	fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
		let persist_temp_path = env::temp_dir().join(persist_dir);
		let persist_dir = persist_temp_path.to_string_lossy().to_string();
		let network = Network::Bitcoin;
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
			let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
			// Deterministic per-node seed so each run is reproducible.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone(), Default::default()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
			let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
			let best_block = BestBlock::from_network(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
			let msg_handler = MessageHandler {
				// NOTE(review): the channel handler is seeded with the Testnet genesis
				// hash while everything else here uses Bitcoin mainnet — confirm this
				// mismatch is intentional (the handler may simply never check it).
				chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
				route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
				onion_message_handler: IgnoringMessageHandler{}, custom_message_handler: IgnoringMessageHandler{}
			};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Connect every pair of nodes; node `i` acts as the outbound initiator.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init {
					features: nodes[j].node.init_features(), networks: None, remote_network_address: None
				}, true).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init {
					features: nodes[i].node.init_features(), networks: None, remote_network_address: None
				}, false).unwrap();
			}
		}

		(persist_dir, nodes)
	}
1283
	/// Drives a full channel-open handshake between `$node_a` (initiator) and
	/// `$node_b` through both sides observing `ChannelPending`, returning the
	/// funding transaction. Consumes the pending events it generates, so it must
	/// run before a `BackgroundProcessor` starts handling events.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			$node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			get_event!($node_b, Event::ChannelPending);
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
			get_event!($node_a, Event::ChannelPending);
			tx
		}}
	}
1298
	/// Performs the `open_channel`/`accept_channel` message exchange, leaving a
	/// `FundingGenerationReady` event pending on `$node_a`. Uses a user channel
	/// id of 42, which `handle_funding_generation_ready!` asserts against.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
1306
	/// Matches a `FundingGenerationReady` event, asserts its parameters, and
	/// returns the temporary channel id together with a single-output funding
	/// transaction paying the requested script.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					// 42 is the user channel id hard-coded by `begin_open_channel!`.
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
1323
1324         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1325                 for i in 1..=depth {
1326                         let prev_blockhash = node.best_block.block_hash();
1327                         let height = node.best_block.height() + 1;
1328                         let header = create_dummy_header(prev_blockhash, height);
1329                         let txdata = vec![(0, tx)];
1330                         node.best_block = BestBlock::new(header.block_hash(), height);
1331                         match i {
1332                                 1 => {
1333                                         node.node.transactions_confirmed(&header, &txdata, height);
1334                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1335                                 },
1336                                 x if x == depth => {
1337                                         node.node.best_block_updated(&header, height);
1338                                         node.chain_monitor.best_block_updated(&header, height);
1339                                 },
1340                                 _ => {},
1341                         }
1342                 }
1343         }
	/// Confirms `tx` and buries it to `ANTI_REORG_DELAY` confirmations.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
1347
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let (persist_dir, nodes) = create_nodes(2, "test_background_processor");

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes on disk at `$filepath` match `$node`'s current
		// in-memory serialization, i.e. until the background processor re-persists it.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								// File may not exist yet; keep polling.
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_event_or_persist_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
1419
1420         #[test]
1421         fn test_timer_tick_called() {
1422                 // Test that:
1423                 // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1424                 // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1425                 // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1426                 // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
1427                 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1428                 let data_dir = nodes[0].kv_store.get_data_dir();
1429                 let persister = Arc::new(Persister::new(data_dir));
1430                 let event_handler = |_: _| {};
1431                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1432                 loop {
1433                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1434                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1435                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1436                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1437                         let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
1438                         if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
1439                                 log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
1440                                 log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
1441                                 log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
1442                                 break
1443                         }
1444                 }
1445
1446                 if !std::thread::panicking() {
1447                         bg_processor.stop().unwrap();
1448                 }
1449         }
1450
1451         #[test]
1452         fn test_channel_manager_persist_error() {
1453                 // Test that if we encounter an error during manager persistence, the thread panics.
1454                 let (_, nodes) = create_nodes(2, "test_persist_error");
1455                 open_channel!(nodes[0], nodes[1], 100000);
1456
1457                 let data_dir = nodes[0].kv_store.get_data_dir();
1458                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1459                 let event_handler = |_: _| {};
1460                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1461                 match bg_processor.join() {
1462                         Ok(_) => panic!("Expected error persisting manager"),
1463                         Err(e) => {
1464                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1465                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1466                         },
1467                 }
1468         }
1469
	#[tokio::test]
	#[cfg(feature = "futures")]
	async fn test_channel_manager_persist_error_async() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let (_, nodes) = create_nodes(2, "test_persist_error_sync");
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].kv_store.get_data_dir();
		// Inject a persister that always fails manager writes with the given error.
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));

		let bp_future = super::process_events_async(
			persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
			nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
			Some(nodes[0].scorer.clone()), move |dur: Duration| {
				Box::pin(async move {
					tokio::time::sleep(dur).await;
					false // Never exit
				})
			}, false, || Some(Duration::ZERO),
		);
		// The injected persistence error should be the future's output.
		match bp_future.await {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
1498
1499         #[test]
1500         fn test_network_graph_persist_error() {
1501                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1502                 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1503                 let data_dir = nodes[0].kv_store.get_data_dir();
1504                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1505                 let event_handler = |_: _| {};
1506                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1507
1508                 match bg_processor.stop() {
1509                         Ok(_) => panic!("Expected error persisting network graph"),
1510                         Err(e) => {
1511                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1512                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1513                         },
1514                 }
1515         }
1516
1517         #[test]
1518         fn test_scorer_persist_error() {
1519                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1520                 let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
1521                 let data_dir = nodes[0].kv_store.get_data_dir();
1522                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1523                 let event_handler = |_: _| {};
1524                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1525
1526                 match bg_processor.stop() {
1527                         Ok(_) => panic!("Expected error persisting scorer"),
1528                         Err(e) => {
1529                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1530                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1531                         },
1532                 }
1533         }
1534
	#[test]
	fn test_background_event_handling() {
		// Exercises the BackgroundProcessor's event-handler path end-to-end: first a
		// FundingGenerationReady/ChannelPending sequence during channel open, then a
		// SpendableOutputs event after a force-close matures.
		let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
		let channel_value = 100000;
		let data_dir = nodes[0].kv_store.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
		let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = funding_generation_recv
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
		nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
		get_event!(nodes[1], Event::ChannelPending);
		nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
		let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("ChannelPending not handled within deadline");

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Bury the commitment deep enough for the to_self output to become spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		if !std::thread::panicking() {
			bg_processor.stop().unwrap();
		}
	}
1608
1609         #[test]
1610         fn test_scorer_persistence() {
1611                 let (_, nodes) = create_nodes(2, "test_scorer_persistence");
1612                 let data_dir = nodes[0].kv_store.get_data_dir();
1613                 let persister = Arc::new(Persister::new(data_dir));
1614                 let event_handler = |_: _| {};
1615                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1616
1617                 loop {
1618                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1619                         let expected_log = "Persisting scorer".to_string();
1620                         if log_entries.get(&("lightning_background_processor", expected_log)).is_some() {
1621                                 break
1622                         }
1623                 }
1624
1625                 if !std::thread::panicking() {
1626                         bg_processor.stop().unwrap();
1627                 }
1628         }
1629
	// Shared body for the sync and async variants of the "don't prune until RGS completes"
	// test. `$receive` is an expression that blocks until the graph-persistence notifier
	// fires (or times out), and `$sleep` yields between polls of the logger.
	macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
		($nodes: expr, $receive: expr, $sleep: expr) => {
			// Seed the graph with a single, stale partial channel announcement. With a
			// rapid gossip sync configured, the background processor must NOT prune it
			// until the RGS application below completes.
			let features = ChannelFeatures::empty();
			$nodes[0].network_graph.add_channel_from_partial_announcement(
				42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
			).expect("Failed to update channel from partial announcement");
			let original_graph_description = $nodes[0].network_graph.to_string();
			assert!(original_graph_description.contains("42: features: 0000, node_one:"));
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

			// Poll the logger until the background loop has demonstrably run more than
			// once, i.e. it had the opportunity to prune and (correctly) didn't.
			loop {
				$sleep;
				let log_entries = $nodes[0].logger.lines.lock().unwrap();
				let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
				if *log_entries.get(&("lightning_background_processor", loop_counter))
					.unwrap_or(&0) > 1
				{
					// Wait until the loop has gone around at least twice.
					break
				}
			}

			// A canned rapid-gossip-sync snapshot announcing two channels (neither of
			// which is SCID 42 above). Applying it marks the graph sync complete.
			let initialization_input = vec![
				76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
				79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
				0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
				187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
				157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
				88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
				204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
				181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
				110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
				76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
				226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
				0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
				0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
			];
			// The fixed timestamp keeps the test deterministic in no-std mode.
			$nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();

			// this should have added two channels and pruned the previous one.
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

			$receive.expect("Network graph not pruned within deadline");

			// all channels should now be pruned
			assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
		}
	}
1678
1679         #[test]
1680         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1681                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1682
1683                 let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
1684                 let data_dir = nodes[0].kv_store.get_data_dir();
1685                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1686
1687                 let event_handler = |_: _| {};
1688                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1689
1690                 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1691                         receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1692                         std::thread::sleep(Duration::from_millis(1)));
1693
1694                 background_processor.stop().unwrap();
1695         }
1696
	#[tokio::test]
	#[cfg(feature = "futures")]
	async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
		// Async variant of test_not_pruning_network_graph_until_graph_sync_completion:
		// runs process_events_async on tokio and checks that graph pruning is deferred
		// until the rapid gossip sync data has been applied.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);

		let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
		let data_dir = nodes[0].kv_store.get_data_dir();
		// The persister signals on `sender` each time the network graph is persisted.
		let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

		// `exit_sender` lets the test task tell the sleeper closure (which returns true
		// to request shutdown) to wind down the background future.
		let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
		let bp_future = super::process_events_async(
			persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
			nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
			Some(nodes[0].scorer.clone()), move |dur: Duration| {
				let mut exit_receiver = exit_receiver.clone();
				Box::pin(async move {
					tokio::select! {
						_ = tokio::time::sleep(dur) => false,
						_ = exit_receiver.changed() => true,
					}
				})
			}, false, || Some(Duration::from_secs(1696300000)),
			// NOTE(review): the final closure supplies "current time" as a Duration since
			// the epoch, fixed here for determinism; presumably fed through to the
			// scorer's time-aware update methods — confirm against process_events_async.
		);

		let t1 = tokio::spawn(bp_future);
		let t2 = tokio::spawn(async move {
			// Poll the notifier between prune-timer sleeps; bail out after five periods.
			do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
				let mut i = 0;
				loop {
					tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
					if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
					assert!(i < 5);
					i += 1;
				}
			}, tokio::time::sleep(Duration::from_millis(1)).await);
			// Ask the background future to exit once the macro's assertions passed.
			exit_sender.send(()).unwrap();
		});
		// Propagate panics/errors from both the background future and the test task.
		let (r1, r2) = tokio::join!(t1, t2);
		r1.unwrap().unwrap();
		r2.unwrap()
	}
1738
	// Shared body for the sync and async payment-path-scoring tests. `$receive` blocks
	// until the event handler forwards the next event. Each pushed event should both
	// reach the handler and drive the expected ScoreUpdate call on the test scorer
	// (the scorer itself asserts the expectation queued via write_lock().expect(..)).
	macro_rules! do_test_payment_path_scoring {
		($nodes: expr, $receive: expr) => {
			// Ensure that we update the scorer when relevant events are processed. In this case, we ensure
			// that we update the scorer upon a payment path succeeding (note that the channel must be
			// public or else we won't score it).
			// A background event handler for FundingGenerationReady events must be hooked up to a
			// running background processor.
			let scored_scid = 4242;
			let secp_ctx = Secp256k1::new();
			let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
			let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

			// Single-hop path over the channel we expect to be scored.
			let path = Path { hops: vec![RouteHop {
				pubkey: node_1_id,
				node_features: NodeFeatures::empty(),
				short_channel_id: scored_scid,
				channel_features: ChannelFeatures::empty(),
				fee_msat: 0,
				cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
				maybe_announced_channel: true,
			}], blinded_tail: None };

			// A retryable failure at a specific hop should score as a payment failure there.
			$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: false,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// Ensure we'll score payments that were explicitly failed back by the destination as
			// ProbeSuccess.
			// (A permanent failure with no failing SCID means the path itself worked.)
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
				payment_id: None,
				payment_hash: PaymentHash([42; 32]),
				payment_failed_permanently: true,
				failure: PathFailure::OnPath { network_update: None },
				path: path.clone(),
				short_channel_id: None,
			});
			let event = $receive.expect("PaymentPathFailed not handled within deadline");
			match event {
				Event::PaymentPathFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A successful payment path scores as a payment success.
			$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: None,
				path: path.clone(),
			});
			let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
			match event {
				Event::PaymentPathSuccessful { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A successful probe scores as a probe success.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path: path.clone(),
			});
			let event = $receive.expect("ProbeSuccessful not handled within deadline");
			match event {
				Event::ProbeSuccessful  { .. } => {},
				_ => panic!("Unexpected event"),
			}

			// A failed probe scores as a probe failure.
			$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
			$nodes[0].node.push_pending_event(Event::ProbeFailed {
				payment_id: PaymentId([42; 32]),
				payment_hash: PaymentHash([42; 32]),
				path,
				short_channel_id: Some(scored_scid),
			});
			let event = $receive.expect("ProbeFailure not handled within deadline");
			match event {
				Event::ProbeFailed { .. } => {},
				_ => panic!("Unexpected event"),
			}
		}
	}
1831
1832         #[test]
1833         fn test_payment_path_scoring() {
1834                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1835                 let event_handler = move |event: Event| match event {
1836                         Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1837                         Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1838                         Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1839                         Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1840                         _ => panic!("Unexpected event: {:?}", event),
1841                 };
1842
1843                 let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
1844                 let data_dir = nodes[0].kv_store.get_data_dir();
1845                 let persister = Arc::new(Persister::new(data_dir));
1846                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1847
1848                 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
1849
1850                 if !std::thread::panicking() {
1851                         bg_processor.stop().unwrap();
1852                 }
1853
1854                 let log_entries = nodes[0].logger.lines.lock().unwrap();
1855                 let expected_log = "Persisting scorer after update".to_string();
1856                 assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
1857         }
1858
1859         #[tokio::test]
1860         #[cfg(feature = "futures")]
1861         async fn test_payment_path_scoring_async() {
1862                 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
1863                 let event_handler = move |event: Event| {
1864                         let sender_ref = sender.clone();
1865                         async move {
1866                                 match event {
1867                                         Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
1868                                         Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1869                                         Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1870                                         Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
1871                                         _ => panic!("Unexpected event: {:?}", event),
1872                                 }
1873                         }
1874                 };
1875
1876                 let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
1877                 let data_dir = nodes[0].kv_store.get_data_dir();
1878                 let persister = Arc::new(Persister::new(data_dir));
1879
1880                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1881
1882                 let bp_future = super::process_events_async(
1883                         persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1884                         nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1885                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1886                                 let mut exit_receiver = exit_receiver.clone();
1887                                 Box::pin(async move {
1888                                         tokio::select! {
1889                                                 _ = tokio::time::sleep(dur) => false,
1890                                                 _ = exit_receiver.changed() => true,
1891                                         }
1892                                 })
1893                         }, false, || Some(Duration::ZERO),
1894                 );
1895                 let t1 = tokio::spawn(bp_future);
1896                 let t2 = tokio::spawn(async move {
1897                         do_test_payment_path_scoring!(nodes, receiver.recv().await);
1898                         exit_sender.send(()).unwrap();
1899
1900                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1901                         let expected_log = "Persisting scorer after update".to_string();
1902                         assert_eq!(*log_entries.get(&("lightning_background_processor", expected_log)).unwrap(), 5);
1903                 });
1904
1905                 let (r1, r2) = tokio::join!(t1, t2);
1906                 r1.unwrap().unwrap();
1907                 r2.unwrap()
1908         }
1909 }