Check for `background-processor` exit condition before+after sleep
rust-lightning: lightning-background-processor/src/lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
44
45 use core::ops::Deref;
46 use core::time::Duration;
47
48 #[cfg(feature = "std")]
49 use std::sync::Arc;
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
56
57 #[cfg(not(feature = "std"))]
58 use alloc::vec::Vec;
59
60 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
61 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
62 /// responsibilities are:
63 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
64 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
65 ///   writing it to disk/backups by invoking the callback given to it at startup.
66 ///   [`ChannelManager`] persistence should be done in the background.
67 /// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
68 ///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
69 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
70 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
71 ///
72 /// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be relied
73 /// upon as doing so may result in high latency.
74 ///
75 /// # Note
76 ///
77 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
78 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
79 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
80 /// unilateral chain closure fees are at risk.
81 ///
82 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
83 /// [`Event`]: lightning::events::Event
84 #[cfg(feature = "std")]
85 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
86 pub struct BackgroundProcessor {
87         stop_thread: Arc<AtomicBool>,
88         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
89 }
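
// Lifecycle sketch (illustrative only; the constructor arguments are the caller's own handles,
// see `BackgroundProcessor::start` below): the returned handle must be kept alive, as dropping
// it stops the background thread. Shut down via `stop()` or `join()` so that any error from the
// final persistence pass is surfaced rather than swallowed:
//
//   let bg_processor = BackgroundProcessor::start(/* persister, event_handler, ... */);
//   // ... node runs ...
//   bg_processor.stop().expect("background processing or final persistence failed");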
90
91 #[cfg(not(test))]
92 const FRESHNESS_TIMER: u64 = 60;
93 #[cfg(test)]
94 const FRESHNESS_TIMER: u64 = 1;
95
96 #[cfg(all(not(test), not(debug_assertions)))]
97 const PING_TIMER: u64 = 10;
98 /// Signature operations take a lot longer without compiler optimisations.
99 /// Increasing the ping timer allows for this, but slower devices will be disconnected if the
100 /// timeout is reached.
101 #[cfg(all(not(test), debug_assertions))]
102 const PING_TIMER: u64 = 30;
103 #[cfg(test)]
104 const PING_TIMER: u64 = 1;
105
106 /// Prune the network graph of stale entries hourly.
107 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
108
109 #[cfg(not(test))]
110 const SCORER_PERSIST_TIMER: u64 = 30;
111 #[cfg(test)]
112 const SCORER_PERSIST_TIMER: u64 = 1;
113
114 #[cfg(not(test))]
115 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
116 #[cfg(test)]
117 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
118
119 #[cfg(not(test))]
120 const REBROADCAST_TIMER: u64 = 30;
121 #[cfg(test)]
122 const REBROADCAST_TIMER: u64 = 1;
123
124 #[cfg(feature = "futures")]
125 /// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
126 const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
127 #[cfg(feature = "futures")]
128 const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
129         min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
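// For reference, with the non-test release values above (60, 10, 30, 60, 30) `FASTEST_TIMER`
// evaluates to 10, i.e. the futures-based loop never sleeps longer than the shortest interval
// it needs to service.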
130
131 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
132 pub enum GossipSync<
133         P: Deref<Target = P2PGossipSync<G, U, L>>,
134         R: Deref<Target = RapidGossipSync<G, L>>,
135         G: Deref<Target = NetworkGraph<L>>,
136         U: Deref,
137         L: Deref,
138 >
139 where U::Target: UtxoLookup, L::Target: Logger {
140         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
141         P2P(P),
142         /// Rapid gossip sync from a trusted server.
143         Rapid(R),
144         /// No gossip sync.
145         None,
146 }
147
148 impl<
149         P: Deref<Target = P2PGossipSync<G, U, L>>,
150         R: Deref<Target = RapidGossipSync<G, L>>,
151         G: Deref<Target = NetworkGraph<L>>,
152         U: Deref,
153         L: Deref,
154 > GossipSync<P, R, G, U, L>
155 where U::Target: UtxoLookup, L::Target: Logger {
156         fn network_graph(&self) -> Option<&G> {
157                 match self {
158                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
159                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
160                         GossipSync::None => None,
161                 }
162         }
163
164         fn prunable_network_graph(&self) -> Option<&G> {
165                 match self {
166                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
167                         GossipSync::Rapid(gossip_sync) => {
168                                 if gossip_sync.is_initial_sync_complete() {
169                                         Some(gossip_sync.network_graph())
170                                 } else {
171                                         None
172                                 }
173                         },
174                         GossipSync::None => None,
175                 }
176         }
177 }
178
179 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
180 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
181         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
182 where
183         U::Target: UtxoLookup,
184         L::Target: Logger,
185 {
186         /// Initializes a new [`GossipSync::P2P`] variant.
187         pub fn p2p(gossip_sync: P) -> Self {
188                 GossipSync::P2P(gossip_sync)
189         }
190 }
191
192 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
193 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
194         GossipSync<
195                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
196                 R,
197                 G,
198                 &'a (dyn UtxoLookup + Send + Sync),
199                 L,
200         >
201 where
202         L::Target: Logger,
203 {
204         /// Initializes a new [`GossipSync::Rapid`] variant.
205         pub fn rapid(gossip_sync: R) -> Self {
206                 GossipSync::Rapid(gossip_sync)
207         }
208 }
209
210 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
211 impl<'a, L: Deref>
212         GossipSync<
213                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
214                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
215                 &'a NetworkGraph<L>,
216                 &'a (dyn UtxoLookup + Send + Sync),
217                 L,
218         >
219 where
220         L::Target: Logger,
221 {
222         /// Initializes a new [`GossipSync::None`] variant.
223         pub fn none() -> Self {
224                 GossipSync::None
225         }
226 }
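
// Selecting a variant (illustrative sketch; `p2p_gossip_sync` and `rapid_gossip_sync` are assumed
// to be caller-constructed `Arc`s over the respective sync objects):
//
//   let gossip_sync = GossipSync::p2p(p2p_gossip_sync);     // BOLT 7 gossip over the P2P network
//   let gossip_sync = GossipSync::rapid(rapid_gossip_sync); // snapshot from a rapid gossip server
//   let gossip_sync = GossipSync::none();                   // no gossip handling at all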
227
228 fn handle_network_graph_update<L: Deref>(
229         network_graph: &NetworkGraph<L>, event: &Event
230 ) where L::Target: Logger {
231         if let Event::PaymentPathFailed {
232                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
233         {
234                 network_graph.handle_network_update(upd);
235         }
236 }
237
238 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
239         scorer: &'a S, event: &Event
240 ) {
241         let mut score = scorer.lock();
242         match event {
243                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
244                         let path = path.iter().collect::<Vec<_>>();
245                         score.payment_path_failed(&path, *scid);
246                 },
247                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
248                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
249                         // because the payment made it all the way to the destination with sufficient liquidity.
250                         let path = path.iter().collect::<Vec<_>>();
251                         score.probe_successful(&path);
252                 },
253                 Event::PaymentPathSuccessful { path, .. } => {
254                         let path = path.iter().collect::<Vec<_>>();
255                         score.payment_path_successful(&path);
256                 },
257                 Event::ProbeSuccessful { path, .. } => {
258                         let path = path.iter().collect::<Vec<_>>();
259                         score.probe_successful(&path);
260                 },
261                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
262                         let path = path.iter().collect::<Vec<_>>();
263                         score.probe_failed(&path, *scid);
264                 },
265                 _ => {},
266         }
267 }
268
269 macro_rules! define_run_body {
270         ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
271          $channel_manager: ident, $process_channel_manager_events: expr,
272          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
273          $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
274          $check_slow_await: expr)
275         => { {
276                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
277                 $channel_manager.timer_tick_occurred();
278                 log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
279                 $chain_monitor.rebroadcast_pending_claims();
280
281                 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
282                 let mut last_ping_call = $get_timer(PING_TIMER);
283                 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
284                 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
285                 let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
286                 let mut have_pruned = false;
287
288                 loop {
289                         $process_channel_manager_events;
290                         $process_chain_monitor_events;
291
292                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
293                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
294                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
295                         // without running the normal event processing above and handing events to users.
296                         //
297                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
298                         // processing a message effectively at any point during this loop. In order to
299                         // minimize the time between such processing completing and persisting the updated
300                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
301                         // generally, and as a fallback place such blocking only immediately before
302                         // persistence.
303                         $peer_manager.process_events();
304
305                         // Exit the loop if the background processor was requested to stop.
306                         if $loop_exit_check {
307                                 log_trace!($logger, "Terminating background processor.");
308                                 break;
309                         }
310
311                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
312                         // see `await_start`'s use below.
313                         let mut await_start = None;
314                         if $check_slow_await { await_start = Some($get_timer(1)); }
315                         let updates_available = $await;
316                         let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };
317
318                         // Exit the loop if the background processor was requested to stop.
319                         if $loop_exit_check {
320                                 log_trace!($logger, "Terminating background processor.");
321                                 break;
322                         }
323
324                         if updates_available {
325                                 log_trace!($logger, "Persisting ChannelManager...");
326                                 $persister.persist_manager(&*$channel_manager)?;
327                                 log_trace!($logger, "Done persisting ChannelManager.");
328                         }
329                         if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
330                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
331                                 $channel_manager.timer_tick_occurred();
332                                 last_freshness_call = $get_timer(FRESHNESS_TIMER);
333                         }
334                         if await_slow {
335                                 // On various platforms, we may be starved of CPU cycles for several reasons.
336                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
337                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
338                                 // may not get any cycles.
339                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
340                                 // full second, at which point we assume sockets may have been killed (they
341                                 // appear to be at least on some platforms, even if it has only been a second).
342                                 // Note that we have to take care to not get here just because user event
343                                 // processing was slow at the top of the loop. For example, the sample client
344                                 // may call Bitcoin Core RPCs during event handling, which very often takes
345                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
346                                 // peers.
347                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
348                                 $peer_manager.disconnect_all_peers();
349                                 last_ping_call = $get_timer(PING_TIMER);
350                         } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
351                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
352                                 $peer_manager.timer_tick_occurred();
353                                 last_ping_call = $get_timer(PING_TIMER);
354                         }
355
356                         // Note that we want to run a graph prune once not long after startup before
357                         // falling back to our usual hourly prunes. This avoids short-lived clients never
358                         // pruning their network graph. We run once 60 seconds after startup before
359                         // continuing our normal cadence.
360                         let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
361                         if $timer_elapsed(&mut last_prune_call, prune_timer) {
362                                 // The network graph must not be pruned while rapid sync completion is pending
363                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
364                                         #[cfg(feature = "std")] {
365                                                 log_trace!($logger, "Pruning and persisting network graph.");
366                                                 network_graph.remove_stale_channels_and_tracking();
367                                         }
368                                         #[cfg(not(feature = "std"))] {
369                                                 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
370                                                 log_trace!($logger, "Persisting network graph.");
371                                         }
372
373                                         if let Err(e) = $persister.persist_graph(network_graph) {
374                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
375                                         }
376
377                                         have_pruned = true;
378                                 }
379                                 let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
380                                 last_prune_call = $get_timer(prune_timer);
381                         }
382
383                         if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
384                                 if let Some(ref scorer) = $scorer {
385                                         log_trace!($logger, "Persisting scorer");
386                                         if let Err(e) = $persister.persist_scorer(&scorer) {
387                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
388                                         }
389                                 }
390                                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
391                         }
392
393                         if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
394                                 log_trace!($logger, "Rebroadcasting monitor's pending claims");
395                                 $chain_monitor.rebroadcast_pending_claims();
396                                 last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
397                         }
398                 }
399
400                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
401                 // some races where users quit while channel updates were in-flight, with
402                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
403                 $persister.persist_manager(&*$channel_manager)?;
404
405                 // Persist Scorer on exit
406                 if let Some(ref scorer) = $scorer {
407                         $persister.persist_scorer(&scorer)?;
408                 }
409
410                 // Persist NetworkGraph on exit
411                 if let Some(network_graph) = $gossip_sync.network_graph() {
412                         $persister.persist_graph(network_graph)?;
413                 }
414
415                 Ok(())
416         } }
417 }
418
419 #[cfg(feature = "futures")]
420 pub(crate) mod futures_util {
421         use core::future::Future;
422         use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
423         use core::pin::Pin;
424         use core::marker::Unpin;
425         pub(crate) struct Selector<
426                 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
427         > {
428                 pub a: A,
429                 pub b: B,
430                 pub c: C,
431         }
432         pub(crate) enum SelectorOutput {
433                 A, B, C(bool),
434         }
435
436         impl<
437                 A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
438         > Future for Selector<A, B, C> {
439                 type Output = SelectorOutput;
440                 fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
441                         match Pin::new(&mut self.a).poll(ctx) {
442                                 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::A); },
443                                 Poll::Pending => {},
444                         }
445                         match Pin::new(&mut self.b).poll(ctx) {
446                                 Poll::Ready(()) => { return Poll::Ready(SelectorOutput::B); },
447                                 Poll::Pending => {},
448                         }
449                         match Pin::new(&mut self.c).poll(ctx) {
450                                 Poll::Ready(res) => { return Poll::Ready(SelectorOutput::C(res)); },
451                                 Poll::Pending => {},
452                         }
453                         Poll::Pending
454                 }
455         }
456
457         // If we want to poll a future without an async context to figure out if it has completed or
458         // not without awaiting, we need a Waker, which needs a vtable...we fill it with dummy values
459         // but sadly there's a good bit of boilerplate here.
460         fn dummy_waker_clone(_: *const ()) -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
461         fn dummy_waker_action(_: *const ()) { }
462
463         const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
464                 dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);
465         pub(crate) fn dummy_waker() -> Waker { unsafe { Waker::from_raw(RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)) } }
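	// Illustrative use of the above (see `process_events_async` below): poll a future exactly
	// once to learn whether it has already completed, without blocking. `some_unpin_future` is a
	// placeholder name for any `Future + Unpin`:
	//
	//   let waker = dummy_waker();
	//   let mut ctx = core::task::Context::from_waker(&waker);
	//   match core::pin::Pin::new(&mut some_unpin_future).poll(&mut ctx) {
	//       Poll::Ready(value) => { /* future finished with `value` */ },
	//       Poll::Pending => { /* still running, try again later */ },
	//   }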
466 }
467 #[cfg(feature = "futures")]
468 use futures_util::{Selector, SelectorOutput, dummy_waker};
469 #[cfg(feature = "futures")]
470 use core::task;
471
472 /// Processes background events in a future.
473 ///
474 /// `sleeper` should return a future which completes in the given amount of time and returns a
475 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
476 /// future which outputs true, the loop will exit and this function's future will complete.
477 ///
478 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
479 ///
480 /// Requires the `futures` feature. Note that while this method is available without the `std`
481 /// feature, using it without `std` will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`];
482 /// in that case you should regularly call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`]
483 /// manually instead.
484 ///
485 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
486 /// mobile device, where we may need to check for interruption of the application regularly. If you
487 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
488 /// are hundreds or thousands of simultaneous process calls running.
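///
/// # Example
///
/// A minimal `sleeper` sketch, assuming a `tokio` runtime and a `tokio::sync::watch` channel
/// whose receiver (`stop_receiver`) the caller flips on shutdown; all other arguments are
/// assumed to have been constructed elsewhere:
///
/// ```ignore
/// let bp_fut = process_events_async(
/// 	persister, event_handler, chain_monitor, channel_manager, gossip_sync, peer_manager,
/// 	logger, Some(scorer),
/// 	move |duration| {
/// 		let mut stop_receiver = stop_receiver.clone();
/// 		Box::pin(async move {
/// 			tokio::select! {
/// 				_ = tokio::time::sleep(duration) => false,
/// 				_ = stop_receiver.changed() => true,
/// 			}
/// 		})
/// 	},
/// 	// Set to true when running on a mobile platform that may suspend the process.
/// 	false,
/// );
/// tokio::spawn(async move {
/// 	if let Err(e) = bp_fut.await {
/// 		eprintln!("background processing stopped with error: {:?}", e);
/// 	}
/// });
/// ```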
489 #[cfg(feature = "futures")]
490 pub async fn process_events_async<
491         'a,
492         UL: 'static + Deref + Send + Sync,
493         CF: 'static + Deref + Send + Sync,
494         CW: 'static + Deref + Send + Sync,
495         T: 'static + Deref + Send + Sync,
496         ES: 'static + Deref + Send + Sync,
497         NS: 'static + Deref + Send + Sync,
498         SP: 'static + Deref + Send + Sync,
499         F: 'static + Deref + Send + Sync,
500         R: 'static + Deref + Send + Sync,
501         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
502         L: 'static + Deref + Send + Sync,
503         P: 'static + Deref + Send + Sync,
504         Descriptor: 'static + SocketDescriptor + Send + Sync,
505         CMH: 'static + Deref + Send + Sync,
506         RMH: 'static + Deref + Send + Sync,
507         OMH: 'static + Deref + Send + Sync,
508         EventHandlerFuture: core::future::Future<Output = ()>,
509         EventHandler: Fn(Event) -> EventHandlerFuture,
510         PS: 'static + Deref + Send,
511         M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
512         CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
513         PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
514         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
515         UMH: 'static + Deref + Send + Sync,
516         PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
517         S: 'static + Deref<Target = SC> + Send + Sync,
518         SC: for<'b> WriteableScore<'b>,
519         SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
520         Sleeper: Fn(Duration) -> SleepFuture
521 >(
522         persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
523         gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
524         sleeper: Sleeper, mobile_interruptable_platform: bool,
525 ) -> Result<(), lightning::io::Error>
526 where
527         UL::Target: 'static + UtxoLookup,
528         CF::Target: 'static + chain::Filter,
529         CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
530         T::Target: 'static + BroadcasterInterface,
531         ES::Target: 'static + EntropySource,
532         NS::Target: 'static + NodeSigner,
533         SP::Target: 'static + SignerProvider,
534         F::Target: 'static + FeeEstimator,
535         R::Target: 'static + Router,
536         L::Target: 'static + Logger,
537         P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
538         CMH::Target: 'static + ChannelMessageHandler,
539         OMH::Target: 'static + OnionMessageHandler,
540         RMH::Target: 'static + RoutingMessageHandler,
541         UMH::Target: 'static + CustomMessageHandler,
542         PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
543 {
544         let mut should_break = false;
545         let async_event_handler = |event| {
546                 let network_graph = gossip_sync.network_graph();
547                 let event_handler = &event_handler;
548                 let scorer = &scorer;
549                 async move {
550                         if let Some(network_graph) = network_graph {
551                                 handle_network_graph_update(network_graph, &event)
552                         }
553                         if let Some(ref scorer) = scorer {
554                                 update_scorer(scorer, &event);
555                         }
556                         event_handler(event).await;
557                 }
558         };
559         define_run_body!(persister,
560                 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
561                 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
562                 gossip_sync, peer_manager, logger, scorer, should_break, {
563                         let fut = Selector {
564                                 a: channel_manager.get_persistable_update_future(),
565                                 b: chain_monitor.get_update_future(),
566                                 c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
567                         };
568                         match fut.await {
569                                 SelectorOutput::A => true,
570                                 SelectorOutput::B => false,
571                                 SelectorOutput::C(exit) => {
572                                         should_break = exit;
573                                         false
574                                 }
575                         }
576                 }, |t| sleeper(Duration::from_secs(t)),
577                 |fut: &mut SleepFuture, _| {
578                         let mut waker = dummy_waker();
579                         let mut ctx = task::Context::from_waker(&mut waker);
580                         match core::pin::Pin::new(fut).poll(&mut ctx) {
581                                 task::Poll::Ready(exit) => { should_break = exit; true },
582                                 task::Poll::Pending => false,
583                         }
584                 }, mobile_interruptable_platform)
585 }
586
587 #[cfg(feature = "std")]
588 impl BackgroundProcessor {
589         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
590         /// documentation].
591         ///
592         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
593         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
594         /// either [`join`] or [`stop`].
595         ///
596         /// # Data Persistence
597         ///
598         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
599         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
600         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
601         /// provided implementation.
602         ///
603         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
604         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
605         /// See the `lightning-persister` crate for LDK's provided implementation.
606         ///
607         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
608         /// error or call [`join`] and handle any error that may arise. For the latter case,
609         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
610         ///
611         /// # Event Handling
612         ///
613         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
614         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
615         /// functionality implemented by other handlers.
616 /// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
617         ///
618         /// # Rapid Gossip Sync
619         ///
620         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
621         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
622         /// until the [`RapidGossipSync`] instance completes its first sync.
623         ///
624         /// [top-level documentation]: BackgroundProcessor
625         /// [`join`]: Self::join
626         /// [`stop`]: Self::stop
627         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
628         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
629         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
630         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
631         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
632         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
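	///
	/// # Example
	///
	/// An illustrative sketch; the `node.*` handles are assumed to be built by the caller and are
	/// not defined in this crate:
	///
	/// ```ignore
	/// let gossip_sync = GossipSync::p2p(node.p2p_gossip_sync);
	/// let background_processor = BackgroundProcessor::start(
	/// 	node.persister, node.event_handler, node.chain_monitor, node.channel_manager,
	/// 	gossip_sync, node.peer_manager, node.logger, Some(node.scorer),
	/// );
	/// // On shutdown, stop the thread and surface any persistence error:
	/// background_processor.stop().unwrap();
	/// ```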
633         pub fn start<
634                 'a,
635                 UL: 'static + Deref + Send + Sync,
636                 CF: 'static + Deref + Send + Sync,
637                 CW: 'static + Deref + Send + Sync,
638                 T: 'static + Deref + Send + Sync,
639                 ES: 'static + Deref + Send + Sync,
640                 NS: 'static + Deref + Send + Sync,
641                 SP: 'static + Deref + Send + Sync,
642                 F: 'static + Deref + Send + Sync,
643                 R: 'static + Deref + Send + Sync,
644                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
645                 L: 'static + Deref + Send + Sync,
646                 P: 'static + Deref + Send + Sync,
647                 Descriptor: 'static + SocketDescriptor + Send + Sync,
648                 CMH: 'static + Deref + Send + Sync,
649                 OMH: 'static + Deref + Send + Sync,
650                 RMH: 'static + Deref + Send + Sync,
651                 EH: 'static + EventHandler + Send,
652                 PS: 'static + Deref + Send,
653                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
654                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
655                 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
656                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
657                 UMH: 'static + Deref + Send + Sync,
658                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
659                 S: 'static + Deref<Target = SC> + Send + Sync,
660                 SC: for <'b> WriteableScore<'b>,
661         >(
662                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
663                 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
664         ) -> Self
665         where
666                 UL::Target: 'static + UtxoLookup,
667                 CF::Target: 'static + chain::Filter,
668                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
669                 T::Target: 'static + BroadcasterInterface,
670                 ES::Target: 'static + EntropySource,
671                 NS::Target: 'static + NodeSigner,
672                 SP::Target: 'static + SignerProvider,
673                 F::Target: 'static + FeeEstimator,
674                 R::Target: 'static + Router,
675                 L::Target: 'static + Logger,
676                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
677                 CMH::Target: 'static + ChannelMessageHandler,
678                 OMH::Target: 'static + OnionMessageHandler,
679                 RMH::Target: 'static + RoutingMessageHandler,
680                 UMH::Target: 'static + CustomMessageHandler,
681                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
682         {
683                 let stop_thread = Arc::new(AtomicBool::new(false));
684                 let stop_thread_clone = stop_thread.clone();
685                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
686                         let event_handler = |event| {
687                                 let network_graph = gossip_sync.network_graph();
688                                 if let Some(network_graph) = network_graph {
689                                         handle_network_graph_update(network_graph, &event)
690                                 }
691                                 if let Some(ref scorer) = scorer {
692                                         update_scorer(scorer, &event);
693                                 }
694                                 event_handler.handle_event(event);
695                         };
696                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
697                                 channel_manager, channel_manager.process_pending_events(&event_handler),
698                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
699                                 Sleeper::from_two_futures(
700                                         channel_manager.get_persistable_update_future(),
701                                         chain_monitor.get_update_future()
702                                 ).wait_timeout(Duration::from_millis(100)),
703                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
704                 });
705                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
706         }
707
708         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
709         /// [`ChannelManager`].
710         ///
711         /// # Panics
712         ///
713         /// This function panics if the background thread has panicked such as while persisting or
714         /// handling events.
715         ///
716         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
717         pub fn join(mut self) -> Result<(), std::io::Error> {
718                 assert!(self.thread_handle.is_some());
719                 self.join_thread()
720         }
721
722         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
723         /// [`ChannelManager`].
724         ///
725         /// # Panics
726         ///
727         /// This function panics if the background thread has panicked such as while persisting or
728         /// handling events.
729         ///
730         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
731         pub fn stop(mut self) -> Result<(), std::io::Error> {
732                 assert!(self.thread_handle.is_some());
733                 self.stop_and_join_thread()
734         }
735
736         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
737                 self.stop_thread.store(true, Ordering::Release);
738                 self.join_thread()
739         }
740
741         fn join_thread(&mut self) -> Result<(), std::io::Error> {
742                 match self.thread_handle.take() {
743                         Some(handle) => handle.join().unwrap(),
744                         None => Ok(()),
745                 }
746         }
747 }
748
749 #[cfg(feature = "std")]
750 impl Drop for BackgroundProcessor {
751         fn drop(&mut self) {
752                 self.stop_and_join_thread().unwrap();
753         }
754 }
755
756 #[cfg(all(feature = "std", test))]
757 mod tests {
758         use bitcoin::blockdata::block::BlockHeader;
759         use bitcoin::blockdata::constants::genesis_block;
760         use bitcoin::blockdata::locktime::PackedLockTime;
761         use bitcoin::blockdata::transaction::{Transaction, TxOut};
762         use bitcoin::network::constants::Network;
763         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
764         use lightning::chain::{BestBlock, Confirm, chainmonitor};
765         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
766         use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
767         use lightning::chain::transaction::OutPoint;
768         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
769         use lightning::{get_event_msg, get_event};
770         use lightning::ln::PaymentHash;
771         use lightning::ln::channelmanager;
772         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
773         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
774         use lightning::ln::msgs::{ChannelMessageHandler, Init};
775         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
776         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
777         use lightning::routing::router::{DefaultRouter, RouteHop};
778         use lightning::routing::scoring::{ChannelUsage, Score};
779         use lightning::util::config::UserConfig;
780         use lightning::util::ser::Writeable;
781         use lightning::util::test_utils;
782         use lightning::util::persist::KVStorePersister;
783         use lightning_persister::FilesystemPersister;
784         use std::collections::VecDeque;
785         use std::fs;
786         use std::path::PathBuf;
787         use std::sync::{Arc, Mutex};
788         use std::sync::mpsc::SyncSender;
789         use std::time::Duration;
790         use bitcoin::hashes::Hash;
791         use bitcoin::TxMerkleNode;
792         use lightning_rapid_gossip_sync::RapidGossipSync;
793         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
794
795         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
796
797         #[derive(Clone, Hash, PartialEq, Eq)]
798         struct TestDescriptor{}
799         impl SocketDescriptor for TestDescriptor {
800                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
801                         0
802                 }
803
804                 fn disconnect_socket(&mut self) {}
805         }
806
807         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
808
809         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
810
811         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
812         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
813
814         struct Node {
815                 node: Arc<ChannelManager>,
816                 p2p_gossip_sync: PGS,
817                 rapid_gossip_sync: RGS,
818                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
819                 chain_monitor: Arc<ChainMonitor>,
820                 persister: Arc<FilesystemPersister>,
821                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
822                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
823                 logger: Arc<test_utils::TestLogger>,
824                 best_block: BestBlock,
825                 scorer: Arc<Mutex<TestScorer>>,
826         }
827
828         impl Node {
829                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
830                         GossipSync::P2P(self.p2p_gossip_sync.clone())
831                 }
832
833                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
834                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
835                 }
836
837                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
838                         GossipSync::None
839                 }
840         }
841
842         impl Drop for Node {
843                 fn drop(&mut self) {
844                         let data_dir = self.persister.get_data_dir();
845                         match fs::remove_dir_all(data_dir.clone()) {
846                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
847                                 _ => {}
848                         }
849                 }
850         }
851
852         struct Persister {
853                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
854                 graph_persistence_notifier: Option<SyncSender<()>>,
855                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
856                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
857                 filesystem_persister: FilesystemPersister,
858         }
859
860         impl Persister {
861                 fn new(data_dir: String) -> Self {
862                         let filesystem_persister = FilesystemPersister::new(data_dir);
863                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
864                 }
865
866                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
867                         Self { graph_error: Some((error, message)), ..self }
868                 }
869
870                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
871                         Self { graph_persistence_notifier: Some(sender), ..self }
872                 }
873
874                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
875                         Self { manager_error: Some((error, message)), ..self }
876                 }
877
878                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
879                         Self { scorer_error: Some((error, message)), ..self }
880                 }
881         }
882
883         impl KVStorePersister for Persister {
884                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
885                         if key == "manager" {
886                                 if let Some((error, message)) = self.manager_error {
887                                         return Err(std::io::Error::new(error, message))
888                                 }
889                         }
890
891                         if key == "network_graph" {
892                                 if let Some(sender) = &self.graph_persistence_notifier {
893                                         match sender.send(()) {
894                                                 Ok(()) => {},
895                                                 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
896                                         }
897                                 };
898
899                                 if let Some((error, message)) = self.graph_error {
900                                         return Err(std::io::Error::new(error, message))
901                                 }
902                         }
903
904                         if key == "scorer" {
905                                 if let Some((error, message)) = self.scorer_error {
906                                         return Err(std::io::Error::new(error, message))
907                                 }
908                         }
909
910                         self.filesystem_persister.persist(key, object)
911                 }
912         }
913
914         struct TestScorer {
915                 event_expectations: Option<VecDeque<TestResult>>,
916         }
917
918         #[derive(Debug)]
919         enum TestResult {
920                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
921                 PaymentSuccess { path: Vec<RouteHop> },
922                 ProbeFailure { path: Vec<RouteHop> },
923                 ProbeSuccess { path: Vec<RouteHop> },
924         }
925
926         impl TestScorer {
927                 fn new() -> Self {
928                         Self { event_expectations: None }
929                 }
930
931                 fn expect(&mut self, expectation: TestResult) {
932                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
933                 }
934         }
935
936         impl lightning::util::ser::Writeable for TestScorer {
937                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
938         }
939
940         impl Score for TestScorer {
941                 fn channel_penalty_msat(
942                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
943                 ) -> u64 { unimplemented!(); }
944
945                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
946                         if let Some(expectations) = &mut self.event_expectations {
947                                 match expectations.pop_front().unwrap() {
948                                         TestResult::PaymentFailure { path, short_channel_id } => {
949                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
950                                                 assert_eq!(actual_short_channel_id, short_channel_id);
951                                         },
952                                         TestResult::PaymentSuccess { path } => {
953                                                 panic!("Unexpected successful payment path: {:?}", path)
954                                         },
955                                         TestResult::ProbeFailure { path } => {
956                                                 panic!("Unexpected probe failure: {:?}", path)
957                                         },
958                                         TestResult::ProbeSuccess { path } => {
959                                                 panic!("Unexpected probe success: {:?}", path)
960                                         }
961                                 }
962                         }
963                 }
964
965                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
966                         if let Some(expectations) = &mut self.event_expectations {
967                                 match expectations.pop_front().unwrap() {
968                                         TestResult::PaymentFailure { path, .. } => {
969                                                 panic!("Unexpected payment path failure: {:?}", path)
970                                         },
971                                         TestResult::PaymentSuccess { path } => {
972                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
973                                         },
974                                         TestResult::ProbeFailure { path } => {
975                                                 panic!("Unexpected probe failure: {:?}", path)
976                                         },
977                                         TestResult::ProbeSuccess { path } => {
978                                                 panic!("Unexpected probe success: {:?}", path)
979                                         }
980                                 }
981                         }
982                 }
983
984                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
985                         if let Some(expectations) = &mut self.event_expectations {
986                                 match expectations.pop_front().unwrap() {
987                                         TestResult::PaymentFailure { path, .. } => {
988                                                 panic!("Unexpected payment path failure: {:?}", path)
989                                         },
990                                         TestResult::PaymentSuccess { path } => {
991                                                 panic!("Unexpected payment path success: {:?}", path)
992                                         },
993                                         TestResult::ProbeFailure { path } => {
994                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
995                                         },
996                                         TestResult::ProbeSuccess { path } => {
997                                                 panic!("Unexpected probe success: {:?}", path)
998                                         }
999                                 }
1000                         }
1001                 }
1002                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
1003                         if let Some(expectations) = &mut self.event_expectations {
1004                                 match expectations.pop_front().unwrap() {
1005                                         TestResult::PaymentFailure { path, .. } => {
1006                                                 panic!("Unexpected payment path failure: {:?}", path)
1007                                         },
1008                                         TestResult::PaymentSuccess { path } => {
1009                                                 panic!("Unexpected payment path success: {:?}", path)
1010                                         },
1011                                         TestResult::ProbeFailure { path } => {
1012                                                 panic!("Unexpected probe failure: {:?}", path)
1013                                         },
1014                                         TestResult::ProbeSuccess { path } => {
1015                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
1016                                         }
1017                                 }
1018                         }
1019                 }
1020         }
1021
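	// When a TestScorer is dropped at the end of a test, fail the test if any queued expectations
	// were never consumed, unless the thread is already unwinding from an earlier panic.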
1022         impl Drop for TestScorer {
1023                 fn drop(&mut self) {
1024                         if std::thread::panicking() {
1025                                 return;
1026                         }
1027
1028                         if let Some(event_expectations) = &self.event_expectations {
1029                                 if !event_expectations.is_empty() {
1030                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1031                                 }
1032                         }
1033                 }
1034         }
1035
1036         fn get_full_filepath(filepath: String, filename: String) -> String {
1037                 let mut path = PathBuf::from(filepath);
1038                 path.push(filename);
1039                 path.to_str().unwrap().to_string()
1040         }
1041
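	// Constructs `num_nodes` fully-wired test nodes: each gets its own ChannelManager,
	// ChainMonitor, a per-node FilesystemPersister derived from `persist_dir`, a network graph
	// with both P2P and rapid gossip sync, and a PeerManager, and every pair of nodes is then
	// connected as peers.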
1042         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
1043                 let network = Network::Testnet;
1044                 let mut nodes = Vec::new();
1045                 for i in 0..num_nodes {
1046                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1047                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1048                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1049                         let genesis_block = genesis_block(network);
1050                         let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1051                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
1052                         let seed = [i as u8; 32];
1053                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
1054                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
1055                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
1056                         let now = Duration::from_secs(genesis_block.header.time as u64);
1057                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1058                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1059                         let best_block = BestBlock::from_network(network);
1060                         let params = ChainParameters { network, best_block };
1061                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1062                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1063                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1064                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
1065                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
1066                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1067                         nodes.push(node);
1068                 }
1069
1070                 for i in 0..num_nodes {
1071                         for j in (i+1)..num_nodes {
1072                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1073                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
1074                         }
1075                 }
1076
1077                 nodes
1078         }
1079
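	// Drives a full channel-open handshake between `$node_a` and `$node_b`: negotiates the
	// channel, generates and hands off the funding transaction, and exchanges
	// funding_created/funding_signed until both sides report the channel as pending. Returns the
	// funding transaction so callers can confirm it on-chain later.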
1080         macro_rules! open_channel {
1081                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1082                         begin_open_channel!($node_a, $node_b, $channel_value);
1083                         let events = $node_a.node.get_and_clear_pending_events();
1084                         assert_eq!(events.len(), 1);
1085                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1086                         $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
1087                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1088                         get_event!($node_b, Event::ChannelPending);
1089                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1090                         get_event!($node_a, Event::ChannelPending);
1091                         tx
1092                 }}
1093         }
1094
1095         macro_rules! begin_open_channel {
1096                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1097                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1098                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1099                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
1100                 }}
1101         }
1102
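	// Pulls the parameters out of a FundingGenerationReady event and builds a one-output funding
	// transaction paying the requested amount to the provided output script.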
1103         macro_rules! handle_funding_generation_ready {
1104                 ($event: expr, $channel_value: expr) => {{
1105                         match $event {
1106                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1107                                         assert_eq!(channel_value_satoshis, $channel_value);
1108                                         assert_eq!(user_channel_id, 42);
1109
1110                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1111                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1112                                         }]};
1113                                         (temporary_channel_id, tx)
1114                                 },
1115                                 _ => panic!("Unexpected event"),
1116                         }
1117                 }}
1118         }
1119
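	// Connects `depth` blocks on top of the node's current best block, telling both the
	// ChannelManager and the ChainMonitor about the transaction in the first new block and about
	// the new best block once the final depth is reached.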
1120         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1121                 for i in 1..=depth {
1122                         let prev_blockhash = node.best_block.block_hash();
1123                         let height = node.best_block.height() + 1;
1124                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1125                         let txdata = vec![(0, tx)];
1126                         node.best_block = BestBlock::new(header.block_hash(), height);
1127                         match i {
1128                                 1 => {
1129                                         node.node.transactions_confirmed(&header, &txdata, height);
1130                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1131                                 },
1132                                 x if x == depth => {
1133                                         node.node.best_block_updated(&header, height);
1134                                         node.chain_monitor.best_block_updated(&header, height);
1135                                 },
1136                                 _ => {},
1137                         }
1138                 }
1139         }
1140         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1141                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1142         }
1143
1144         #[test]
1145         fn test_background_processor() {
1146                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1147                 // updates. Also test that when new updates are available, the manager signals that it needs
1148                 // re-persistence and is successfully re-persisted.
1149                 let nodes = create_nodes(2, "test_background_processor".to_string());
1150
1151                 // Go through the channel creation process so that each node has something to persist. Since
1152                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1153                 // avoid a race with processing events.
1154                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1155
1156                 // Initiate the background processors to watch each node.
1157                 let data_dir = nodes[0].persister.get_data_dir();
1158                 let persister = Arc::new(Persister::new(data_dir));
1159                 let event_handler = |_: _| {};
1160                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1161
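		// The background processor persists asynchronously, so this macro busy-waits until the
		// bytes on disk match the object's current serialization, re-serializing and re-reading
		// until they do.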
1162                 macro_rules! check_persisted_data {
1163                         ($node: expr, $filepath: expr) => {
1164                                 let mut expected_bytes = Vec::new();
1165                                 loop {
1166                                         expected_bytes.clear();
1167                                         match $node.write(&mut expected_bytes) {
1168                                                 Ok(()) => {
1169                                                         match std::fs::read($filepath) {
1170                                                                 Ok(bytes) => {
1171                                                                         if bytes == expected_bytes {
1172                                                                                 break
1173                                                                         } else {
1174                                                                                 continue
1175                                                                         }
1176                                                                 },
1177                                                                 Err(_) => continue
1178                                                         }
1179                                                 },
1180                                                 Err(e) => panic!("Unexpected error: {}", e)
1181                                         }
1182                                 }
1183                         }
1184                 }
1185
1186                 // Check that the initial channel manager data is persisted as expected.
1187                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1188                 check_persisted_data!(nodes[0].node, filepath.clone());
1189
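		// Wait until the manager stops signalling that it needs persistence, i.e. until the
		// background processor has presumably picked up and persisted the latest update.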
1190                 loop {
1191                         if !nodes[0].node.get_persistence_condvar_value() { break }
1192                 }
1193
1194                 // Force-close the channel.
1195                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1196
1197                 // Check that the force-close updates are persisted.
1198                 check_persisted_data!(nodes[0].node, filepath.clone());
1199                 loop {
1200                         if !nodes[0].node.get_persistence_condvar_value() { break }
1201                 }
1202
1203                 // Check network graph is persisted
1204                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1205                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1206
1207                 // Check scorer is persisted
1208                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1209                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1210
1211                 if !std::thread::panicking() {
1212                         bg_processor.stop().unwrap();
1213                 }
1214         }
1215
1216         #[test]
1217         fn test_timer_tick_called() {
1218                 // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1219                 // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
1220                 // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
1221                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1222                 let data_dir = nodes[0].persister.get_data_dir();
1223                 let persister = Arc::new(Persister::new(data_dir));
1224                 let event_handler = |_: _| {};
1225                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
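		// Poll the TestLogger until all three timer-driven log lines show up, confirming that
		// each of the periodic timer calls has fired at least once.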
1226                 loop {
1227                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1228                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1229                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1230                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1231                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
1232                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
1233                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
1234                                 break
1235                         }
1236                 }
1237
1238                 if !std::thread::panicking() {
1239                         bg_processor.stop().unwrap();
1240                 }
1241         }
1242
1243         #[test]
1244         fn test_channel_manager_persist_error() {
1245		// Test that an error during manager persistence is returned when joining the background processor.
1246                 let nodes = create_nodes(2, "test_persist_error".to_string());
1247                 open_channel!(nodes[0], nodes[1], 100000);
1248
1249                 let data_dir = nodes[0].persister.get_data_dir();
1250                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1251                 let event_handler = |_: _| {};
1252                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1253                 match bg_processor.join() {
1254                         Ok(_) => panic!("Expected error persisting manager"),
1255                         Err(e) => {
1256                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1257                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1258                         },
1259                 }
1260         }
1261
1262         #[tokio::test]
1263         #[cfg(feature = "futures")]
1264         async fn test_channel_manager_persist_error_async() {
1265		// Test that an error during manager persistence causes the async processor future to resolve to that error.
1266                 let nodes = create_nodes(2, "test_persist_error_sync".to_string());
1267                 open_channel!(nodes[0], nodes[1], 100000);
1268
1269                 let data_dir = nodes[0].persister.get_data_dir();
1270                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1271
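		// process_events_async takes a user-provided sleeper closure whose future resolves to
		// `true` when the processor should exit; here it always resolves to `false`, so the
		// future below should only complete by returning the injected persistence error.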
1272                 let bp_future = super::process_events_async(
1273                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1274                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1275                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1276                                 Box::pin(async move {
1277                                         tokio::time::sleep(dur).await;
1278                                         false // Never exit
1279                                 })
1280                         }, false,
1281                 );
1282                 match bp_future.await {
1283                         Ok(_) => panic!("Expected error persisting manager"),
1284                         Err(e) => {
1285                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1286                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1287                         },
1288                 }
1289         }
1290
1291         #[test]
1292         fn test_network_graph_persist_error() {
1293                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1294                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1295                 let data_dir = nodes[0].persister.get_data_dir();
1296                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1297                 let event_handler = |_: _| {};
1298                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1299
1300                 match bg_processor.stop() {
1301                         Ok(_) => panic!("Expected error persisting network graph"),
1302                         Err(e) => {
1303                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1304                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1305                         },
1306                 }
1307         }
1308
1309         #[test]
1310         fn test_scorer_persist_error() {
1311                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1312                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1313                 let data_dir = nodes[0].persister.get_data_dir();
1314                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1315                 let event_handler = |_: _| {};
1316		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1317
1318                 match bg_processor.stop() {
1319                         Ok(_) => panic!("Expected error persisting scorer"),
1320                         Err(e) => {
1321                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1322                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1323                         },
1324                 }
1325         }
1326
1327         #[test]
1328         fn test_background_event_handling() {
1329                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1330                 let channel_value = 100000;
1331                 let data_dir = nodes[0].persister.get_data_dir();
1332                 let persister = Arc::new(Persister::new(data_dir.clone()));
1333
1334                 // Set up a background event handler for FundingGenerationReady events.
1335                 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1336                 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1337                 let event_handler = move |event: Event| match event {
1338                         Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1339                         Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1340                         Event::ChannelReady { .. } => {},
1341                         _ => panic!("Unexpected event: {:?}", event),
1342                 };
1343
1344                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1345
1346                 // Open a channel and check that the FundingGenerationReady event was handled.
1347                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1348                 let (temporary_channel_id, funding_tx) = funding_generation_recv
1349                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1350                         .expect("FundingGenerationReady not handled within deadline");
1351                 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1352                 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1353                 get_event!(nodes[1], Event::ChannelPending);
1354                 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
1355                 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1356                         .expect("ChannelPending not handled within deadline");
1357
1358                 // Confirm the funding transaction.
1359                 confirm_transaction(&mut nodes[0], &funding_tx);
1360                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1361                 confirm_transaction(&mut nodes[1], &funding_tx);
1362                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1363                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1364                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1365                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1366                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1367
1368                 if !std::thread::panicking() {
1369                         bg_processor.stop().unwrap();
1370                 }
1371
1372                 // Set up a background event handler for SpendableOutputs events.
1373                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1374                 let event_handler = move |event: Event| match event {
1375                         Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1376                         Event::ChannelReady { .. } => {},
1377                         Event::ChannelClosed { .. } => {},
1378                         _ => panic!("Unexpected event: {:?}", event),
1379                 };
1380                 let persister = Arc::new(Persister::new(data_dir));
1381                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1382
1383                 // Force close the channel and check that the SpendableOutputs event was handled.
1384                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1385                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
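		// Confirming the commitment transaction to a depth of BREAKDOWN_TIMEOUT should let the
		// CSV-locked to_self output mature, at which point the SpendableOutputs event awaited
		// below should be generated.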
1386                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1387
1388                 let event = receiver
1389                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1390                         .expect("Events not handled within deadline");
1391                 match event {
1392                         Event::SpendableOutputs { .. } => {},
1393                         _ => panic!("Unexpected event: {:?}", event),
1394                 }
1395
1396                 if !std::thread::panicking() {
1397                         bg_processor.stop().unwrap();
1398                 }
1399         }
1400
1401         #[test]
1402         fn test_scorer_persistence() {
1403                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1404                 let data_dir = nodes[0].persister.get_data_dir();
1405                 let persister = Arc::new(Persister::new(data_dir));
1406                 let event_handler = |_: _| {};
1407                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1408
1409                 loop {
1410                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1411                         let expected_log = "Persisting scorer".to_string();
1412                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1413                                 break
1414                         }
1415                 }
1416
1417                 if !std::thread::panicking() {
1418                         bg_processor.stop().unwrap();
1419                 }
1420         }
1421
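	// Seeds the graph with a partially-announced channel, waits until the background processor's
	// loop has gone around at least twice, then applies a rapid-gossip-sync snapshot and expects
	// pruning (signalled via `$receive`) to happen only once the sync has completed.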
1422         macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
1423                 ($nodes: expr, $receive: expr, $sleep: expr) => {
1424                         let features = ChannelFeatures::empty();
1425                         $nodes[0].network_graph.add_channel_from_partial_announcement(
1426                                 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
1427                         ).expect("Failed to update channel from partial announcement");
1428                         let original_graph_description = $nodes[0].network_graph.to_string();
1429                         assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1430                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
1431
1432                         loop {
1433                                 $sleep;
1434                                 let log_entries = $nodes[0].logger.lines.lock().unwrap();
1435                                 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1436                                 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1437                                         .unwrap_or(&0) > 1
1438                                 {
1439                                         // Wait until the loop has gone around at least twice.
1440                                         break
1441                                 }
1442                         }
1443
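			// Raw rapid-gossip-sync snapshot announcing two channels, with updates timestamped
			// around January 2022 (matching the `Some(1642291930)` current time passed below), so
			// they will read as stale when the graph is eventually pruned.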
1444                         let initialization_input = vec![
1445                                 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1446                                 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1447                                 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1448                                 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1449                                 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1450                                 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1451                                 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1452                                 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1453                                 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1454                                 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1455                                 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1456                                 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1457                                 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1458                         ];
1459                         $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1460
1461                         // this should have added two channels and pruned the previous one.
1462                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
1463
1464                         $receive.expect("Network graph not pruned within deadline");
1465
1466                         // all channels should now be pruned
1467                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
1468                 }
1469         }
1470
1471         #[test]
1472         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1473                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1474
1475                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1476                 let data_dir = nodes[0].persister.get_data_dir();
1477                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1478
1479                 let event_handler = |_: _| {};
1480                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1481
1482                 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1483                         receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1484                         std::thread::sleep(Duration::from_millis(1)));
1485
1486                 background_processor.stop().unwrap();
1487         }
1488
1489         #[tokio::test]
1490         #[cfg(feature = "futures")]
1491         async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
1492                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1493
1494                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async".to_string());
1495                 let data_dir = nodes[0].persister.get_data_dir();
1496                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1497
1498                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
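		// As in the other async tests, the sleeper closure doubles as the exit signal: its future
		// resolves to `true` (exit) once `exit_sender` fires and to `false` after an ordinary
		// sleep, letting the test stop the processor after the pruning check has run.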
1499                 let bp_future = super::process_events_async(
1500                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1501                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1502                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1503                                 let mut exit_receiver = exit_receiver.clone();
1504                                 Box::pin(async move {
1505                                         tokio::select! {
1506                                                 _ = tokio::time::sleep(dur) => false,
1507                                                 _ = exit_receiver.changed() => true,
1508                                         }
1509                                 })
1510                         }, false,
1511                 );
1512
1513                 let t1 = tokio::spawn(bp_future);
1514                 let t2 = tokio::spawn(async move {
1515                         do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
1516                                 let mut i = 0;
1517                                 loop {
1518                                         tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
1519                                         if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
1520                                         assert!(i < 5);
1521                                         i += 1;
1522                                 }
1523                         }, tokio::time::sleep(Duration::from_millis(1)).await);
1524                         exit_sender.send(()).unwrap();
1525                 });
1526                 let (r1, r2) = tokio::join!(t1, t2);
1527                 r1.unwrap().unwrap();
1528                 r2.unwrap()
1529         }
1530
1531         macro_rules! do_test_payment_path_scoring {
1532                 ($nodes: expr, $receive: expr) => {
1533                         // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1534                         // that we update the scorer upon a payment path succeeding (note that the channel must be
1535                         // public or else we won't score it).
1536			// The caller is expected to have hooked up an event handler that forwards these payment
1537			// and probe events to `$receive` via a running background processor.
1538                         let scored_scid = 4242;
1539                         let secp_ctx = Secp256k1::new();
1540                         let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1541                         let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1542
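			// A single-hop path over the channel being scored; the TestScorer expectations below
			// are compared against this exact path.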
1543                         let path = vec![RouteHop {
1544                                 pubkey: node_1_id,
1545                                 node_features: NodeFeatures::empty(),
1546                                 short_channel_id: scored_scid,
1547                                 channel_features: ChannelFeatures::empty(),
1548                                 fee_msat: 0,
1549                                 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1550                         }];
1551
1552                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1553                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1554                                 payment_id: None,
1555                                 payment_hash: PaymentHash([42; 32]),
1556                                 payment_failed_permanently: false,
1557                                 failure: PathFailure::OnPath { network_update: None },
1558                                 path: path.clone(),
1559                                 short_channel_id: Some(scored_scid),
1560                         });
1561                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1562                         match event {
1563                                 Event::PaymentPathFailed { .. } => {},
1564                                 _ => panic!("Unexpected event"),
1565                         }
1566
1567                         // Ensure we'll score payments that were explicitly failed back by the destination as
1568                         // ProbeSuccess.
1569                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1570                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1571                                 payment_id: None,
1572                                 payment_hash: PaymentHash([42; 32]),
1573                                 payment_failed_permanently: true,
1574                                 failure: PathFailure::OnPath { network_update: None },
1575                                 path: path.clone(),
1576                                 short_channel_id: None,
1577                         });
1578                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1579                         match event {
1580                                 Event::PaymentPathFailed { .. } => {},
1581                                 _ => panic!("Unexpected event"),
1582                         }
1583
1584                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1585                         $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1586                                 payment_id: PaymentId([42; 32]),
1587                                 payment_hash: None,
1588                                 path: path.clone(),
1589                         });
1590                         let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
1591                         match event {
1592                                 Event::PaymentPathSuccessful { .. } => {},
1593                                 _ => panic!("Unexpected event"),
1594                         }
1595
1596                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1597                         $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1598                                 payment_id: PaymentId([42; 32]),
1599                                 payment_hash: PaymentHash([42; 32]),
1600                                 path: path.clone(),
1601                         });
1602                         let event = $receive.expect("ProbeSuccessful not handled within deadline");
1603                         match event {
1604				Event::ProbeSuccessful { .. } => {},
1605				_ => panic!("Unexpected event"),
1606                         }
1607
1608                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1609                         $nodes[0].node.push_pending_event(Event::ProbeFailed {
1610                                 payment_id: PaymentId([42; 32]),
1611                                 payment_hash: PaymentHash([42; 32]),
1612                                 path,
1613                                 short_channel_id: Some(scored_scid),
1614                         });
1615                         let event = $receive.expect("ProbeFailure not handled within deadline");
1616                         match event {
1617                                 Event::ProbeFailed { .. } => {},
1618                                 _ => panic!("Unexpected event"),
1619                         }
1620                 }
1621         }
1622
1623         #[test]
1624         fn test_payment_path_scoring() {
1625                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1626                 let event_handler = move |event: Event| match event {
1627                         Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1628                         Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1629                         Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1630                         Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1631                         _ => panic!("Unexpected event: {:?}", event),
1632                 };
1633
1634                 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1635                 let data_dir = nodes[0].persister.get_data_dir();
1636                 let persister = Arc::new(Persister::new(data_dir));
1637                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1638
1639                 do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));
1640
1641                 if !std::thread::panicking() {
1642                         bg_processor.stop().unwrap();
1643                 }
1644         }
1645
1646         #[tokio::test]
1647         #[cfg(feature = "futures")]
1648         async fn test_payment_path_scoring_async() {
1649                 let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
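		// The async event handler must hand back an owned future, so each invocation clones the
		// channel sender before moving it into the returned async block.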
1650                 let event_handler = move |event: Event| {
1651                         let sender_ref = sender.clone();
1652                         async move {
1653                                 match event {
1654                                         Event::PaymentPathFailed { .. } => { sender_ref.send(event).await.unwrap() },
1655                                         Event::PaymentPathSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1656                                         Event::ProbeSuccessful { .. } => { sender_ref.send(event).await.unwrap() },
1657                                         Event::ProbeFailed { .. } => { sender_ref.send(event).await.unwrap() },
1658                                         _ => panic!("Unexpected event: {:?}", event),
1659                                 }
1660                         }
1661                 };
1662
1663                 let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string());
1664                 let data_dir = nodes[0].persister.get_data_dir();
1665                 let persister = Arc::new(Persister::new(data_dir));
1666
1667                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1668
1669                 let bp_future = super::process_events_async(
1670                         persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1671                         nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1672                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1673                                 let mut exit_receiver = exit_receiver.clone();
1674                                 Box::pin(async move {
1675                                         tokio::select! {
1676                                                 _ = tokio::time::sleep(dur) => false,
1677                                                 _ = exit_receiver.changed() => true,
1678                                         }
1679                                 })
1680                         }, false,
1681                 );
1682                 let t1 = tokio::spawn(bp_future);
1683                 let t2 = tokio::spawn(async move {
1684                         do_test_payment_path_scoring!(nodes, receiver.recv().await);
1685                         exit_sender.send(()).unwrap();
1686                 });
1687
1688                 let (r1, r2) = tokio::join!(t1, t2);
1689                 r1.unwrap().unwrap();
1690                 r2.unwrap()
1691         }
1692 }