af9fbcd15f5114a985025fa588a58d2e8e6fc3f4
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::events::{Event, PathFailure};
30 #[cfg(feature = "std")]
31 use lightning::events::{EventHandler, EventsProvider};
32 use lightning::ln::channelmanager::ChannelManager;
33 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
34 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
35 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
36 use lightning::routing::utxo::UtxoLookup;
37 use lightning::routing::router::Router;
38 use lightning::routing::scoring::{Score, WriteableScore};
39 use lightning::util::logger::Logger;
40 use lightning::util::persist::Persister;
41 #[cfg(feature = "std")]
42 use lightning::util::wakers::Sleeper;
43 use lightning_rapid_gossip_sync::RapidGossipSync;
44
45 use core::ops::Deref;
46 use core::time::Duration;
47
48 #[cfg(feature = "std")]
49 use std::sync::Arc;
50 #[cfg(feature = "std")]
51 use core::sync::atomic::{AtomicBool, Ordering};
52 #[cfg(feature = "std")]
53 use std::thread::{self, JoinHandle};
54 #[cfg(feature = "std")]
55 use std::time::Instant;
56
57 #[cfg(not(feature = "std"))]
58 use alloc::vec::Vec;
59
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
///   and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        // Flag shared with the background thread. NOTE(review): presumably set to ask the thread to
        // exit - the code that reads/writes it is outside this view, confirm against the impl.
        stop_thread: Arc<AtomicBool>,
        // Handle of the spawned background thread; joining it yields the thread's
        // `Result<(), std::io::Error>`. NOTE(review): presumably `None` once the thread has been
        // joined - confirm against the impl.
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
90
// All timer constants below are expressed in seconds (the async processing loop converts them
// with `Duration::from_secs`). Under `cfg(test)` they are shortened so tests run quickly.

/// Interval between calls to `ChannelManager::timer_tick_occurred` in the processing loop.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Interval between calls to `PeerManager::timer_tick_occurred` in the processing loop.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Interval between periodic scorer persistence attempts in the processing loop.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay before the first network graph prune after startup; once a prune has happened the
/// loop switches to `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Interval between calls to `ChainMonitor::rebroadcast_pending_claims` in the processing loop.
#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;
123
124 #[cfg(feature = "futures")]
/// Returns the smaller of `a` and `b`.
///
/// `core::cmp::min` is not currently usable in constant expressions, so this trivial `const fn`
/// stands in for it when computing timer constants.
const fn min_u64(a: u64, b: u64) -> u64 {
        match b <= a {
                true => b,
                false => a,
        }
}
#[cfg(feature = "futures")]
/// The smallest of `FRESHNESS_TIMER`, `PING_TIMER`, `SCORER_PERSIST_TIMER`,
/// `FIRST_NETWORK_PRUNE_TIMER` and `REBROADCAST_TIMER`, in seconds. `process_events_async` uses it
/// as the sleep duration per loop iteration when `mobile_interruptable_platform` is not set.
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
        min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));
130
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// The type parameters are all pointer-like (`Deref`) types:
/// * `P` dereferences to a [`P2PGossipSync`],
/// * `R` dereferences to a [`RapidGossipSync`],
/// * `G` dereferences to the [`NetworkGraph`] type used by both gossip sync kinds,
/// * `U` dereferences to a `UtxoLookup` implementation,
/// * `L` dereferences to a `Logger` implementation.
pub enum GossipSync<
        P: Deref<Target = P2PGossipSync<G, U, L>>,
        R: Deref<Target = RapidGossipSync<G, L>>,
        G: Deref<Target = NetworkGraph<L>>,
        U: Deref,
        L: Deref,
