Score payment paths in BackgroundProcessor
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::router::Router;
34 use lightning::routing::scoring::{Score, WriteableScore};
35 use lightning::util::events::{Event, EventHandler, EventsProvider};
36 use lightning::util::logger::Logger;
37 use lightning::util::persist::Persister;
38 use lightning_rapid_gossip_sync::RapidGossipSync;
39 use lightning::io;
40
41 use core::ops::Deref;
42 use core::time::Duration;
43
44 #[cfg(feature = "std")]
45 use std::sync::Arc;
46 #[cfg(feature = "std")]
47 use core::sync::atomic::{AtomicBool, Ordering};
48 #[cfg(feature = "std")]
49 use std::thread::{self, JoinHandle};
50 #[cfg(feature = "std")]
51 use std::time::Instant;
52
53 #[cfg(feature = "futures")]
54 use futures_util::{select_biased, future::FutureExt, task};
55 #[cfg(not(feature = "std"))]
56 use alloc::vec::Vec;
57
58 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
59 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
60 /// responsibilities are:
61 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
62 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
63 ///   writing it to disk/backups by invoking the callback given to it at startup.
64 ///   [`ChannelManager`] persistence should be done in the background.
65 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
66 ///   at the appropriate intervals.
67 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
68 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
69 ///
70 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
71 /// upon as doing so may result in high latency.
72 ///
73 /// # Note
74 ///
75 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
76 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
77 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
78 /// unilateral chain closure fees are at risk.
79 ///
80 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
81 /// [`Event`]: lightning::util::events::Event
82 #[cfg(feature = "std")]
83 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
84 pub struct BackgroundProcessor {
85         stop_thread: Arc<AtomicBool>,
86         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
87 }
88
89 #[cfg(not(test))]
90 const FRESHNESS_TIMER: u64 = 60;
91 #[cfg(test)]
92 const FRESHNESS_TIMER: u64 = 1;
93
94 #[cfg(all(not(test), not(debug_assertions)))]
95 const PING_TIMER: u64 = 10;
96 /// Signature operations take a lot longer without compiler optimisations.
97 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
98 /// timeout is reached.
99 #[cfg(all(not(test), debug_assertions))]
100 const PING_TIMER: u64 = 30;
101 #[cfg(test)]
102 const PING_TIMER: u64 = 1;
103
104 /// Prune the network graph of stale entries hourly.
105 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
106
107 #[cfg(not(test))]
108 const SCORER_PERSIST_TIMER: u64 = 30;
109 #[cfg(test)]
110 const SCORER_PERSIST_TIMER: u64 = 1;
111
112 #[cfg(not(test))]
113 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
114 #[cfg(test)]
115 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
116
117 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
118 pub enum GossipSync<
119         P: Deref<Target = P2PGossipSync<G, A, L>>,
120         R: Deref<Target = RapidGossipSync<G, L>>,
121         G: Deref<Target = NetworkGraph<L>>,
122         A: Deref,
123         L: Deref,
124 >
125 where A::Target: chain::Access, L::Target: Logger {
126         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
127         P2P(P),
128         /// Rapid gossip sync from a trusted server.
129         Rapid(R),
130         /// No gossip sync.
131         None,
132 }
133
134 impl<
135         P: Deref<Target = P2PGossipSync<G, A, L>>,
136         R: Deref<Target = RapidGossipSync<G, L>>,
137         G: Deref<Target = NetworkGraph<L>>,
138         A: Deref,
139         L: Deref,
140 > GossipSync<P, R, G, A, L>
141 where A::Target: chain::Access, L::Target: Logger {
142         fn network_graph(&self) -> Option<&G> {
143                 match self {
144                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
145                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
146                         GossipSync::None => None,
147                 }
148         }
149
150         fn prunable_network_graph(&self) -> Option<&G> {
151                 match self {
152                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
153                         GossipSync::Rapid(gossip_sync) => {
154                                 if gossip_sync.is_initial_sync_complete() {
155                                         Some(gossip_sync.network_graph())
156                                 } else {
157                                         None
158                                 }
159                         },
160                         GossipSync::None => None,
161                 }
162         }
163 }
164
165 /// (C-not exported) as the bindings concretize everything and have constructors for us
166 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
167         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
168 where
169         A::Target: chain::Access,
170         L::Target: Logger,
171 {
172         /// Initializes a new [`GossipSync::P2P`] variant.
173         pub fn p2p(gossip_sync: P) -> Self {
174                 GossipSync::P2P(gossip_sync)
175         }
176 }
177
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
180         GossipSync<
181                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
182                 R,
183                 G,
184                 &'a (dyn chain::Access + Send + Sync),
185                 L,
186         >
187 where
188         L::Target: Logger,
189 {
190         /// Initializes a new [`GossipSync::Rapid`] variant.
191         pub fn rapid(gossip_sync: R) -> Self {
192                 GossipSync::Rapid(gossip_sync)
193         }
194 }
195
196 /// (C-not exported) as the bindings concretize everything and have constructors for us
197 impl<'a, L: Deref>
198         GossipSync<
199                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
200                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
201                 &'a NetworkGraph<L>,
202                 &'a (dyn chain::Access + Send + Sync),
203                 L,
204         >
205 where
206         L::Target: Logger,
207 {
208         /// Initializes a new [`GossipSync::None`] variant.
209         pub fn none() -> Self {
210                 GossipSync::None
211         }
212 }
213
214 fn handle_network_graph_update<L: Deref>(
215         network_graph: &NetworkGraph<L>, event: &Event
216 ) where L::Target: Logger {
217         if let Event::PaymentPathFailed { ref network_update, .. } = event {
218                 if let Some(network_update) = network_update {
219                         network_graph.handle_network_update(&network_update);
220                 }
221         }
222 }
223
224 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225         scorer: &'a S, event: &Event
226 ) {
227         let mut score = scorer.lock();
228         match event {
229                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230                         let path = path.iter().collect::<Vec<_>>();
231                         score.payment_path_failed(&path, *scid);
232                 },
233                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
234                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
235                         // because the payment made it all the way to the destination with sufficient liquidity.
236                         let path = path.iter().collect::<Vec<_>>();
237                         score.probe_successful(&path);
238                 },
239                 Event::PaymentPathSuccessful { path, .. } => {
240                         let path = path.iter().collect::<Vec<_>>();
241                         score.payment_path_successful(&path);
242                 },
243                 Event::ProbeSuccessful { path, .. } => {
244                         let path = path.iter().collect::<Vec<_>>();
245                         score.probe_successful(&path);
246                 },
247                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
248                         let path = path.iter().collect::<Vec<_>>();
249                         score.probe_failed(&path, *scid);
250                 },
251                 _ => {},
252         }
253 }
254
255 macro_rules! define_run_body {
256         ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
257          $channel_manager: ident, $process_channel_manager_events: expr,
258          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
259          $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
260         => { {
261                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
262                 $channel_manager.timer_tick_occurred();
263
264                 let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
265                 let mut last_ping_call = $get_timer(PING_TIMER);
266                 let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
267                 let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
268                 let mut have_pruned = false;
269
270                 loop {
271                         $process_channel_manager_events;
272                         $process_chain_monitor_events;
273
274                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
275                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
276                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
277                         // without running the normal event processing above and handing events to users.
278                         //
279                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
280                         // processing a message effectively at any point during this loop. In order to
281                         // minimize the time between such processing completing and persisting the updated
282                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
283                         // generally, and as a fallback place such blocking only immediately before
284                         // persistence.
285                         $peer_manager.process_events();
286
287                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
288                         // see `await_start`'s use below.
289                         let mut await_start = $get_timer(1);
290                         let updates_available = $await;
291                         let await_slow = $timer_elapsed(&mut await_start, 1);
292
293                         if updates_available {
294                                 log_trace!($logger, "Persisting ChannelManager...");
295                                 $persister.persist_manager(&*$channel_manager)?;
296                                 log_trace!($logger, "Done persisting ChannelManager.");
297                         }
298                         // Exit the loop if the background processor was requested to stop.
299                         if $loop_exit_check {
300                                 log_trace!($logger, "Terminating background processor.");
301                                 break;
302                         }
303                         if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
304                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
305                                 $channel_manager.timer_tick_occurred();
306                                 last_freshness_call = $get_timer(FRESHNESS_TIMER);
307                         }
308                         if await_slow {
309                                 // On various platforms, we may be starved of CPU cycles for several reasons.
310                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
311                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
312                                 // may not get any cycles.
313                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
314                                 // full second, at which point we assume sockets may have been killed (they
315                                 // appear to be at least on some platforms, even if it has only been a second).
316                                 // Note that we have to take care to not get here just because user event
317                                 // processing was slow at the top of the loop. For example, the sample client
318                                 // may call Bitcoin Core RPCs during event handling, which very often takes
319                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
320                                 // peers.
321                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
322                                 $peer_manager.disconnect_all_peers();
323                                 last_ping_call = $get_timer(PING_TIMER);
324                         } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
325                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
326                                 $peer_manager.timer_tick_occurred();
327                                 last_ping_call = $get_timer(PING_TIMER);
328                         }
329
330                         // Note that we want to run a graph prune once not long after startup before
331                         // falling back to our usual hourly prunes. This avoids short-lived clients never
332                         // pruning their network graph. We run once 60 seconds after startup before
333                         // continuing our normal cadence.
334                         if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
335                                 // The network graph must not be pruned while rapid sync completion is pending
336                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
337                                         #[cfg(feature = "std")] {
338                                                 log_trace!($logger, "Pruning and persisting network graph.");
339                                                 network_graph.remove_stale_channels_and_tracking();
340                                         }
341                                         #[cfg(not(feature = "std"))] {
342                                                 log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
343                                                 log_trace!($logger, "Persisting network graph.");
344                                         }
345
346                                         if let Err(e) = $persister.persist_graph(network_graph) {
347                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
348                                         }
349
350                                         last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
351                                         have_pruned = true;
352                                 }
353                         }
354
355                         if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
356                                 if let Some(ref scorer) = $scorer {
357                                         log_trace!($logger, "Persisting scorer");
358                                         if let Err(e) = $persister.persist_scorer(&scorer) {
359                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
360                                         }
361                                 }
362                                 last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
363                         }
364                 }
365
366                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
367                 // some races where users quit while channel updates were in-flight, with
368                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
369                 $persister.persist_manager(&*$channel_manager)?;
370
371                 // Persist Scorer on exit
372                 if let Some(ref scorer) = $scorer {
373                         $persister.persist_scorer(&scorer)?;
374                 }
375
376                 // Persist NetworkGraph on exit
377                 if let Some(network_graph) = $gossip_sync.network_graph() {
378                         $persister.persist_graph(network_graph)?;
379                 }
380
381                 Ok(())
382         } }
383 }
384
385 /// Processes background events in a future.
386 ///
387 /// `sleeper` should return a future which completes in the given amount of time and returns a
388 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
389 /// future which outputs true, the loop will exit and this function's future will complete.
390 ///
391 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
392 ///
393 /// Requires the `futures` feature. Note that while this method is available without the `std`
394 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
395 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
396 /// manually instead.
397 #[cfg(feature = "futures")]
398 pub async fn process_events_async<
399         'a,
400         CA: 'static + Deref + Send + Sync,
401         CF: 'static + Deref + Send + Sync,
402         CW: 'static + Deref + Send + Sync,
403         T: 'static + Deref + Send + Sync,
404         ES: 'static + Deref + Send + Sync,
405         NS: 'static + Deref + Send + Sync,
406         SP: 'static + Deref + Send + Sync,
407         F: 'static + Deref + Send + Sync,
408         R: 'static + Deref + Send + Sync,
409         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
410         L: 'static + Deref + Send + Sync,
411         P: 'static + Deref + Send + Sync,
412         Descriptor: 'static + SocketDescriptor + Send + Sync,
413         CMH: 'static + Deref + Send + Sync,
414         RMH: 'static + Deref + Send + Sync,
415         OMH: 'static + Deref + Send + Sync,
416         EventHandlerFuture: core::future::Future<Output = ()>,
417         EventHandler: Fn(Event) -> EventHandlerFuture,
418         PS: 'static + Deref + Send,
419         M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
420         CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
421         PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
422         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
423         UMH: 'static + Deref + Send + Sync,
424         PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
425         S: 'static + Deref<Target = SC> + Send + Sync,
426         SC: for<'b> WriteableScore<'b>,
427         SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
428         Sleeper: Fn(Duration) -> SleepFuture
429 >(
430         persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
431         gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
432         sleeper: Sleeper,
433 ) -> Result<(), io::Error>
434 where
435         CA::Target: 'static + chain::Access,
436         CF::Target: 'static + chain::Filter,
437         CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
438         T::Target: 'static + BroadcasterInterface,
439         ES::Target: 'static + EntropySource,
440         NS::Target: 'static + NodeSigner,
441         SP::Target: 'static + SignerProvider,
442         F::Target: 'static + FeeEstimator,
443         R::Target: 'static + Router,
444         L::Target: 'static + Logger,
445         P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
446         CMH::Target: 'static + ChannelMessageHandler,
447         OMH::Target: 'static + OnionMessageHandler,
448         RMH::Target: 'static + RoutingMessageHandler,
449         UMH::Target: 'static + CustomMessageHandler,
450         PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
451 {
452         let mut should_break = true;
453         let async_event_handler = |event| {
454                 let network_graph = gossip_sync.network_graph();
455                 let event_handler = &event_handler;
456                 let scorer = &scorer;
457                 async move {
458                         if let Some(network_graph) = network_graph {
459                                 handle_network_graph_update(network_graph, &event)
460                         }
461                         if let Some(ref scorer) = scorer {
462                                 update_scorer(scorer, &event);
463                         }
464                         event_handler(event).await;
465                 }
466         };
467         define_run_body!(persister,
468                 chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
469                 channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
470                 gossip_sync, peer_manager, logger, scorer, should_break, {
471                         select_biased! {
472                                 _ = channel_manager.get_persistable_update_future().fuse() => true,
473                                 exit = sleeper(Duration::from_millis(100)).fuse() => {
474                                         should_break = exit;
475                                         false
476                                 }
477                         }
478                 }, |t| sleeper(Duration::from_secs(t)),
479                 |fut: &mut SleepFuture, _| {
480                         let mut waker = task::noop_waker();
481                         let mut ctx = task::Context::from_waker(&mut waker);
482                         core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
483                 })
484 }
485
486 #[cfg(feature = "std")]
487 impl BackgroundProcessor {
488         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
489         /// documentation].
490         ///
491         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
492         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
493         /// either [`join`] or [`stop`].
494         ///
495         /// # Data Persistence
496         ///
497         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
498         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
499         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
500         /// provided implementation.
501         ///
502         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
503         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
504         /// See the `lightning-persister` crate for LDK's provided implementation.
505         ///
506         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
507         /// error or call [`join`] and handle any error that may arise. For the latter case,
508         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
509         ///
510         /// # Event Handling
511         ///
512         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
513         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
514         /// functionality implemented by other handlers.
515         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
516         ///
517         /// # Rapid Gossip Sync
518         ///
519         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
520         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
521         /// until the [`RapidGossipSync`] instance completes its first sync.
522         ///
523         /// [top-level documentation]: BackgroundProcessor
524         /// [`join`]: Self::join
525         /// [`stop`]: Self::stop
526         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
527         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
528         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
529         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
530         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
531         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
532         pub fn start<
533                 'a,
534                 CA: 'static + Deref + Send + Sync,
535                 CF: 'static + Deref + Send + Sync,
536                 CW: 'static + Deref + Send + Sync,
537                 T: 'static + Deref + Send + Sync,
538                 ES: 'static + Deref + Send + Sync,
539                 NS: 'static + Deref + Send + Sync,
540                 SP: 'static + Deref + Send + Sync,
541                 F: 'static + Deref + Send + Sync,
542                 R: 'static + Deref + Send + Sync,
543                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
544                 L: 'static + Deref + Send + Sync,
545                 P: 'static + Deref + Send + Sync,
546                 Descriptor: 'static + SocketDescriptor + Send + Sync,
547                 CMH: 'static + Deref + Send + Sync,
548                 OMH: 'static + Deref + Send + Sync,
549                 RMH: 'static + Deref + Send + Sync,
550                 EH: 'static + EventHandler + Send,
551                 PS: 'static + Deref + Send,
552                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
553                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
554                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
555                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
556                 UMH: 'static + Deref + Send + Sync,
557                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
558                 S: 'static + Deref<Target = SC> + Send + Sync,
559                 SC: for <'b> WriteableScore<'b>,
560         >(
561                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
562                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
563         ) -> Self
564         where
565                 CA::Target: 'static + chain::Access,
566                 CF::Target: 'static + chain::Filter,
567                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
568                 T::Target: 'static + BroadcasterInterface,
569                 ES::Target: 'static + EntropySource,
570                 NS::Target: 'static + NodeSigner,
571                 SP::Target: 'static + SignerProvider,
572                 F::Target: 'static + FeeEstimator,
573                 R::Target: 'static + Router,
574                 L::Target: 'static + Logger,
575                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
576                 CMH::Target: 'static + ChannelMessageHandler,
577                 OMH::Target: 'static + OnionMessageHandler,
578                 RMH::Target: 'static + RoutingMessageHandler,
579                 UMH::Target: 'static + CustomMessageHandler,
580                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
581         {
582                 let stop_thread = Arc::new(AtomicBool::new(false));
583                 let stop_thread_clone = stop_thread.clone();
584                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
585                         let event_handler = |event| {
586                                 let network_graph = gossip_sync.network_graph();
587                                 if let Some(network_graph) = network_graph {
588                                         handle_network_graph_update(network_graph, &event)
589                                 }
590                                 if let Some(ref scorer) = scorer {
591                                         update_scorer(scorer, &event);
592                                 }
593                                 event_handler.handle_event(event);
594                         };
595                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
596                                 channel_manager, channel_manager.process_pending_events(&event_handler),
597                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
598                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
599                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
600                 });
601                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
602         }
603
604         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
605         /// [`ChannelManager`].
606         ///
607         /// # Panics
608         ///
609         /// This function panics if the background thread has panicked such as while persisting or
610         /// handling events.
611         ///
612         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
613         pub fn join(mut self) -> Result<(), std::io::Error> {
614                 assert!(self.thread_handle.is_some());
615                 self.join_thread()
616         }
617
618         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
619         /// [`ChannelManager`].
620         ///
621         /// # Panics
622         ///
623         /// This function panics if the background thread has panicked such as while persisting or
624         /// handling events.
625         ///
626         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
627         pub fn stop(mut self) -> Result<(), std::io::Error> {
628                 assert!(self.thread_handle.is_some());
629                 self.stop_and_join_thread()
630         }
631
632         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
633                 self.stop_thread.store(true, Ordering::Release);
634                 self.join_thread()
635         }
636
637         fn join_thread(&mut self) -> Result<(), std::io::Error> {
638                 match self.thread_handle.take() {
639                         Some(handle) => handle.join().unwrap(),
640                         None => Ok(()),
641                 }
642         }
643 }
644
645 #[cfg(feature = "std")]
646 impl Drop for BackgroundProcessor {
647         fn drop(&mut self) {
648                 self.stop_and_join_thread().unwrap();
649         }
650 }
651
652 #[cfg(all(feature = "std", test))]
653 mod tests {
654         use bitcoin::blockdata::block::BlockHeader;
655         use bitcoin::blockdata::constants::genesis_block;
656         use bitcoin::blockdata::locktime::PackedLockTime;
657         use bitcoin::blockdata::transaction::{Transaction, TxOut};
658         use bitcoin::network::constants::Network;
659         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
660         use lightning::chain::{BestBlock, Confirm, chainmonitor};
661         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
662         use lightning::chain::keysinterface::{InMemorySigner, EntropySource, KeysManager};
663         use lightning::chain::transaction::OutPoint;
664         use lightning::get_event_msg;
665         use lightning::ln::PaymentHash;
666         use lightning::ln::channelmanager;
667         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
668         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
669         use lightning::ln::msgs::{ChannelMessageHandler, Init};
670         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
671         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
672         use lightning::routing::router::{DefaultRouter, RouteHop};
673         use lightning::routing::scoring::{ChannelUsage, Score};
674         use lightning::util::config::UserConfig;
675         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
676         use lightning::util::ser::Writeable;
677         use lightning::util::test_utils;
678         use lightning::util::persist::KVStorePersister;
679         use lightning_invoice::payment::{InvoicePayer, Retry};
680         use lightning_persister::FilesystemPersister;
681         use std::collections::VecDeque;
682         use std::fs;
683         use std::path::PathBuf;
684         use std::sync::{Arc, Mutex};
685         use std::sync::mpsc::SyncSender;
686         use std::time::Duration;
687         use bitcoin::hashes::Hash;
688         use bitcoin::TxMerkleNode;
689         use lightning_rapid_gossip_sync::RapidGossipSync;
690         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
691
692         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
693
694         #[derive(Clone, Hash, PartialEq, Eq)]
695         struct TestDescriptor{}
696         impl SocketDescriptor for TestDescriptor {
697                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
698                         0
699                 }
700
701                 fn disconnect_socket(&mut self) {}
702         }
703
704         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
705
706         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
707
708         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
709         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
710
711         struct Node {
712                 node: Arc<ChannelManager>,
713                 p2p_gossip_sync: PGS,
714                 rapid_gossip_sync: RGS,
715                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
716                 chain_monitor: Arc<ChainMonitor>,
717                 persister: Arc<FilesystemPersister>,
718                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
719                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
720                 logger: Arc<test_utils::TestLogger>,
721                 best_block: BestBlock,
722                 scorer: Arc<Mutex<TestScorer>>,
723         }
724
725         impl Node {
726                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
727                         GossipSync::P2P(self.p2p_gossip_sync.clone())
728                 }
729
730                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
731                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
732                 }
733
734                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
735                         GossipSync::None
736                 }
737         }
738
739         impl Drop for Node {
740                 fn drop(&mut self) {
741                         let data_dir = self.persister.get_data_dir();
742                         match fs::remove_dir_all(data_dir.clone()) {
743                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
744                                 _ => {}
745                         }
746                 }
747         }
748
749         struct Persister {
750                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
751                 graph_persistence_notifier: Option<SyncSender<()>>,
752                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
753                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
754                 filesystem_persister: FilesystemPersister,
755         }
756
757         impl Persister {
758                 fn new(data_dir: String) -> Self {
759                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
760                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
761                 }
762
763                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
764                         Self { graph_error: Some((error, message)), ..self }
765                 }
766
767                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
768                         Self { graph_persistence_notifier: Some(sender), ..self }
769                 }
770
771                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
772                         Self { manager_error: Some((error, message)), ..self }
773                 }
774
775                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
776                         Self { scorer_error: Some((error, message)), ..self }
777                 }
778         }
779
780         impl KVStorePersister for Persister {
781                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
782                         if key == "manager" {
783                                 if let Some((error, message)) = self.manager_error {
784                                         return Err(std::io::Error::new(error, message))
785                                 }
786                         }
787
788                         if key == "network_graph" {
789                                 if let Some(sender) = &self.graph_persistence_notifier {
790                                         sender.send(()).unwrap();
791                                 };
792
793                                 if let Some((error, message)) = self.graph_error {
794                                         return Err(std::io::Error::new(error, message))
795                                 }
796                         }
797
798                         if key == "scorer" {
799                                 if let Some((error, message)) = self.scorer_error {
800                                         return Err(std::io::Error::new(error, message))
801                                 }
802                         }
803
804                         self.filesystem_persister.persist(key, object)
805                 }
806         }
807
808         struct TestScorer {
809                 event_expectations: Option<VecDeque<TestResult>>,
810         }
811
812         #[derive(Debug)]
813         enum TestResult {
814                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
815                 PaymentSuccess { path: Vec<RouteHop> },
816                 ProbeFailure { path: Vec<RouteHop> },
817                 ProbeSuccess { path: Vec<RouteHop> },
818         }
819
820         impl TestScorer {
821                 fn new() -> Self {
822                         Self { event_expectations: None }
823                 }
824
825                 fn expect(&mut self, expectation: TestResult) {
826                         self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
827                 }
828         }
829
830         impl lightning::util::ser::Writeable for TestScorer {
831                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
832         }
833
834         impl Score for TestScorer {
835                 fn channel_penalty_msat(
836                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
837                 ) -> u64 { unimplemented!(); }
838
839                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
840                         if let Some(expectations) = &mut self.event_expectations {
841                                 match expectations.pop_front().unwrap() {
842                                         TestResult::PaymentFailure { path, short_channel_id } => {
843                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
844                                                 assert_eq!(actual_short_channel_id, short_channel_id);
845                                         },
846                                         TestResult::PaymentSuccess { path } => {
847                                                 panic!("Unexpected successful payment path: {:?}", path)
848                                         },
849                                         TestResult::ProbeFailure { path } => {
850                                                 panic!("Unexpected probe failure: {:?}", path)
851                                         },
852                                         TestResult::ProbeSuccess { path } => {
853                                                 panic!("Unexpected probe success: {:?}", path)
854                                         }
855                                 }
856                         }
857                 }
858
859                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
860                         if let Some(expectations) = &mut self.event_expectations {
861                                 match expectations.pop_front().unwrap() {
862                                         TestResult::PaymentFailure { path, .. } => {
863                                                 panic!("Unexpected payment path failure: {:?}", path)
864                                         },
865                                         TestResult::PaymentSuccess { path } => {
866                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
867                                         },
868                                         TestResult::ProbeFailure { path } => {
869                                                 panic!("Unexpected probe failure: {:?}", path)
870                                         },
871                                         TestResult::ProbeSuccess { path } => {
872                                                 panic!("Unexpected probe success: {:?}", path)
873                                         }
874                                 }
875                         }
876                 }
877
878                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
879                         if let Some(expectations) = &mut self.event_expectations {
880                                 match expectations.pop_front().unwrap() {
881                                         TestResult::PaymentFailure { path, .. } => {
882                                                 panic!("Unexpected payment path failure: {:?}", path)
883                                         },
884                                         TestResult::PaymentSuccess { path } => {
885                                                 panic!("Unexpected payment path success: {:?}", path)
886                                         },
887                                         TestResult::ProbeFailure { path } => {
888                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
889                                         },
890                                         TestResult::ProbeSuccess { path } => {
891                                                 panic!("Unexpected probe success: {:?}", path)
892                                         }
893                                 }
894                         }
895                 }
896                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
897                         if let Some(expectations) = &mut self.event_expectations {
898                                 match expectations.pop_front().unwrap() {
899                                         TestResult::PaymentFailure { path, .. } => {
900                                                 panic!("Unexpected payment path failure: {:?}", path)
901                                         },
902                                         TestResult::PaymentSuccess { path } => {
903                                                 panic!("Unexpected payment path success: {:?}", path)
904                                         },
905                                         TestResult::ProbeFailure { path } => {
906                                                 panic!("Unexpected probe failure: {:?}", path)
907                                         },
908                                         TestResult::ProbeSuccess { path } => {
909                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
910                                         }
911                                 }
912                         }
913                 }
914         }
915
916         impl Drop for TestScorer {
917                 fn drop(&mut self) {
918                         if std::thread::panicking() {
919                                 return;
920                         }
921
922                         if let Some(event_expectations) = &self.event_expectations {
923                                 if !event_expectations.is_empty() {
924                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
925                                 }
926                         }
927                 }
928         }
929
930         fn get_full_filepath(filepath: String, filename: String) -> String {
931                 let mut path = PathBuf::from(filepath);
932                 path.push(filename);
933                 path.to_str().unwrap().to_string()
934         }
935
936         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
937                 let mut nodes = Vec::new();
938                 for i in 0..num_nodes {
939                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
940                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
941                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
942                         let network = Network::Testnet;
943                         let genesis_block = genesis_block(network);
944                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
945                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
946                         let seed = [i as u8; 32];
947                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
948                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
949                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
950                         let now = Duration::from_secs(genesis_block.header.time as u64);
951                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
952                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
953                         let best_block = BestBlock::from_genesis(network);
954                         let params = ChainParameters { network, best_block };
955                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
956                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
957                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
958                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
959                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
960                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
961                         nodes.push(node);
962                 }
963
964                 for i in 0..num_nodes {
965                         for j in (i+1)..num_nodes {
966                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
967                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
968                         }
969                 }
970
971                 nodes
972         }
973
974         macro_rules! open_channel {
975                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
976                         begin_open_channel!($node_a, $node_b, $channel_value);
977                         let events = $node_a.node.get_and_clear_pending_events();
978                         assert_eq!(events.len(), 1);
979                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
980                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
981                         tx
982                 }}
983         }
984
985         macro_rules! begin_open_channel {
986                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
987                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
988                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
989                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
990                 }}
991         }
992
993         macro_rules! handle_funding_generation_ready {
994                 ($event: expr, $channel_value: expr) => {{
995                         match $event {
996                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
997                                         assert_eq!(channel_value_satoshis, $channel_value);
998                                         assert_eq!(user_channel_id, 42);
999
1000                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1001                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1002                                         }]};
1003                                         (temporary_channel_id, tx)
1004                                 },
1005                                 _ => panic!("Unexpected event"),
1006                         }
1007                 }}
1008         }
1009
1010         macro_rules! end_open_channel {
1011                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1012                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1013                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1014                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1015                 }}
1016         }
1017
1018         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1019                 for i in 1..=depth {
1020                         let prev_blockhash = node.best_block.block_hash();
1021                         let height = node.best_block.height() + 1;
1022                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1023                         let txdata = vec![(0, tx)];
1024                         node.best_block = BestBlock::new(header.block_hash(), height);
1025                         match i {
1026                                 1 => {
1027                                         node.node.transactions_confirmed(&header, &txdata, height);
1028                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1029                                 },
1030                                 x if x == depth => {
1031                                         node.node.best_block_updated(&header, height);
1032                                         node.chain_monitor.best_block_updated(&header, height);
1033                                 },
1034                                 _ => {},
1035                         }
1036                 }
1037         }
1038         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1039                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1040         }
1041
1042         #[test]
1043         fn test_background_processor() {
1044                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1045                 // updates. Also test that when new updates are available, the manager signals that it needs
1046                 // re-persistence and is successfully re-persisted.
1047                 let nodes = create_nodes(2, "test_background_processor".to_string());
1048
1049                 // Go through the channel creation process so that each node has something to persist. Since
1050                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1051                 // avoid a race with processing events.
1052                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1053
1054                 // Initiate the background processors to watch each node.
1055                 let data_dir = nodes[0].persister.get_data_dir();
1056                 let persister = Arc::new(Persister::new(data_dir));
1057                 let event_handler = |_: _| {};
1058                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1059
1060                 macro_rules! check_persisted_data {
1061                         ($node: expr, $filepath: expr) => {
1062                                 let mut expected_bytes = Vec::new();
1063                                 loop {
1064                                         expected_bytes.clear();
1065                                         match $node.write(&mut expected_bytes) {
1066                                                 Ok(()) => {
1067                                                         match std::fs::read($filepath) {
1068                                                                 Ok(bytes) => {
1069                                                                         if bytes == expected_bytes {
1070                                                                                 break
1071                                                                         } else {
1072                                                                                 continue
1073                                                                         }
1074                                                                 },
1075                                                                 Err(_) => continue
1076                                                         }
1077                                                 },
1078                                                 Err(e) => panic!("Unexpected error: {}", e)
1079                                         }
1080                                 }
1081                         }
1082                 }
1083
1084                 // Check that the initial channel manager data is persisted as expected.
1085                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1086                 check_persisted_data!(nodes[0].node, filepath.clone());
1087
1088                 loop {
1089                         if !nodes[0].node.get_persistence_condvar_value() { break }
1090                 }
1091
1092                 // Force-close the channel.
1093                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1094
1095                 // Check that the force-close updates are persisted.
1096                 check_persisted_data!(nodes[0].node, filepath.clone());
1097                 loop {
1098                         if !nodes[0].node.get_persistence_condvar_value() { break }
1099                 }
1100
1101                 // Check network graph is persisted
1102                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1103                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1104
1105                 // Check scorer is persisted
1106                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1107                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1108
1109                 assert!(bg_processor.stop().is_ok());
1110         }
1111
1112         #[test]
1113         fn test_timer_tick_called() {
1114                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
1115                 // `FRESHNESS_TIMER`.
1116                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
1117                 let data_dir = nodes[0].persister.get_data_dir();
1118                 let persister = Arc::new(Persister::new(data_dir));
1119                 let event_handler = |_: _| {};
1120                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1121                 loop {
1122                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1123                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
1124                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
1125                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
1126                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
1127                                 break
1128                         }
1129                 }
1130
1131                 assert!(bg_processor.stop().is_ok());
1132         }
1133
1134         #[test]
1135         fn test_channel_manager_persist_error() {
1136                 // Test that if we encounter an error during manager persistence, the thread panics.
1137                 let nodes = create_nodes(2, "test_persist_error".to_string());
1138                 open_channel!(nodes[0], nodes[1], 100000);
1139
1140                 let data_dir = nodes[0].persister.get_data_dir();
1141                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1142                 let event_handler = |_: _| {};
1143                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1144                 match bg_processor.join() {
1145                         Ok(_) => panic!("Expected error persisting manager"),
1146                         Err(e) => {
1147                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1148                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1149                         },
1150                 }
1151         }
1152
1153         #[test]
1154         fn test_network_graph_persist_error() {
1155                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1156                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
1157                 let data_dir = nodes[0].persister.get_data_dir();
1158                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1159                 let event_handler = |_: _| {};
1160                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1161
1162                 match bg_processor.stop() {
1163                         Ok(_) => panic!("Expected error persisting network graph"),
1164                         Err(e) => {
1165                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1166                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1167                         },
1168                 }
1169         }
1170
1171         #[test]
1172         fn test_scorer_persist_error() {
1173                 // Test that if we encounter an error during scorer persistence, an error gets returned.
1174                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
1175                 let data_dir = nodes[0].persister.get_data_dir();
1176                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
1177                 let event_handler = |_: _| {};
1178                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1179
1180                 match bg_processor.stop() {
1181                         Ok(_) => panic!("Expected error persisting scorer"),
1182                         Err(e) => {
1183                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1184                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1185                         },
1186                 }
1187         }
1188
1189         #[test]
1190         fn test_background_event_handling() {
1191                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
1192                 let channel_value = 100000;
1193                 let data_dir = nodes[0].persister.get_data_dir();
1194                 let persister = Arc::new(Persister::new(data_dir.clone()));
1195
1196                 // Set up a background event handler for FundingGenerationReady events.
1197                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1198                 let event_handler = move |event: Event| match event {
1199                         Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1200                         Event::ChannelReady { .. } => {},
1201                         _ => panic!("Unexpected event: {:?}", event),
1202                 };
1203
1204                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1205
1206                 // Open a channel and check that the FundingGenerationReady event was handled.
1207                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1208                 let (temporary_channel_id, funding_tx) = receiver
1209                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1210                         .expect("FundingGenerationReady not handled within deadline");
1211                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1212
1213                 // Confirm the funding transaction.
1214                 confirm_transaction(&mut nodes[0], &funding_tx);
1215                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1216                 confirm_transaction(&mut nodes[1], &funding_tx);
1217                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1218                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1219                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1220                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1221                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1222
1223                 assert!(bg_processor.stop().is_ok());
1224
1225                 // Set up a background event handler for SpendableOutputs events.
1226                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1227                 let event_handler = move |event: Event| match event {
1228                         Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
1229                         Event::ChannelReady { .. } => {},
1230                         Event::ChannelClosed { .. } => {},
1231                         _ => panic!("Unexpected event: {:?}", event),
1232                 };
1233                 let persister = Arc::new(Persister::new(data_dir));
1234                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1235
1236                 // Force close the channel and check that the SpendableOutputs event was handled.
1237                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1238                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1239                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1240
1241                 let event = receiver
1242                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1243                         .expect("Events not handled within deadline");
1244                 match event {
1245                         Event::SpendableOutputs { .. } => {},
1246                         _ => panic!("Unexpected event: {:?}", event),
1247                 }
1248
1249                 assert!(bg_processor.stop().is_ok());
1250         }
1251
1252         #[test]
1253         fn test_scorer_persistence() {
1254                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1255                 let data_dir = nodes[0].persister.get_data_dir();
1256                 let persister = Arc::new(Persister::new(data_dir));
1257                 let event_handler = |_: _| {};
1258                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1259
1260                 loop {
1261                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1262                         let expected_log = "Persisting scorer".to_string();
1263                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1264                                 break
1265                         }
1266                 }
1267
1268                 assert!(bg_processor.stop().is_ok());
1269         }
1270
1271         #[test]
1272         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1273                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1274                 let data_dir = nodes[0].persister.get_data_dir();
1275                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1276                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1277                 let network_graph = nodes[0].network_graph.clone();
1278                 let features = ChannelFeatures::empty();
1279                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1280                         .expect("Failed to update channel from partial announcement");
1281                 let original_graph_description = network_graph.to_string();
1282                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1283                 assert_eq!(network_graph.read_only().channels().len(), 1);
1284
1285                 let event_handler = |_: _| {};
1286                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1287
1288                 loop {
1289                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1290                         let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1291                         if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1292                                 .unwrap_or(&0) > 1
1293                         {
1294                                 // Wait until the loop has gone around at least twice.
1295                                 break
1296                         }
1297                 }
1298
1299                 let initialization_input = vec![
1300                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1301                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1302                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1303                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1304                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1305                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1306                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1307                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1308                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1309                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1310                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1311                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1312                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1313                 ];
1314                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1315
1316                 // this should have added two channels
1317                 assert_eq!(network_graph.read_only().channels().len(), 3);
1318
1319                 let _ = receiver
1320                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1321                         .expect("Network graph not pruned within deadline");
1322
1323                 background_processor.stop().unwrap();
1324
1325                 // all channels should now be pruned
1326                 assert_eq!(network_graph.read_only().channels().len(), 0);
1327         }
1328
1329         #[test]
1330         fn test_invoice_payer() {
1331                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1332                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1333                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1334
1335                 // Initiate the background processors to watch each node.
1336                 let data_dir = nodes[0].persister.get_data_dir();
1337                 let persister = Arc::new(Persister::new(data_dir));
1338                 let router = Arc::new(DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer)));
1339                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1340                 let event_handler = Arc::clone(&invoice_payer);
1341                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1342                 assert!(bg_processor.stop().is_ok());
1343         }
1344
1345         #[test]
1346         fn test_payment_path_scoring() {
1347                 // Ensure that we update the scorer when relevant events are processed. In this case, we ensure
1348                 // that we update the scorer upon a payment path succeeding (note that the channel must be
1349                 // public or else we won't score it).
1350                 // Set up a background event handler for FundingGenerationReady events.
1351                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1352                 let event_handler = move |event: Event| match event {
1353                         Event::PaymentPathFailed { .. } => sender.send(event).unwrap(),
1354                         Event::PaymentPathSuccessful { .. } => sender.send(event).unwrap(),
1355                         Event::ProbeSuccessful { .. } => sender.send(event).unwrap(),
1356                         Event::ProbeFailed { .. } => sender.send(event).unwrap(),
1357                         _ => panic!("Unexpected event: {:?}", event),
1358                 };
1359
1360                 let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
1361                 let data_dir = nodes[0].persister.get_data_dir();
1362                 let persister = Arc::new(Persister::new(data_dir.clone()));
1363                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1364
1365                 let scored_scid = 4242;
1366                 let secp_ctx = Secp256k1::new();
1367                 let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
1368                 let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);
1369
1370                 let path = vec![RouteHop {
1371                         pubkey: node_1_id,
1372                         node_features: NodeFeatures::empty(),
1373                         short_channel_id: scored_scid,
1374                         channel_features: ChannelFeatures::empty(),
1375                         fee_msat: 0,
1376                         cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
1377                 }];
1378
1379                 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
1380                 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1381                         payment_id: None,
1382                         payment_hash: PaymentHash([42; 32]),
1383                         payment_failed_permanently: false,
1384                         network_update: None,
1385                         all_paths_failed: true,
1386                         path: path.clone(),
1387                         short_channel_id: Some(scored_scid),
1388                         retry: None,
1389                 });
1390                 let event = receiver
1391                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1392                         .expect("PaymentPathFailed not handled within deadline");
1393                 match event {
1394                         Event::PaymentPathFailed { .. } => {},
1395                         _ => panic!("Unexpected event"),
1396                 }
1397
1398                 // Ensure we'll score payments that were explicitly failed back by the destination as
1399                 // ProbeSuccess.
1400                 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1401                 nodes[0].node.push_pending_event(Event::PaymentPathFailed {
1402                         payment_id: None,
1403                         payment_hash: PaymentHash([42; 32]),
1404                         payment_failed_permanently: true,
1405                         network_update: None,
1406                         all_paths_failed: true,
1407                         path: path.clone(),
1408                         short_channel_id: None,
1409                         retry: None,
1410                 });
1411                 let event = receiver
1412                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1413                         .expect("PaymentPathFailed not handled within deadline");
1414                 match event {
1415                         Event::PaymentPathFailed { .. } => {},
1416                         _ => panic!("Unexpected event"),
1417                 }
1418
1419                 nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
1420                 nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
1421                         payment_id: PaymentId([42; 32]),
1422                         payment_hash: None,
1423                         path: path.clone(),
1424                 });
1425                 let event = receiver
1426                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1427                         .expect("PaymentPathSuccessful not handled within deadline");
1428                 match event {
1429                         Event::PaymentPathSuccessful { .. } => {},
1430                         _ => panic!("Unexpected event"),
1431                 }
1432
1433                 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
1434                 nodes[0].node.push_pending_event(Event::ProbeSuccessful {
1435                         payment_id: PaymentId([42; 32]),
1436                         payment_hash: PaymentHash([42; 32]),
1437                         path: path.clone(),
1438                 });
1439                 let event = receiver
1440                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1441                         .expect("ProbeSuccessful not handled within deadline");
1442                 match event {
1443                         Event::ProbeSuccessful  { .. } => {},
1444                         _ => panic!("Unexpected event"),
1445                 }
1446
1447                 nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
1448                 nodes[0].node.push_pending_event(Event::ProbeFailed {
1449                         payment_id: PaymentId([42; 32]),
1450                         payment_hash: PaymentHash([42; 32]),
1451                         path: path.clone(),
1452                         short_channel_id: Some(scored_scid),
1453                 });
1454                 let event = receiver
1455                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1456                         .expect("ProbeFailure not handled within deadline");
1457                 match event {
1458                         Event::ProbeFailed { .. } => {},
1459                         _ => panic!("Unexpected event"),
1460                 }
1461
1462                 assert!(bg_processor.stop().is_ok());
1463         }
1464 }