>
where U::Target: UtxoLookup, L::Target: Logger {
        /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
        P2P(P),
        /// Rapid gossip sync from a trusted server.
        Rapid(R),
        /// No gossip sync.
        None,
}
147
148 impl<
149         P: Deref<Target = P2PGossipSync<G, U, L>>,
150         R: Deref<Target = RapidGossipSync<G, L>>,
151         G: Deref<Target = NetworkGraph<L>>,
152         U: Deref,
153         L: Deref,
154 > GossipSync<P, R, G, U, L>
155 where U::Target: UtxoLookup, L::Target: Logger {
156         fn network_graph(&self) -> Option<&G> {
157                 match self {
158                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
159                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
160                         GossipSync::None => None,
161                 }
162         }
163
164         fn prunable_network_graph(&self) -> Option<&G> {
165                 match self {
166                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
167                         GossipSync::Rapid(gossip_sync) => {
168                                 if gossip_sync.is_initial_sync_complete() {
169                                         Some(gossip_sync.network_graph())
170                                 } else {
171                                         None
172                                 }
173                         },
174                         GossipSync::None => None,
175                 }
176         }
177 }
178
179 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
180 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
181         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
182 where
183         U::Target: UtxoLookup,
184         L::Target: Logger,
185 {
186         /// Initializes a new [`GossipSync::P2P`] variant.
187         pub fn p2p(gossip_sync: P) -> Self {
188                 GossipSync::P2P(gossip_sync)
189         }
190 }
191
192 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
193 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
194         GossipSync<
195                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
196                 R,
197                 G,
198                 &'a (dyn UtxoLookup + Send + Sync),
199                 L,
200         >
201 where
202         L::Target: Logger,
203 {
204         /// Initializes a new [`GossipSync::Rapid`] variant.
205         pub fn rapid(gossip_sync: R) -> Self {
206                 GossipSync::Rapid(gossip_sync)
207         }
208 }
209
210 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
211 impl<'a, L: Deref>
212         GossipSync<
213                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
214                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
215                 &'a NetworkGraph<L>,
216                 &'a (dyn UtxoLookup + Send + Sync),
217                 L,
218         >
219 where
220         L::Target: Logger,
221 {
222         /// Initializes a new [`GossipSync::None`] variant.
223         pub fn none() -> Self {
224                 GossipSync::None
225         }
226 }
227
228 fn handle_network_graph_update<L: Deref>(
229         network_graph: &NetworkGraph<L>, event: &Event
230 ) where L::Target: Logger {
231         if let Event::PaymentPathFailed {
232                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
233         {
234                 network_graph.handle_network_update(upd);
235         }
236 }
237
238 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
239         scorer: &'a S, event: &Event
240 ) {
241         let mut score = scorer.lock();
242         match event {
243                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
244                         let path = path.iter().collect::<Vec<_>>();
245                         score.payment_path_failed(&path, *scid);
246                 },
247                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
248                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
249                         // because the payment made it all the way to the destination with sufficient liquidity.
250                         let path = path.iter().collect::<Vec<_>>();
251                         score.probe_successful(&path);
252                 },
253                 Event::PaymentPathSuccessful { path, .. } => {
254                         let path = path.iter().collect::<Vec<_>>();
255                         score.payment_path_successful(&path);
256                 },
257                 Event::ProbeSuccessful { path, .. } => {
258                         let path = path.iter().collect::<Vec<_>>();
259                         score.probe_successful(&path);
260                 },
261                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
262                         let path = path.iter().collect::<Vec<_>>();
263                         score.probe_failed(&path, *scid);
264                 },
265                 _ => {},
266         }
267 }
268
// Expands to the shared body of the background processing loop. The expansion is a block that
// evaluates to `Ok(())` on clean exit and uses `?`, so it must be placed in a function returning
// a compatible `Result`.
//
// Arguments:
// * `$persister` - used for `persist_manager`, `persist_graph` and `persist_scorer`.
// * `$chain_monitor` - receives `rebroadcast_pending_claims` on startup and on a timer.
// * `$process_chain_monitor_events` - expression evaluated once per loop iteration.
// * `$channel_manager` - receives `timer_tick_occurred` on startup and on a timer, and is what
//   `persist_manager` writes out.
// * `$process_channel_manager_events` - expression evaluated first in each loop iteration.
// * `$gossip_sync` - queried via `prunable_network_graph()` and `network_graph()`.
// * `$peer_manager` - receives `process_events`, `timer_tick_occurred` and, after a slow await,
//   `disconnect_all_peers`.
// * `$logger` - target for the log macros.
// * `$scorer` - an `Option` holding the scorer to persist, if any.
// * `$loop_exit_check` - boolean expression; when true the loop terminates.
// * `$await` - expression that waits for work or a timeout and evaluates to whether the
//   `ChannelManager` needs persisting.
// * `$get_timer` - callable taking a number of seconds and returning a timer value.
// * `$timer_elapsed` - callable taking `(&mut timer, seconds)` and returning whether it elapsed.
// * `$check_slow_await` - boolean expression enabling detection of an await that took longer
//   than a second.
//
// Failures of `persist_manager` (in the loop and on exit) and of the scorer/graph persistence on
// exit are propagated with `?`; periodic scorer/graph persistence failures are only logged.
macro_rules! define_run_body {
        ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
         $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
         $check_slow_await: expr)
        => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();
                log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
                $chain_monitor.rebroadcast_pending_claims();

                let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
                let mut last_ping_call = $get_timer(PING_TIMER);
                let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                let mut have_pruned = false;

                loop {
                        $process_channel_manager_events;
                        $process_chain_monitor_events;

                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
                        // we want to ensure we get into `persist_manager` as quickly as we can, especially
                        // without running the normal event processing above and handing events to users.
                        //
                        // Specifically, on an *extremely* slow machine, we may see ChannelManager start
                        // processing a message effectively at any point during this loop. In order to
                        // minimize the time between such processing completing and persisting the updated
                        // ChannelManager, we want to minimize methods blocking on a ChannelManager
                        // generally, and as a fallback place such blocking only immediately before
                        // persistence.
                        $peer_manager.process_events();

                        // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                        // see `await_start`'s use below.
                        let mut await_start = None;
                        if $check_slow_await { await_start = Some($get_timer(1)); }
                        let updates_available = $await;
                        let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

                        if updates_available {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        // Exit the loop if the background processor was requested to stop.
                        if $loop_exit_check {
                                log_trace!($logger, "Terminating background processor.");
                                break;
                        }
                        if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                                $channel_manager.timer_tick_occurred();
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
                        if await_slow {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                // Similarly, if we're on a desktop platform and the device has been asleep, we
                                // may not get any cycles.
                                // We detect this by checking if our max-100ms-sleep, above, ran longer than a
                                // full second, at which point we assume sockets may have been killed (they
                                // appear to be at least on some platforms, even if it has only been a second).
                                // Note that we have to take care to not get here just because user event
                                // processing was slow at the top of the loop. For example, the sample client
                                // may call Bitcoin Core RPCs during event handling, which very often takes
                                // more than a handful of seconds to complete, and shouldn't disconnect all our
                                // peers.
                                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                                $peer_manager.disconnect_all_peers();
                                last_ping_call = $get_timer(PING_TIMER);
                        } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
                                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                                $peer_manager.timer_tick_occurred();
                                last_ping_call = $get_timer(PING_TIMER);
                        }

                        // Note that we want to run a graph prune once not long after startup before
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
                        let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
                        if $timer_elapsed(&mut last_prune_call, prune_timer) {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        #[cfg(feature = "std")] {
                                                log_trace!($logger, "Pruning and persisting network graph.");
                                                network_graph.remove_stale_channels_and_tracking();
                                        }
                                        #[cfg(not(feature = "std"))] {
                                                log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                                                log_trace!($logger, "Persisting network graph.");
                                        }

                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }

                                        have_pruned = true;
                                }
                                // Recomputed because `have_pruned` may just have changed: if no prunable graph
                                // was available yet, the short first-prune interval is used again.
                                let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
                                last_prune_call = $get_timer(prune_timer);
                        }

                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
                                        log_trace!($logger, "Persisting scorer");
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
                                }
                                last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                        }

                        if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
                                log_trace!($logger, "Rebroadcasting monitor's pending claims");
                                $chain_monitor.rebroadcast_pending_claims();
                                last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
                        }
                }

                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                $persister.persist_manager(&*$channel_manager)?;

                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
                        $persister.persist_scorer(&scorer)?;
                }

                // Persist NetworkGraph on exit
                if let Some(network_graph) = $gossip_sync.network_graph() {
                        $persister.persist_graph(network_graph)?;
                }

                Ok(())
        } }
}
411
412 #[cfg(feature = "futures")]
pub(crate) mod futures_util {
        use core::future::Future;
        use core::task::{Poll, Waker, RawWaker, RawWakerVTable};
        use core::pin::Pin;
        use core::marker::Unpin;

        /// A future that completes as soon as one of its three inner futures completes.
        ///
        /// The inner futures are polled in the order `a`, `b`, `c`; the first one found ready
        /// determines the output.
        pub(crate) struct Selector<
                A: Future<Output=()> + Unpin, B: Future<Output=()> + Unpin, C: Future<Output=bool> + Unpin
        > {
                pub a: A,
                pub b: B,
                pub c: C,
        }

        /// Identifies which future of a [`Selector`] completed; `C` carries that future's output.
        pub(crate) enum SelectorOutput {
                A, B, C(bool),
        }

        impl<A, B, C> Future for Selector<A, B, C>
        where
                A: Future<Output=()> + Unpin,
                B: Future<Output=()> + Unpin,
                C: Future<Output=bool> + Unpin,
        {
                type Output = SelectorOutput;
                fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
                        if Pin::new(&mut self.a).poll(ctx).is_ready() {
                                return Poll::Ready(SelectorOutput::A);
                        }
                        if Pin::new(&mut self.b).poll(ctx).is_ready() {
                                return Poll::Ready(SelectorOutput::B);
                        }
                        // `c` is polled last, so its readiness (or lack thereof) is the final answer.
                        Pin::new(&mut self.c).poll(ctx).map(SelectorOutput::C)
                }
        }

        // Polling a future outside of an async context (just to learn whether it has completed)
        // still requires a `Waker`, and a `Waker` requires a vtable. Everything below is the
        // boilerplate for a waker whose every operation is a no-op.
        fn dummy_raw_waker() -> RawWaker { RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE) }
        fn dummy_waker_clone(_: *const ()) -> RawWaker { dummy_raw_waker() }
        fn dummy_waker_action(_: *const ()) { }

        const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
                dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action);

        /// Returns a [`Waker`] which does nothing when woken, cloned or dropped.
        pub(crate) fn dummy_waker() -> Waker {
                // SAFETY: every vtable entry ignores its data pointer - `clone` hands out another
                // null-data waker with the same vtable and wake/wake_by_ref/drop are no-ops - so the
                // null pointer is never dereferenced and there is no resource to manage.
                unsafe { Waker::from_raw(dummy_raw_waker()) }
        }
}
460 #[cfg(feature = "futures")]
461 use futures_util::{Selector, SelectorOutput, dummy_waker};
462 #[cfg(feature = "futures")]
463 use core::task;
464
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
///
/// The `mobile_interruptable_platform` flag should be set if we're currently running on a
/// mobile device, where we may need to check for interruption of the application regularly. If you
/// are unsure, you should set the flag, as the performance impact of it is minimal unless there
/// are hundreds or thousands of simultaneous process calls running.
///
/// # Errors
///
/// Returns an error if persisting the `ChannelManager` fails, or if the final persistence of the
/// scorer or network graph on exit fails. Failures of the periodic scorer and network graph
/// persistence are only logged.
#[cfg(feature = "futures")]
pub async fn process_events_async<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        ES: 'static + Deref + Send + Sync,
        NS: 'static + Deref + Send + Sync,
        SP: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        R: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        Descriptor: 'static + SocketDescriptor + Send + Sync,
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
        Sleeper: Fn(Duration) -> SleepFuture
>(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper, mobile_interruptable_platform: bool,
) -> Result<(), lightning::io::Error>
where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
        CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
        T::Target: 'static + BroadcasterInterface,
        ES::Target: 'static + EntropySource,
        NS::Target: 'static + NodeSigner,
        SP::Target: 'static + SignerProvider,
        F::Target: 'static + FeeEstimator,
        R::Target: 'static + Router,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
        // Set whenever a sleeper future resolves to `true`; serves as the loop's exit condition.
        let mut should_break = false;
        // Wraps the user's handler: each event is first applied to the network graph and the
        // scorer (when present) and only then handed to the user-provided `event_handler`.
        let async_event_handler = |event| {
                let network_graph = gossip_sync.network_graph();
                let event_handler = &event_handler;
                let scorer = &scorer;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        if let Some(ref scorer) = scorer {
                                update_scorer(scorer, &event);
                        }
                        event_handler(event).await;
                }
        };
        define_run_body!(persister,
                chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        // Wait for whichever comes first: a persistable ChannelManager update (A), a
                        // ChainMonitor update (B), or the sleep timeout (C).
                        let fut = Selector {
                                a: channel_manager.get_persistable_update_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(if mobile_interruptable_platform { Duration::from_millis(100) } else { Duration::from_secs(FASTEST_TIMER) }),
                        };
                        // Only (A) evaluates to `true`, i.e. requests that the ChannelManager be
                        // persisted; (C) records the sleeper's exit request.
                        match fut.await {
                                SelectorOutput::A => true,
                                SelectorOutput::B => false,
                                SelectorOutput::C(exit) => {
                                        should_break = exit;
                                        false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
                // A timer here is a sleeper future; it has "elapsed" once polling it (with a no-op
                // waker, outside of any await) reports it ready. The duration argument is unused.
                |fut: &mut SleepFuture, _| {
                        let mut waker = dummy_waker();
                        let mut ctx = task::Context::from_waker(&mut waker);
                        match core::pin::Pin::new(fut).poll(&mut ctx) {
                                task::Poll::Ready(exit) => { should_break = exit; true },
                                task::Poll::Pending => false,
                        }
                }, mobile_interruptable_platform)
}
579
#[cfg(feature = "std")]
impl BackgroundProcessor {
        /// Spawns the background thread that carries out the duties listed in the [top-level
        /// documentation].
        ///
        /// The thread keeps running until this object is dropped, [`stop`] is called, or
        /// [`Persister::persist_manager`] fails. In the failure case the error can be obtained by
        /// calling either [`join`] or [`stop`].
        ///
        /// # Data Persistence
        ///
        /// [`Persister::persist_manager`] has the job of writing the [`ChannelManager`] to disk and/or
        /// uploading it to one or more backup services. See [`ChannelManager::write`] for how a
        /// [`ChannelManager`] is written out, and the `lightning-persister` crate for the
        /// implementation LDK provides.
        ///
        /// [`Persister::persist_graph`] has the job of writing the [`NetworkGraph`] to disk when a
        /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for how a [`NetworkGraph`] is
        /// written out, and the `lightning-persister` crate for the implementation LDK provides.
        ///
        /// Users should typically either make [`Persister::persist_manager`] infallible, or call
        /// [`join`] and deal with whatever error comes back. In the latter case the
        /// `BackgroundProcessor` has to be restarted by calling `start` again once the error has been
        /// handled.
        ///
        /// # Event Handling
        ///
        /// `event_handler` receives the events users should be told about (e.g., payment failed).
        /// [`BackgroundProcessor`] may wrap the given [`EventHandler`] with common functionality
        /// implemented by other handlers.
        /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
        ///
        /// # Rapid Gossip Sync
        ///
        /// When rapid gossip sync is supposed to run at startup, pass [`RapidGossipSync`] via
        /// `gossip_sync` so that the [`BackgroundProcessor`] does not prune the [`NetworkGraph`]
        /// instance before the [`RapidGossipSync`] instance has finished its first sync.
        ///
        /// [top-level documentation]: BackgroundProcessor
        /// [`join`]: Self::join
        /// [`stop`]: Self::stop
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
        /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
        /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
        /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
        /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
        pub fn start<
                'a,
                UL: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
                CW: 'static + Deref + Send + Sync,
                T: 'static + Deref + Send + Sync,
                ES: 'static + Deref + Send + Sync,
                NS: 'static + Deref + Send + Sync,
                SP: 'static + Deref + Send + Sync,
                F: 'static + Deref + Send + Sync,
                R: 'static + Deref + Send + Sync,
                G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
                L: 'static + Deref + Send + Sync,
                P: 'static + Deref + Send + Sync,
                Descriptor: 'static + SocketDescriptor + Send + Sync,
                CMH: 'static + Deref + Send + Sync,
                OMH: 'static + Deref + Send + Sync,
                RMH: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
                M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                UMH: 'static + Deref + Send + Sync,
                PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: for <'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
                gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        ) -> Self
        where
                UL::Target: 'static + UtxoLookup,
                CF::Target: 'static + chain::Filter,
                CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
                T::Target: 'static + BroadcasterInterface,
                ES::Target: 'static + EntropySource,
                NS::Target: 'static + NodeSigner,
                SP::Target: 'static + SignerProvider,
                F::Target: 'static + FeeEstimator,
                R::Target: 'static + Router,
                L::Target: 'static + Logger,
                P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
                CMH::Target: 'static + ChannelMessageHandler,
                OMH::Target: 'static + OnionMessageHandler,
                RMH::Target: 'static + RoutingMessageHandler,
                UMH::Target: 'static + CustomMessageHandler,
                PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
        {
                // One flag, two owners: the returned object keeps `stop_thread`, the spawned thread
                // polls its own clone.
                let stop_thread = Arc::new(AtomicBool::new(false));
                let thread_stop_flag = Arc::clone(&stop_thread);
                let thread_handle = thread::spawn(move || -> Result<(), std::io::Error> {
                        // Wrap the user's handler: feed the event to the network graph (if any) and the
                        // scorer (if any) before passing it on.
                        let decorated_handler = |event| {
                                if let Some(network_graph) = gossip_sync.network_graph() {
                                        handle_network_graph_update(network_graph, &event);
                                }
                                if let Some(ref scorer) = scorer {
                                        update_scorer(scorer, &event);
                                }
                                event_handler.handle_event(event);
                        };
                        define_run_body!(
                                persister, chain_monitor,
                                chain_monitor.process_pending_events(&decorated_handler),
                                channel_manager,
                                channel_manager.process_pending_events(&decorated_handler),
                                gossip_sync, peer_manager, logger, scorer,
                                thread_stop_flag.load(Ordering::Acquire),
                                Sleeper::from_two_futures(
                                        channel_manager.get_persistable_update_future(),
                                        chain_monitor.get_update_future()
                                ).wait_timeout(Duration::from_millis(100)),
                                |_| Instant::now(),
                                |started: &Instant, secs| started.elapsed().as_secs() > secs,
                                false
                        )
                });
                Self { stop_thread, thread_handle: Some(thread_handle) }
        }

        /// Waits for `BackgroundProcessor`'s thread to finish and returns any error hit while
        /// persisting [`ChannelManager`].
        ///
        /// # Panics
        ///
        /// Panics if the background thread itself panicked, e.g. while persisting or while handling
        /// events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        pub fn join(mut self) -> Result<(), std::io::Error> {
                assert!(self.thread_handle.is_some());
                self.join_thread()
        }

        /// Asks `BackgroundProcessor`'s thread to stop, waits for it, and returns any error hit while
        /// persisting [`ChannelManager`].
        ///
        /// # Panics
        ///
        /// Panics if the background thread itself panicked, e.g. while persisting or while handling
        /// events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        pub fn stop(mut self) -> Result<(), std::io::Error> {
                assert!(self.thread_handle.is_some());
                self.stop_and_join_thread()
        }

        /// Raises the stop flag and then joins the thread.
        fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
                self.stop_thread.store(true, Ordering::Release);
                self.join_thread()
        }

        /// Joins the thread if its handle is still held, yielding the thread's result; once the
        /// handle has been taken, later calls return `Ok(())`.
        fn join_thread(&mut self) -> Result<(), std::io::Error> {
                if let Some(handle) = self.thread_handle.take() {
                        return handle.join().unwrap();
                }
                Ok(())
        }
}
741
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
        /// Stops and joins the background thread when the processor goes out of scope.
        ///
        /// The outcome is unwrapped, so an error returned by the thread turns into a panic here.
        fn drop(&mut self) {
                let shutdown_result = self.stop_and_join_thread();
                shutdown_result.unwrap();
        }
}
748
749 #[cfg(all(feature = "std", test))]
750 mod tests {
751         use bitcoin::blockdata::block::BlockHeader;
752         use bitcoin::blockdata::constants::genesis_block;
753         use bitcoin::blockdata::locktime::PackedLockTime;
754         use bitcoin::blockdata::transaction::{Transaction, TxOut};
755         use bitcoin::network::constants::Network;
756         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
757         use lightning::chain::{BestBlock, Confirm, chainmonitor};
758         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
759         use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
760         use lightning::chain::transaction::OutPoint;
761         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
762         use lightning::{get_event_msg, get_event};
763         use lightning::ln::PaymentHash;
764         use lightning::ln::channelmanager;
765         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
766         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
767         use lightning::ln::msgs::{ChannelMessageHandler, Init};
768         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
769         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
770         use lightning::routing::router::{DefaultRouter, RouteHop};
771         use lightning::routing::scoring::{ChannelUsage, Score};
772         use lightning::util::config::UserConfig;
773         use lightning::util::ser::Writeable;
774         use lightning::util::test_utils;
775         use lightning::util::persist::KVStorePersister;
776         use lightning_persister::FilesystemPersister;
777         use std::collections::VecDeque;
778         use std::fs;
779         use std::path::PathBuf;
780         use std::sync::{Arc, Mutex};
781         use std::sync::mpsc::SyncSender;
782         use std::time::Duration;
783         use bitcoin::hashes::Hash;
784         use bitcoin::TxMerkleNode;
785         use lightning_rapid_gossip_sync::RapidGossipSync;
786         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
787
788         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
789
790         #[derive(Clone, Hash, PartialEq, Eq)]
791         struct TestDescriptor{}
792         impl SocketDescriptor for TestDescriptor {
793                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
794                         0
795                 }
796
797                 fn disconnect_socket(&mut self) {}
798         }
799
800         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
801
802         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
803
804         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
805         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
806
807         struct Node {
808                 node: Arc<ChannelManager>,
809                 p2p_gossip_sync: PGS,
810                 rapid_gossip_sync: RGS,
811                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
812                 chain_monitor: Arc<ChainMonitor>,
813                 persister: Arc<FilesystemPersister>,
814                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
815                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
816                 logger: Arc<test_utils::TestLogger>,
817                 best_block: BestBlock,
818                 scorer: Arc<Mutex<TestScorer>>,
819         }
820
821         impl Node {
822                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
823                         GossipSync::P2P(self.p2p_gossip_sync.clone())
824                 }
825
826                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
827                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
828                 }
829
830                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
831                         GossipSync::None
832                 }
833         }
834
835         impl Drop for Node {
836                 fn drop(&mut self) {
837                         let data_dir = self.persister.get_data_dir();
838                         match fs::remove_dir_all(data_dir.clone()) {
839                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
840                                 _ => {}
841                         }
842                 }
843         }
844
845         struct Persister {
846                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
847                 graph_persistence_notifier: Option<SyncSender<()>>,
848                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
849                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
850                 filesystem_persister: FilesystemPersister,
851         }
852
853         impl Persister {
854                 fn new(data_dir: String) -> Self {
855                         let filesystem_persister = FilesystemPersister::new(data_dir);
856                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
857                 }
858
859                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
860                         Self { graph_error: Some((error, message)), ..self }
861                 }
862
863                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
864                         Self { graph_persistence_notifier: Some(sender), ..self }
865                 }
866
867                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
868                         Self { manager_error: Some((error, message)), ..self }
869                 }
870
871                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
872                         Self { scorer_error: Some((error, message)), ..self }
873                 }
874         }
875
876         impl KVStorePersister for Persister {
877                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
878                         if key == "manager" {
879                                 if let Some((error, message)) = self.manager_error {
880                                         return Err(std::io::Error::new(error, message))
881                                 }
882                         }
883
884                         if key == "network_graph" {
885                                 if let Some(sender) = &self.graph_persistence_notifier {
886                                         match sender.send(()) {
887                                                 Ok(()) => {},
888                                                 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
889                                         }
890                                 };
891
892                                 if let Some((error, message)) = self.graph_error {
893                                         return Err(std::io::Error::new(error, message))
894                                 }
895                         }
896
897                         if key == "scorer" {
898                                 if let Some((error, message)) = self.scorer_error {
899                                         return Err(std::io::Error::new(error, message))
900                                 }
901                         }
902
903                         self.filesystem_persister.persist(key, object)
904                 }
905         }
906
907         struct TestScorer {
908                 event_expectations: Option<VecDeque<TestResult>>,
909         }
910
911         #[derive(Debug)]
912         enum TestResult {
913                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
914                 PaymentSuccess { path: Vec<RouteHop> },
915                 ProbeFailure { path: Vec<RouteHop> },
916                 ProbeSuccess { path: Vec<RouteHop> },
917         }
918
919         impl TestScorer {
920                 fn new() -> Self {
921                         Self { event_expectations: None }
922                 }
923
924                 fn expect(&mut self, expectation: TestResult) {
925                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
926                 }
927         }
928
929         impl lightning::util::ser::Writeable for TestScorer {
930                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
931         }
932
933         impl Score for TestScorer {
934                 fn channel_penalty_msat(
935                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
936                 ) -> u64 { unimplemented!(); }
937
938                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
939                         if let Some(expectations) = &mut self.event_expectations {
940                                 match expectations.pop_front().unwrap() {
941                                         TestResult::PaymentFailure { path, short_channel_id } => {
942                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
943                                                 assert_eq!(actual_short_channel_id, short_channel_id);
944                                         },
945                                         TestResult::PaymentSuccess { path } => {
946                                                 panic!("Unexpected successful payment path: {:?}", path)
947                                         },
948                                         TestResult::ProbeFailure { path } => {
949                                                 panic!("Unexpected probe failure: {:?}", path)
950                                         },
951                                         TestResult::ProbeSuccess { path } => {
952                                                 panic!("Unexpected probe success: {:?}", path)
953                                         }
954                                 }
955                         }
956                 }
957
958                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
959                         if let Some(expectations) = &mut self.event_expectations {
960                                 match expectations.pop_front().unwrap() {
961                                         TestResult::PaymentFailure { path, .. } => {
962                                                 panic!("Unexpected payment path failure: {:?}", path)
963                                         },
964                                         TestResult::PaymentSuccess { path } => {
965                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
966                                         },
967                                         TestResult::ProbeFailure { path } => {
968                                                 panic!("Unexpected probe failure: {:?}", path)
969                                         },
970                                         TestResult::ProbeSuccess { path } => {
971                                                 panic!("Unexpected probe success: {:?}", path)
972                                         }
973                                 }
974                         }
975                 }
976
977                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
978                         if let Some(expectations) = &mut self.event_expectations {
979                                 match expectations.pop_front().unwrap() {
980                                         TestResult::PaymentFailure { path, .. } => {
981                                                 panic!("Unexpected payment path failure: {:?}", path)
982                                         },
983                                         TestResult::PaymentSuccess { path } => {
984                                                 panic!("Unexpected payment path success: {:?}", path)
985                                         },
986                                         TestResult::ProbeFailure { path } => {
987                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
988                                         },
989                                         TestResult::ProbeSuccess { path } => {
990                                                 panic!("Unexpected probe success: {:?}", path)
991                                         }
992                                 }
993                         }
994                 }
995                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
996                         if let Some(expectations) = &mut self.event_expectations {
997                                 match expectations.pop_front().unwrap() {
998                                         TestResult::PaymentFailure { path, .. } => {
999                                                 panic!("Unexpected payment path failure: {:?}", path)
1000                                         },
1001                                         TestResult::PaymentSuccess { path } => {
1002                                                 panic!("Unexpected payment path success: {:?}", path)
1003                                         },
1004                                         TestResult::ProbeFailure { path } => {
1005                                                 panic!("Unexpected probe failure: {:?}", path)
1006                                         },
1007                                         TestResult::ProbeSuccess { path } => {
1008                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
1009                                         }
1010                                 }
1011                         }
1012                 }
1013         }
1014
1015         impl Drop for TestScorer {
1016                 fn drop(&mut self) {
1017                         if std::thread::panicking() {
1018                                 return;
1019                         }
1020
1021                         if let Some(event_expectations) = &self.event_expectations {
1022                                 if !event_expectations.is_empty() {
1023                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1024                                 }
1025                         }
1026                 }
1027         }
1028
1029         fn get_full_filepath(filepath: String, filename: String) -> String {
1030                 let mut path = PathBuf::from(filepath);
1031                 path.push(filename);
1032                 path.to_str().unwrap().to_string()
1033         }
1034
1035         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
1036                 let network = Network::Testnet;
1037                 let mut nodes = Vec::new();
1038                 for i in 0..num_nodes {
1039                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1040                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1041                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1042                         let genesis_block = genesis_block(network);
1043                         let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1044                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
1045                         let seed = [i as u8; 32];
1046                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
1047                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
1048                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
1049                         let now = Duration::from_secs(genesis_block.header.time as u64);
1050                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1051                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
1052                         let best_block = BestBlock::from_network(network);
1053                         let params = ChainParameters { network, best_block };
1054                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
1055                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1056                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1057                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
1058                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
1059                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
1060                         nodes.push(node);
1061                 }
1062
1063                 for i in 0..num_nodes {
1064                         for j in (i+1)..num_nodes {
1065                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
1066                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
1067                         }
1068                 }
1069
1070                 nodes
1071         }
1072
1073         macro_rules! open_channel {
1074                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1075                         begin_open_channel!($node_a, $node_b, $channel_value);
1076                         let events = $node_a.node.get_and_clear_pending_events();
1077                         assert_eq!(events.len(), 1);
1078                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1079                         $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
1080                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1081                         get_event!($node_b, Event::ChannelPending);
1082                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1083                         get_event!($node_a, Event::ChannelPending);
1084                         tx
1085                 }}
1086         }
1087
1088         macro_rules! begin_open_channel {
1089                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1090                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
1091                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1092                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
1093                 }}
1094         }
1095
1096         macro_rules! handle_funding_generation_ready {
1097                 ($event: expr, $channel_value: expr) => {{
1098                         match $event {
1099                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1100                                         assert_eq!(channel_value_satoshis, $channel_value);
1101                                         assert_eq!(user_channel_id, 42);
1102
1103                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1104                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1105                                         }]};
1106                                         (temporary_channel_id, tx)
1107                                 },
1108                                 _ => panic!("Unexpected event"),
1109                         }
1110                 }}
1111         }
1112
1113         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1114                 for i in 1..=depth {
1115                         let prev_blockhash = node.best_block.block_hash();
1116                         let height = node.best_block.height() + 1;
1117                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1118                         let txdata = vec![(0, tx)];
1119                         node.best_block = BestBlock::new(header.block_hash(), height);
1120                         match i {
1121                                 1 => {
1122                                         node.node.transactions_confirmed(&header, &txdata, height);
1123                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1124                                 },
1125                                 x if x == depth => {
1126                                         node.node.best_block_updated(&header, height);
1127                                         node.chain_monitor.best_block_updated(&header, height);
1128                                 },
1129                                 _ => {},
1130                         }
1131                 }
1132         }
1133         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1134                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1135         }
1136
1137         #[test]
1138         fn test_background_processor() {
1139                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1140                 // updates. Also test that when new updates are available, the manager signals that it needs
1141                 // re-persistence and is successfully re-persisted.
1142                 let nodes = create_nodes(2, "test_background_processor".to_string());
1143
1144                 // Go through the channel creation process so that each node has something to persist. Since
1145                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1146                 // avoid a race with processing events.
1147                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1148
1149                 // Initiate the background processors to watch each node.
1150                 let data_dir = nodes[0].persister.get_data_dir();
1151                 let persister = Arc::new(Persister::new(data_dir));
1152                 let event_handler = |_: _| {};
1153                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1154
1155                 macro_rules! check_persisted_data {
1156                         ($node: expr, $filepath: expr) => {
1157                                 let mut expected_bytes = Vec::new();
1158                                 loop {
1159                                         expected_bytes.clear();
1160                                         match $node.write(&mut expected_bytes) {
1161                                                 Ok(()) => {
1162                                                         match std::fs::read($filepath) {
1163                                                                 Ok(bytes) => {
1164                                                                         if bytes == expected_bytes {
1165                                                                                 break
1166                                                                         } else {
1167                                                                                 continue
1168                                                                         }
1169                                                                 },
1170                                                                 Err(_) => continue
1171                                                         }
1172                                                 },
1173                                                 Err(e) => panic!("Unexpected error: {}", e)
1174                                         }
1175                                 }
1176                         }
1177                 }
1178
1179                 // Check that the initial channel manager data is persisted as expected.
1180                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1181                 check_persisted_data!(nodes[0].node, filepath.clone());
1182
1183                 loop {
1184                         if !nodes[0].node.get_persistence_condvar_value() { break }
1185                 }
1186
1187                 // Force-close the channel.
1188                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1189
1190                 // Check that the force-close updates are persisted.
1191                 check_persisted_data!(nodes[0].node, filepath.clone());
1192                 loop {
1193                         if !nodes[0].node.get_persistence_condvar_value() { break }
1194                 }
1195
1196                 // Check network graph is persisted
1197                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1198                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1199
1200                 // Check scorer is persisted
1201                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1202                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1203
1204                 if !std::thread::panicking() {
1205                         bg_processor.stop().unwrap();
1206                 }
1207         }
1208
1209         #[test]
1210         fn test_timer_tick_called() {
1211                 // Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1212                 // `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
1213                 // `PeerManager::timer_tick_occurred` every `PING_TIMER`.
1214                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1215                 let data_dir = nodes[0].persister.get_data_dir();
1216                 let persister = Arc::new(Persister::new(data_dir));
1217                 let event_handler = |_: _| {};
1218                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1219                 loop {
1220                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1221                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1222                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1223                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1224                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log_1)).is_some() &&
1225                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_2)).is_some() &&
1226                                 log_entries.get(&("lightning_background_processor".to_string(), desired_log_3)).is_some() {
1227                                 break
1228                         }
1229                 }
1230
1231                 if !std::thread::panicking() {
1232                         bg_processor.stop().unwrap();
1233                 }
1234         }
1235
1236         #[test]
1237         fn test_channel_manager_persist_error() {
1238                 // Test that if we encounter an error during manager persistence, the thread panics.
1239                 let nodes = create_nodes(2, "test_persist_error".to_string());
1240                 open_channel!(nodes[0], nodes[1], 100000);
1241
1242                 let data_dir = nodes[0].persister.get_data_dir();
1243                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1244                 let event_handler = |_: _| {};
1245                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1246                 match bg_processor.join() {
1247                         Ok(_) => panic!("Expected error persisting manager"),
1248                         Err(e) => {
1249                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1250                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1251                         },
1252                 }
1253         }
1254
1255         #[tokio::test]
1256         #[cfg(feature = "futures")]
1257         async fn test_channel_manager_persist_error_async() {
1258                 // Test that if we encounter an error during manager persistence, the thread panics.
1259                 let nodes = create_nodes(2, "test_persist_error_sync".to_string());
1260                 open_channel!(nodes[0], nodes[1], 100000);
1261
1262                 let data_dir = nodes[0].persister.get_data_dir();
1263                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1264
1265                 let bp_future = super::process_events_async(
1266                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1267                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1268                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1269                                 Box::pin(async move {
1270                                         tokio::time::sleep(dur).await;
1271                                         false // Never exit
1272                                 })
1273                         }, false,
1274                 );
1275                 match bp_future.await {
1276                         Ok(_) => panic!("Expected error persisting manager"),
1277                         Err(e) => {
1278                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1279                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1280                         },
1281                 }
1282         }
1283
1284         #[test]
1285         fn test_network_graph_persist_error() {
1286                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1287                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1288                 let data_dir = nodes[0].persister.get_data_dir();
1289                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1290                 let event_handler = |_: _| {};
1291                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1292
1293                 match bg_processor.stop() {
1294                         Ok(_) => panic!("Expected error persisting network graph"),
1295                         Err(e) => {
1296                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1297                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1298                         },
1299                 }
1300         }
1301
1302         #[test]
1303         fn test_scorer_persist_error() {
1304                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1305                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1306                 let data_dir = nodes[0].persister.get_data_dir();
1307                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1308                 let event_handler = |_: _| {};
1309                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1310
1311                 match bg_processor.stop() {
1312                         Ok(_) => panic!("Expected error persisting scorer"),
1313                         Err(e) => {
1314                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1315                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1316                         },
1317                 }
1318         }
1319
1320         #[test]
1321         fn test_background_event_handling() {
1322                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1323                 let channel_value = 100000;
1324                 let data_dir = nodes[0].persister.get_data_dir();
1325                 let persister = Arc::new(Persister::new(data_dir.clone()));
1326
1327                 // Set up a background event handler for FundingGenerationReady events.
1328                 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1329                 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1330                 let event_handler = move |event: Event| match event {
1331                         Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1332                         Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1333                         Event::ChannelReady { .. } => {},
1334                         _ => panic!("Unexpected event: {:?}", event),
1335                 };
1336
1337                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1338
1339                 // Open a channel and check that the FundingGenerationReady event was handled.
1340                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1341                 let (temporary_channel_id, funding_tx) = funding_generation_recv
1342                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1343                         .expect("FundingGenerationReady not handled within deadline");
1344                 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1345                 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1346                 get_event!(nodes[1], Event::ChannelPending);
1347                 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
1348                 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1349                         .expect("ChannelPending not handled within deadline");
1350
1351                 // Confirm the funding transaction.
1352                 confirm_transaction(&mut nodes[0], &funding_tx);
1353                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1354                 confirm_transaction(&mut nodes[1], &funding_tx);
1355                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1356                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1357                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1358                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1359                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1360
1361                 if !std::thread::panicking() {
1362                         bg_processor.stop().unwrap();
1363                 }
1364
1365                 // Set up a background event handler for SpendableOutputs events.
1366                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1367                 let event_handler = move |event: Event| match event {
1368                         Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1369                         Event::ChannelReady { .. } => {},
1370                         Event::ChannelClosed { .. } => {},
1371                         _ => panic!("Unexpected event: {:?}", event),
1372                 };
1373                 let persister = Arc::new(Persister::new(data_dir));
1374                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1375
1376                 // Force close the channel and check that the SpendableOutputs event was handled.
1377                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1378                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1379                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1380
1381                 let event = receiver
1382                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1383                         .expect("Events not handled within deadline");
1384                 match event {
1385                         Event::SpendableOutputs { .. } => {},
1386                         _ => panic!("Unexpected event: {:?}", event),
1387                 }
1388
1389                 if !std::thread::panicking() {
1390                         bg_processor.stop().unwrap();
1391                 }
1392         }
1393
1394         #[test]
1395         fn test_scorer_persistence() {
1396                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1397                 let data_dir = nodes[0].persister.get_data_dir();
1398                 let persister = Arc::new(Persister::new(data_dir));
1399                 let event_handler = |_: _| {};
1400                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1401
1402                 loop {
1403                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1404                         let expected_log = "Persisting scorer".to_string();
1405                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1406                                 break
1407                         }
1408                 }
1409
1410                 if !std::thread::panicking() {
1411                         bg_processor.stop().unwrap();
1412                 }
1413         }
1414
1415         macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
1416                 ($nodes: expr, $receive: expr, $sleep: expr) => {
1417                         let features = ChannelFeatures::empty();
1418                         $nodes[0].network_graph.add_channel_from_partial_announcement(
1419                                 42, 53, features, $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
1420                         ).expect("Failed to update channel from partial announcement");
1421                         let original_graph_description = $nodes[0].network_graph.to_string();
1422                         assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1423                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);
1424
1425                         loop {
1426                                 $sleep;
1427                                 let log_entries = $nodes[0].logger.lines.lock().unwrap();
1428                                 let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1429                                 if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1430                                         .unwrap_or(&0) > 1
1431                                 {
1432                                         // Wait until the loop has gone around at least twice.
1433                                         break
1434                                 }
1435                         }
1436
1437                         let initialization_input = vec![
1438                                 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1439                                 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1440                                 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1441                                 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1442                                 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1443                                 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1444                                 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1445                                 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1446                                 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1447                                 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1448                                 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1449                                 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1450                                 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1451                         ];
1452                         $nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();
1453
1454                         // this should have added two channels and pruned the previous one.
1455                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);
1456
1457                         $receive.expect("Network graph not pruned within deadline");
1458
1459                         // all channels should now be pruned
1460                         assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
1461                 }
1462         }
1463
1464         #[test]
1465         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1466                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1467
1468                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1469                 let data_dir = nodes[0].persister.get_data_dir();
1470                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1471
1472                 let event_handler = |_: _| {};
1473                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1474
1475                 do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes,
1476                         receiver.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5)),
1477                         std::thread::sleep(Duration::from_millis(1)));
1478
1479                 background_processor.stop().unwrap();
1480         }
1481
1482         #[tokio::test]
1483         #[cfg(feature = "futures")]
1484         async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
1485                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1486
1487                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async".to_string());
1488                 let data_dir = nodes[0].persister.get_data_dir();
1489                 let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));
1490
1491                 let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
1492                 let bp_future = super::process_events_async(
1493                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
1494                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1495                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1496                                 let mut exit_receiver = exit_receiver.clone();
1497                                 Box::pin(async move {
1498                                         tokio::select! {
1499                                                 _ = tokio::time::sleep(dur) => false,
1500                                                 _ = exit_receiver.changed() => true,
1501                                         }
1502                                 })
1503                         }, false,
1504                 );
1505
1506                 let t1 = tokio::spawn(bp_future);
1507                 let t2 = tokio::spawn(async move {
1508                         do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
1509                                 let mut i = 0;
1510                                 loop {
1511                                         tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
1512                                         if let Ok(()) = receiver.try_recv() { break Ok::<(), ()>(()); }
1513                                         assert!(i < 5);
1514                                         i += 1;
1515                                 }
1516                         }, tokio::time::sleep(Duration::from_millis(1)).await);
1517                         exit_sender.send(()).unwrap();
1518                 });
1519                 let (r1, r2) = tokio::join!(t1, t2);
1520                 r1.unwrap().unwrap();
1521                 r2.unwrap()
1522         }
1523
1524         macro_rules! do_test_payment_path_scoring {
1525                 ($nodes: expr, $receive: expr) => {
1526                         // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1527                         // that we update the scorer upon a payment path succeeding (note that the channel must be
1528                         // public or else we won't score it).
1529                         // A background event handler for FundingGenerationReady events must be hooked up to a
1530                         // running background processor.
1531                         let scored_scid = 4242;
1532                         let secp_ctx = Secp256k1::new();
1533                         let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1534                         let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1535
1536                         let path = vec![RouteHop {
1537                                 pubkey: node_1_id,
1538                                 node_features: NodeFeatures::empty(),
1539                                 short_channel_id: scored_scid,
1540                                 channel_features: ChannelFeatures::empty(),
1541                                 fee_msat: 0,
1542                                 cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1543                         }];
1544
1545                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1546                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1547                                 payment_id: None,
1548                                 payment_hash: PaymentHash([42; 32]),
1549                                 payment_failed_permanently: false,
1550                                 failure: PathFailure::OnPath { network_update: None },
1551                                 path: path.clone(),
1552                                 short_channel_id: Some(scored_scid),
1553                         });
1554                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1555                         match event {
1556                                 Event::PaymentPathFailed { .. } => {},
1557                                 _ => panic!("Unexpected event"),
1558                         }
1559
1560                         // Ensure we'll score payments that were explicitly failed back by the destination as
1561                         // ProbeSuccess.
1562                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1563                         $nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1564                                 payment_id: None,
1565                                 payment_hash: PaymentHash([42; 32]),
1566                                 payment_failed_permanently: true,
1567                                 failure: PathFailure::OnPath { network_update: None },
1568                                 path: path.clone(),
1569                                 short_channel_id: None,
1570                         });
1571                         let event = $receive.expect("PaymentPathFailed not handled within deadline");
1572                         match event {
1573                                 Event::PaymentPathFailed { .. } => {},
1574                                 _ => panic!("Unexpected event"),
1575                         }
1576
1577                         $nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1578                         $nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1579                                 payment_id: PaymentId([42; 32]),
1580                                 payment_hash: None,
1581                                 path: path.clone(),
1582                         });
1583                         let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
1584                         match event {
1585                                 Event::PaymentPathSuccessful { .. } => {},
1586                                 _ => panic!("Unexpected event"),
1587                         }
1588
1589                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1590                         $nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1591                                 payment_id: PaymentId([42; 32]),
1592                                 payment_hash: PaymentHash([42; 32]),
1593                                 path: path.clone(),
1594                         });
1595                         let event = $receive.expect("ProbeSuccessful not handled within deadline");
1596                         match event {
1597                                 Event::ProbeSuccessful  { .. } => {},
1598                                 _ => panic!("Unexpected event"),
1599                         }
1600
1601                         $nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1602                         $nodes[0].node.push_pending_event(Event::ProbeFailed {
1603                                 payment_id: PaymentId([42; 32]),
1604                                 payment_hash: PaymentHash([42; 32]),
1605                                 path,
1606                                 short_channel_id: Some(scored_scid),
1607                         });
1608                         let event = $receive.expect("ProbeFailure not handled within deadline");
1609                         match event {
1610                                 Event::ProbeFailed { .. } => {},
1611                                 _ => panic!("Unexpected event"),
1612                         }
1613                 }
1614         }
1615
/// Runs the payment-path scoring scenario against the threaded `BackgroundProcessor`.
///
/// The event handler forwards the four path/probe event kinds over a bounded channel so
/// that `do_test_payment_path_scoring!` can wait for each one; any other event panics.
#[test]
fn test_payment_path_scoring() {
    // Capacity-1 channel: the handler hands each scored event back to the test thread.
    let (event_tx, event_rx) = std::sync::mpsc::sync_channel(1);
    let event_handler = move |event: Event| match event {
        Event::PaymentPathFailed { .. }
        | Event::PaymentPathSuccessful { .. }
        | Event::ProbeSuccessful { .. }
        | Event::ProbeFailed { .. } => event_tx.send(event).unwrap(),
        _ => panic!("Unexpected event: {:?}", event),
    };

    let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let bg_processor = {
        let node = &nodes[0];
        BackgroundProcessor::start(
            persister,
            event_handler,
            node.chain_monitor.clone(),
            node.node.clone(),
            node.no_gossip_sync(),
            node.peer_manager.clone(),
            node.logger.clone(),
            Some(node.scorer.clone()),
        )
    };

    // Every wait performed by the macro is bounded by `EVENT_DEADLINE` seconds.
    let deadline = Duration::from_secs(EVENT_DEADLINE);
    do_test_payment_path_scoring!(nodes, event_rx.recv_timeout(deadline));

    // Only stop explicitly when this thread is not already unwinding from a panic.
    if !std::thread::panicking() {
        bg_processor.stop().unwrap();
    }
}
1638
/// Async counterpart of `test_payment_path_scoring`, driving `process_events_async` on tokio.
///
/// The event handler forwards the four path/probe event kinds over a tokio channel so that
/// `do_test_payment_path_scoring!` can wait for each one; any other event panics.
///
/// Each wait is bounded by `EVENT_DEADLINE` seconds, matching the synchronous test, so an
/// event that never reaches the handler fails the test instead of hanging it.
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_payment_path_scoring_async() {
    let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
    let event_handler = move |event: Event| {
        // Each returned future needs its own handle to the channel.
        let sender_ref = sender.clone();
        async move {
            match event {
                Event::PaymentPathFailed { .. }
                | Event::PaymentPathSuccessful { .. }
                | Event::ProbeSuccessful { .. }
                | Event::ProbeFailed { .. } => sender_ref.send(event).await.unwrap(),
                _ => panic!("Unexpected event: {:?}", event),
            }
        }
    };

    let nodes = create_nodes(1, "test_payment_path_scoring_async".to_string());
    let data_dir = nodes[0].persister.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir));

    // Watch channel used to tell the sleeper below that the test body has finished.
    let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());

    let bp_future = super::process_events_async(
        persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(),
        nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()), move |dur: Duration| {
            let mut exit_receiver = exit_receiver.clone();
            // Resolves to `false` once `dur` has elapsed, or to `true` as soon as the exit
            // channel changes (presumably the signal for the processor to stop).
            Box::pin(async move {
                tokio::select! {
                    _ = tokio::time::sleep(dur) => false,
                    _ = exit_receiver.changed() => true,
                }
            })
        }, false,
    );
    let t1 = tokio::spawn(bp_future);
    let t2 = tokio::spawn(async move {
        // A timed-out or closed channel both become `None`, which the macro's `.expect(..)`
        // turns into a "not handled within deadline" panic.
        do_test_payment_path_scoring!(
            nodes,
            tokio::time::timeout(Duration::from_secs(EVENT_DEADLINE), receiver.recv())
                .await
                .ok()
                .flatten()
        );
        exit_sender.send(()).unwrap();
    });

    let (r1, r2) = tokio::join!(t1, t2);
    r1.unwrap().unwrap();
    r2.unwrap()
}
1685 }