Throw error for RGS data that's more than two weeks old, fixing #1785
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::utxo::UtxoLookup;
34 use lightning::routing::router::Router;
35 use lightning::routing::scoring::{Score, WriteableScore};
36 use lightning::util::events::{Event, EventHandler, EventsProvider};
37 use lightning::util::logger::Logger;
38 use lightning::util::persist::Persister;
39 use lightning_rapid_gossip_sync::RapidGossipSync;
40 use lightning::io;
41
42 use core::ops::Deref;
43 use core::time::Duration;
44
45 #[cfg(feature = "std")]
46 use std::sync::Arc;
47 #[cfg(feature = "std")]
48 use core::sync::atomic::{AtomicBool, Ordering};
49 #[cfg(feature = "std")]
50 use std::thread::{self, JoinHandle};
51 #[cfg(feature = "std")]
52 use std::time::Instant;
53
54 #[cfg(feature = "futures")]
55 use futures_util::{select_biased, future::FutureExt, task};
56 #[cfg(not(feature = "std"))]
57 use alloc::vec::Vec;
58
/// Handle to the background thread that runs the periodic and background work Rust-Lightning
/// needs in order to keep operating correctly.
///
/// The thread is responsible for:
/// * Handing pending [`Event`]s to the user-supplied [`EventHandler`].
/// * Watching for [`ChannelManager`] updates that need to be written out and, when one is
///   available, invoking the persistence callback supplied at startup. [`ChannelManager`]
///   persistence is expected to happen in the background.
/// * Driving [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at their respective intervals.
/// * Driving [`NetworkGraph::remove_stale_channels_and_tracking`], provided a [`GossipSync`]
///   holding a [`NetworkGraph`] was passed to [`BackgroundProcessor::start`].
///
/// [`PeerManager::process_events`] is also called periodically, but relying on that alone may
/// result in high latency.
///
/// # Note
///
/// Should [`ChannelManager`] persistence fail and the persisted copy fall out of date, channels
/// may be force-closed on startup once the manager notices it is stale. Provided the
/// [`ChannelMonitor`] backups are sound, the only funds at risk are those spent on unilateral
/// chain closure fees.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shared flag the background thread loads on every loop iteration to decide whether to exit.
	stop_thread: Arc<AtomicBool>,
	/// Join handle of the spawned thread, yielding the thread's final persistence result.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
89
/// Seconds between calls to `ChannelManager::timer_tick_occurred` (shortened in tests).
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;
/// Seconds between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;

/// Seconds between calls to `PeerManager::timer_tick_occurred` (shortened in tests).
#[cfg(test)]
const PING_TIMER: u64 = 1;
/// Seconds between calls to `PeerManager::timer_tick_occurred` in optimised builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Seconds between calls to `PeerManager::timer_tick_occurred` in unoptimised builds.
///
/// Signature operations are much slower without compiler optimisations. A longer ping timer
/// makes room for that, at the cost that slower devices get disconnected once the timeout
/// is hit.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;

/// Seconds between regular network graph prunes: stale entries are removed hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds between scorer persistence attempts (shortened in tests).
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
/// Seconds between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;

/// Seconds after startup before the first network graph prune (shortened in tests).
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Seconds after startup before the first network graph prune.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
117
118 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
119 pub enum GossipSync<
120         P: Deref<Target = P2PGossipSync<G, U, L>>,
121         R: Deref<Target = RapidGossipSync<G, L>>,
122         G: Deref<Target = NetworkGraph<L>>,
123         U: Deref,
124         L: Deref,
125 >
126 where U::Target: UtxoLookup, L::Target: Logger {
127         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
128         P2P(P),
129         /// Rapid gossip sync from a trusted server.
130         Rapid(R),
131         /// No gossip sync.
132         None,
133 }
134
135 impl<
136         P: Deref<Target = P2PGossipSync<G, U, L>>,
137         R: Deref<Target = RapidGossipSync<G, L>>,
138         G: Deref<Target = NetworkGraph<L>>,
139         U: Deref,
140         L: Deref,
141 > GossipSync<P, R, G, U, L>
142 where U::Target: UtxoLookup, L::Target: Logger {
143         fn network_graph(&self) -> Option<&G> {
144                 match self {
145                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
146                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
147                         GossipSync::None => None,
148                 }
149         }
150
151         fn prunable_network_graph(&self) -> Option<&G> {
152                 match self {
153                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
154                         GossipSync::Rapid(gossip_sync) => {
155                                 if gossip_sync.is_initial_sync_complete() {
156                                         Some(gossip_sync.network_graph())
157                                 } else {
158                                         None
159                                 }
160                         },
161                         GossipSync::None => None,
162                 }
163         }
164 }
165
166 /// (C-not exported) as the bindings concretize everything and have constructors for us
167 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
168         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
169 where
170         U::Target: UtxoLookup,
171         L::Target: Logger,
172 {
173         /// Initializes a new [`GossipSync::P2P`] variant.
174         pub fn p2p(gossip_sync: P) -> Self {
175                 GossipSync::P2P(gossip_sync)
176         }
177 }
178
179 /// (C-not exported) as the bindings concretize everything and have constructors for us
180 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
181         GossipSync<
182                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
183                 R,
184                 G,
185                 &'a (dyn UtxoLookup + Send + Sync),
186                 L,
187         >
188 where
189         L::Target: Logger,
190 {
191         /// Initializes a new [`GossipSync::Rapid`] variant.
192         pub fn rapid(gossip_sync: R) -> Self {
193                 GossipSync::Rapid(gossip_sync)
194         }
195 }
196
197 /// (C-not exported) as the bindings concretize everything and have constructors for us
198 impl<'a, L: Deref>
199         GossipSync<
200                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
201                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
202                 &'a NetworkGraph<L>,
203                 &'a (dyn UtxoLookup + Send + Sync),
204                 L,
205         >
206 where
207         L::Target: Logger,
208 {
209         /// Initializes a new [`GossipSync::None`] variant.
210         pub fn none() -> Self {
211                 GossipSync::None
212         }
213 }
214
215 fn handle_network_graph_update<L: Deref>(
216         network_graph: &NetworkGraph<L>, event: &Event
217 ) where L::Target: Logger {
218         if let Event::PaymentPathFailed { ref network_update, .. } = event {
219                 if let Some(network_update) = network_update {
220                         network_graph.handle_network_update(&network_update);
221                 }
222         }
223 }
224
225 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
226         scorer: &'a S, event: &Event
227 ) {
228         let mut score = scorer.lock();
229         match event {
230                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
231                         let path = path.iter().collect::<Vec<_>>();
232                         score.payment_path_failed(&path, *scid);
233                 },
234                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
235                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
236                         // because the payment made it all the way to the destination with sufficient liquidity.
237                         let path = path.iter().collect::<Vec<_>>();
238                         score.probe_successful(&path);
239                 },
240                 Event::PaymentPathSuccessful { path, .. } => {
241                         let path = path.iter().collect::<Vec<_>>();
242                         score.payment_path_successful(&path);
243                 },
244                 Event::ProbeSuccessful { path, .. } => {
245                         let path = path.iter().collect::<Vec<_>>();
246                         score.probe_successful(&path);
247                 },
248                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
249                         let path = path.iter().collect::<Vec<_>>();
250                         score.probe_failed(&path, *scid);
251                 },
252                 _ => {},
253         }
254 }
255
// Expands to the main loop shared by the threaded and the async background processor.
//
// Arguments:
// * `$persister`, `$channel_manager`, `$gossip_sync`, `$peer_manager`, `$logger`, `$scorer`:
//   identifiers of the respective objects in the caller's scope (`$chain_monitor` is accepted
//   but not referenced by the body).
// * `$process_chain_monitor_events` / `$process_channel_manager_events`: expressions evaluated
//   at the top of every iteration to process pending events.
// * `$loop_exit_check`: expression evaluating to `true` once the loop should terminate.
// * `$await`: expression that waits for a persistable update and evaluates to `true` if the
//   ChannelManager needs persisting.
// * `$get_timer`: callable taking a number of seconds and returning a timer object.
// * `$timer_elapsed`: callable taking `&mut` timer and a number of seconds, returning whether
//   the timer has elapsed.
//
// The expansion is a block evaluating to `Ok(())`; persistence failures of the ChannelManager
// (and of the scorer and network graph on exit) are propagated with `?`.
macro_rules! define_run_body {
	(
		$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
		$channel_manager: ident, $process_channel_manager_events: expr,
		$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
		$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr
	) => { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut freshness_timer = $get_timer(FRESHNESS_TIMER);
		let mut ping_timer = $get_timer(PING_TIMER);
		let mut prune_timer = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut scorer_persist_timer = $get_timer(SCORER_PERSIST_TIMER);
		let mut pruned_once = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// PeerManager::process_events may block on ChannelManager's locks, which is why it
			// is the last thing done before waiting. Once the ChannelManager finishes whatever
			// it is doing we want to reach `persist_manager` as fast as possible, in particular
			// without first running the normal event processing above and handing events to
			// users.
			//
			// On an *extremely* slow machine the ChannelManager may start processing a message
			// at effectively any point in this loop. To keep the gap between that processing
			// completing and the updated ChannelManager being persisted small, calls that block
			// on the ChannelManager are kept to a minimum and, as a fallback, placed only
			// immediately before persistence.
			$peer_manager.process_events();

			// The wait below lasts at most 100ms; the one-second timer lets us notice if we
			// were actually suspended for much longer (see `slept_too_long` further down).
			let mut sleep_timer = $get_timer(1);
			let needs_persist = $await;
			let slept_too_long = $timer_elapsed(&mut sleep_timer, 1);

			if needs_persist {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Leave the loop once the background processor has been asked to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut freshness_timer, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				freshness_timer = $get_timer(FRESHNESS_TIMER);
			}
			if slept_too_long {
				// We may be starved of CPU cycles on various platforms for several reasons:
				// on iOS a backgrounded app is paused entirely, and on desktop platforms a
				// sleeping device gives us no cycles either.
				// This is detected by the max-100ms wait above running for longer than a full
				// second, in which case sockets are assumed to have been killed (which appears
				// to happen on at least some platforms even after just one second).
				// Care is needed not to end up here merely because user event processing at
				// the top of the loop was slow. The sample client, for instance, may call
				// Bitcoin Core RPCs while handling events, which very often takes more than a
				// handful of seconds and must not disconnect all our peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				ping_timer = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut ping_timer, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				ping_timer = $get_timer(PING_TIMER);
			}

			// A first graph prune is run shortly after startup, and only afterwards do we
			// settle into the regular hourly cadence. Short-lived clients would otherwise
			// never prune their network graph.
			let prune_interval = if pruned_once {
				NETWORK_PRUNE_TIMER
			} else {
				FIRST_NETWORK_PRUNE_TIMER
			};
			if $timer_elapsed(&mut prune_timer, prune_interval) {
				// While rapid sync completion is pending the graph must not be pruned. In that
				// case the timer is left untouched and the check repeats next iteration.
				if let Some(graph) = $gossip_sync.prunable_network_graph() {
					#[cfg(feature = "std")] {
						log_trace!($logger, "Pruning and persisting network graph.");
						graph.remove_stale_channels_and_tracking();
					}
					#[cfg(not(feature = "std"))] {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					// A failure to persist the graph is logged but does not stop the loop.
					if let Err(e) = $persister.persist_graph(graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					prune_timer = $get_timer(NETWORK_PRUNE_TIMER);
					pruned_once = true;
				}
			}

			if $timer_elapsed(&mut scorer_persist_timer, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// A failure to persist the scorer is logged but does not stop the loop.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				scorer_persist_timer = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// Persist the ChannelManager one last time after leaving the loop. This avoids races
		// where the user quits while channel updates are in flight, leaving ChannelMonitor
		// update(s) persisted without the matching ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// The scorer is persisted on exit as well.
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// And so is the NetworkGraph, whenever there is one.
		if let Some(graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(graph)?;
		}

		Ok(())
	} }
}
385
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
/// The loop does not exit merely because the [`ChannelManager`] had an update to persist.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
///
/// # Errors
///
/// Returns the error of a failed [`ChannelManager`] persistence attempt. Failures to persist
/// the scorer or the network graph while exiting are returned as well, whereas such failures
/// inside the loop are only logged.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	UL: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: for<'b> WriteableScore<'b>,
	SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), io::Error>
where
	UL::Target: 'static + UtxoLookup,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// Must start out `false`: when the persistable-update arm of the `select_biased!` below
	// wins, `should_break` is not reassigned, and the exit check right after would otherwise
	// see a stale `true` and terminate the loop although `sleeper` never requested an exit.
	let mut should_break = false;
	// Decorates the user's handler: the network graph (if any) and the scorer (if any) see the
	// event before it is handed on to the user.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		let scorer = &scorer;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			if let Some(ref scorer) = scorer {
				update_scorer(scorer, &event);
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Wait for either a persistable ChannelManager update or the 100ms sleep, with the
			// update taking priority. Only the sleeper's output may request an exit.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |t| sleeper(Duration::from_secs(t)),
		|fut: &mut SleepFuture, _| {
			// A timer counts as elapsed once its sleep future is ready. It is polled a single
			// time with a no-op waker, as no wake-up is needed for this check.
			let waker = task::noop_waker();
			let mut ctx = task::Context::from_waker(&waker);
			core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
		})
}
486
487 #[cfg(feature = "std")]
488 impl BackgroundProcessor {
489         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
490         /// documentation].
491         ///
492         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
493         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
494         /// either [`join`] or [`stop`].
495         ///
496         /// # Data Persistence
497         ///
498         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
499         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
500         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
501         /// provided implementation.
502         ///
503         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
504         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
505         /// See the `lightning-persister` crate for LDK's provided implementation.
506         ///
507         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
508         /// error or call [`join`] and handle any error that may arise. For the latter case,
509         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
510         ///
511         /// # Event Handling
512         ///
513         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
514         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
515         /// functionality implemented by other handlers.
516         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
517         ///
518         /// # Rapid Gossip Sync
519         ///
520         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
521         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
522         /// until the [`RapidGossipSync`] instance completes its first sync.
523         ///
524         /// [top-level documentation]: BackgroundProcessor
525         /// [`join`]: Self::join
526         /// [`stop`]: Self::stop
527         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
528         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
529         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
530         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
531         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
532         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
533         pub fn start<
534                 'a,
535                 UL: 'static + Deref + Send + Sync,
536                 CF: 'static + Deref + Send + Sync,
537                 CW: 'static + Deref + Send + Sync,
538                 T: 'static + Deref + Send + Sync,
539                 ES: 'static + Deref + Send + Sync,
540                 NS: 'static + Deref + Send + Sync,
541                 SP: 'static + Deref + Send + Sync,
542                 F: 'static + Deref + Send + Sync,
543                 R: 'static + Deref + Send + Sync,
544                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
545                 L: 'static + Deref + Send + Sync,
546                 P: 'static + Deref + Send + Sync,
547                 Descriptor: 'static + SocketDescriptor + Send + Sync,
548                 CMH: 'static + Deref + Send + Sync,
549                 OMH: 'static + Deref + Send + Sync,
550                 RMH: 'static + Deref + Send + Sync,
551                 EH: 'static + EventHandler + Send,
552                 PS: 'static + Deref + Send,
553                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
554                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
555                 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
556                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
557                 UMH: 'static + Deref + Send + Sync,
558                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
559                 S: 'static + Deref<Target = SC> + Send + Sync,
560                 SC: for <'b> WriteableScore<'b>,
561         >(
562                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
563                 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
564         ) -> Self
565         where
566                 UL::Target: 'static + UtxoLookup,
567                 CF::Target: 'static + chain::Filter,
568                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
569                 T::Target: 'static + BroadcasterInterface,
570                 ES::Target: 'static + EntropySource,
571                 NS::Target: 'static + NodeSigner,
572                 SP::Target: 'static + SignerProvider,
573                 F::Target: 'static + FeeEstimator,
574                 R::Target: 'static + Router,
575                 L::Target: 'static + Logger,
576                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
577                 CMH::Target: 'static + ChannelMessageHandler,
578                 OMH::Target: 'static + OnionMessageHandler,
579                 RMH::Target: 'static + RoutingMessageHandler,
580                 UMH::Target: 'static + CustomMessageHandler,
581                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
582         {
583                 let stop_thread = Arc::new(AtomicBool::new(false));
584                 let stop_thread_clone = stop_thread.clone();
585                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
586                         let event_handler = |event| {
587                                 let network_graph = gossip_sync.network_graph();
588                                 if let Some(network_graph) = network_graph {
589                                         handle_network_graph_update(network_graph, &event)
590                                 }
591                                 if let Some(ref scorer) = scorer {
592                                         update_scorer(scorer, &event);
593                                 }
594                                 event_handler.handle_event(event);
595                         };
596                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
597                                 channel_manager, channel_manager.process_pending_events(&event_handler),
598                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
599                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
600                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
601                 });
602                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
603         }
604
605         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
606         /// [`ChannelManager`].
607         ///
608         /// # Panics
609         ///
610         /// This function panics if the background thread has panicked such as while persisting or
611         /// handling events.
612         ///
613         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
614         pub fn join(mut self) -> Result<(), std::io::Error> {
615                 assert!(self.thread_handle.is_some());
616                 self.join_thread()
617         }
618
619         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
620         /// [`ChannelManager`].
621         ///
622         /// # Panics
623         ///
624         /// This function panics if the background thread has panicked such as while persisting or
625         /// handling events.
626         ///
627         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
628         pub fn stop(mut self) -> Result<(), std::io::Error> {
629                 assert!(self.thread_handle.is_some());
630                 self.stop_and_join_thread()
631         }
632
633         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
634                 self.stop_thread.store(true, Ordering::Release);
635                 self.join_thread()
636         }
637
638         fn join_thread(&mut self) -> Result<(), std::io::Error> {
639                 match self.thread_handle.take() {
640                         Some(handle) => handle.join().unwrap(),
641                         None => Ok(()),
642                 }
643         }
644 }
645
/// Dropping a `BackgroundProcessor` stops its thread and waits for it to exit.
///
/// The thread's result is unwrapped, so an error returned by the thread turns into a panic
/// here unless it was already collected through `join` or `stop`.
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
    fn drop(&mut self) {
        // After `join`/`stop` the handle is gone and this resolves to `Ok(())` immediately.
        self.stop_and_join_thread().unwrap();
    }
}
652
653 #[cfg(all(feature = "std", test))]
654 mod tests {
655         use bitcoin::blockdata::block::BlockHeader;
656         use bitcoin::blockdata::constants::genesis_block;
657         use bitcoin::blockdata::locktime::PackedLockTime;
658         use bitcoin::blockdata::transaction::{Transaction, TxOut};
659         use bitcoin::network::constants::Network;
660         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
661         use lightning::chain::{BestBlock, Confirm, chainmonitor};
662         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
663         use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
664         use lightning::chain::transaction::OutPoint;
665         use lightning::get_event_msg;
666         use lightning::ln::PaymentHash;
667         use lightning::ln::channelmanager;
668         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
669         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
670         use lightning::ln::msgs::{ChannelMessageHandler, Init};
671         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
672         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
673         use lightning::routing::router::{DefaultRouter, RouteHop};
674         use lightning::routing::scoring::{ChannelUsage, Score};
675         use lightning::util::config::UserConfig;
676         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
677         use lightning::util::ser::Writeable;
678         use lightning::util::test_utils;
679         use lightning::util::persist::KVStorePersister;
680         use lightning_persister::FilesystemPersister;
681         use std::collections::VecDeque;
682         use std::fs;
683         use std::path::PathBuf;
684         use std::sync::{Arc, Mutex};
685         use std::sync::mpsc::SyncSender;
686         use std::time::Duration;
687         use bitcoin::hashes::Hash;
688         use bitcoin::TxMerkleNode;
689         use lightning_rapid_gossip_sync::RapidGossipSync;
690         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
691
692         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
693
694         #[derive(Clone, Hash, PartialEq, Eq)]
695         struct TestDescriptor{}
696         impl SocketDescriptor for TestDescriptor {
697                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
698                         0
699                 }
700
701                 fn disconnect_socket(&mut self) {}
702         }
703
704         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
705
706         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
707
708         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
709         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
710
711         struct Node {
712                 node: Arc<ChannelManager>,
713                 p2p_gossip_sync: PGS,
714                 rapid_gossip_sync: RGS,
715                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
716                 chain_monitor: Arc<ChainMonitor>,
717                 persister: Arc<FilesystemPersister>,
718                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
719                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
720                 logger: Arc<test_utils::TestLogger>,
721                 best_block: BestBlock,
722                 scorer: Arc<Mutex<TestScorer>>,
723         }
724
725         impl Node {
726                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
727                         GossipSync::P2P(self.p2p_gossip_sync.clone())
728                 }
729
730                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
731                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
732                 }
733
734                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
735                         GossipSync::None
736                 }
737         }
738
739         impl Drop for Node {
740                 fn drop(&mut self) {
741                         let data_dir = self.persister.get_data_dir();
742                         match fs::remove_dir_all(data_dir.clone()) {
743                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
744                                 _ => {}
745                         }
746                 }
747         }
748
749         struct Persister {
750                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
751                 graph_persistence_notifier: Option<SyncSender<()>>,
752                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
753                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
754                 filesystem_persister: FilesystemPersister,
755         }
756
757         impl Persister {
758                 fn new(data_dir: String) -> Self {
759                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
760                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
761                 }
762
763                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
764                         Self { graph_error: Some((error, message)), ..self }
765                 }
766
767                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
768                         Self { graph_persistence_notifier: Some(sender), ..self }
769                 }
770
771                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
772                         Self { manager_error: Some((error, message)), ..self }
773                 }
774
775                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
776                         Self { scorer_error: Some((error, message)), ..self }
777                 }
778         }
779
780         impl KVStorePersister for Persister {
781                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
782                         if key == "manager" {
783                                 if let Some((error, message)) = self.manager_error {
784                                         return Err(std::io::Error::new(error, message))
785                                 }
786                         }
787
788                         if key == "network_graph" {
789                                 if let Some(sender) = &self.graph_persistence_notifier {
790                                         sender.send(()).unwrap();
791                                 };
792
793                                 if let Some((error, message)) = self.graph_error {
794                                         return Err(std::io::Error::new(error, message))
795                                 }
796                         }
797
798                         if key == "scorer" {
799                                 if let Some((error, message)) = self.scorer_error {
800                                         return Err(std::io::Error::new(error, message))
801                                 }
802                         }
803
804                         self.filesystem_persister.persist(key, object)
805                 }
806         }
807
808         struct TestScorer {
809                 event_expectations: Option<VecDeque<TestResult>>,
810         }
811
812         #[derive(Debug)]
813         enum TestResult {
814                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
815                 PaymentSuccess { path: Vec<RouteHop> },
816                 ProbeFailure { path: Vec<RouteHop> },
817                 ProbeSuccess { path: Vec<RouteHop> },
818         }
819
820         impl TestScorer {
821                 fn new() -> Self {
822                         Self { event_expectations: None }
823                 }
824
825                 fn expect(&mut self, expectation: TestResult) {
826                         self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
827                 }
828         }
829
830         impl lightning::util::ser::Writeable for TestScorer {
831                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
832         }
833
834         impl Score for TestScorer {
835                 fn channel_penalty_msat(
836                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
837                 ) -> u64 { unimplemented!(); }
838
839                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
840                         if let Some(expectations) = &mut self.event_expectations {
841                                 match expectations.pop_front().unwrap() {
842                                         TestResult::PaymentFailure { path, short_channel_id } => {
843                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
844                                                 assert_eq!(actual_short_channel_id, short_channel_id);
845                                         },
846                                         TestResult::PaymentSuccess { path } => {
847                                                 panic!("Unexpected successful payment path: {:?}", path)
848                                         },
849                                         TestResult::ProbeFailure { path } => {
850                                                 panic!("Unexpected probe failure: {:?}", path)
851                                         },
852                                         TestResult::ProbeSuccess { path } => {
853                                                 panic!("Unexpected probe success: {:?}", path)
854                                         }
855                                 }
856                         }
857                 }
858
859                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
860                         if let Some(expectations) = &mut self.event_expectations {
861                                 match expectations.pop_front().unwrap() {
862                                         TestResult::PaymentFailure { path, .. } => {
863                                                 panic!("Unexpected payment path failure: {:?}", path)
864                                         },
865                                         TestResult::PaymentSuccess { path } => {
866                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
867                                         },
868                                         TestResult::ProbeFailure { path } => {
869                                                 panic!("Unexpected probe failure: {:?}", path)
870                                         },
871                                         TestResult::ProbeSuccess { path } => {
872                                                 panic!("Unexpected probe success: {:?}", path)
873                                         }
874                                 }
875                         }
876                 }
877
878                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
879                         if let Some(expectations) = &mut self.event_expectations {
880                                 match expectations.pop_front().unwrap() {
881                                         TestResult::PaymentFailure { path, .. } => {
882                                                 panic!("Unexpected payment path failure: {:?}", path)
883                                         },
884                                         TestResult::PaymentSuccess { path } => {
885                                                 panic!("Unexpected payment path success: {:?}", path)
886                                         },
887                                         TestResult::ProbeFailure { path } => {
888                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
889                                         },
890                                         TestResult::ProbeSuccess { path } => {
891                                                 panic!("Unexpected probe success: {:?}", path)
892                                         }
893                                 }
894                         }
895                 }
896                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
897                         if let Some(expectations) = &mut self.event_expectations {
898                                 match expectations.pop_front().unwrap() {
899                                         TestResult::PaymentFailure { path, .. } => {
900                                                 panic!("Unexpected payment path failure: {:?}", path)
901                                         },
902                                         TestResult::PaymentSuccess { path } => {
903                                                 panic!("Unexpected payment path success: {:?}", path)
904                                         },
905                                         TestResult::ProbeFailure { path } => {
906                                                 panic!("Unexpected probe failure: {:?}", path)
907                                         },
908                                         TestResult::ProbeSuccess { path } => {
909                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
910                                         }
911                                 }
912                         }
913                 }
914         }
915
916         impl Drop for TestScorer {
917                 fn drop(&mut self) {
918                         if std::thread::panicking() {
919                                 return;
920                         }
921
922                         if let Some(event_expectations) = &self.event_expectations {
923                                 if !event_expectations.is_empty() {
924                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
925                                 }
926                         }
927                 }
928         }
929
/// Joins `filename` onto the directory `filepath` and returns the result as a `String`.
///
/// Panics if the joined path is not valid UTF-8.
fn get_full_filepath(filepath: String, filename: String) -> String {
    PathBuf::from(filepath).join(filename).to_str().unwrap().to_string()
}
935
936         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
937                 let mut nodes = Vec::new();
938                 for i in 0..num_nodes {
939                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
940                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
941                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
942                         let network = Network::Testnet;
943                         let genesis_block = genesis_block(network);
944                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
945                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
946                         let seed = [i as u8; 32];
947                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
948                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
949                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
950                         let now = Duration::from_secs(genesis_block.header.time as u64);
951                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
952                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
953                         let best_block = BestBlock::from_genesis(network);
954                         let params = ChainParameters { network, best_block };
955                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
956                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
957                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
958                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
959                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
960                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
961                         nodes.push(node);
962                 }
963
964                 for i in 0..num_nodes {
965                         for j in (i+1)..num_nodes {
966                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
967                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
968                         }
969                 }
970
971                 nodes
972         }
973
/// Runs the channel-open handshake from `$initiator` to `$acceptor` up to and including the
/// exchange of `funding_signed`, and evaluates to the funding transaction.
///
/// Consumes `$initiator`'s pending events and requires exactly one to be pending.
macro_rules! open_channel {
    ($initiator: expr, $acceptor: expr, $value_sats: expr) => {{
        begin_open_channel!($initiator, $acceptor, $value_sats);
        let pending_events = $initiator.node.get_and_clear_pending_events();
        assert_eq!(pending_events.len(), 1);
        let (temp_chan_id, funding_tx) =
            handle_funding_generation_ready!(pending_events[0], $value_sats);
        end_open_channel!($initiator, $acceptor, temp_chan_id, funding_tx);
        funding_tx
    }}
}
984
/// First half of the handshake: `$initiator` creates the channel and the `open_channel` /
/// `accept_channel` messages are delivered between the two nodes.
macro_rules! begin_open_channel {
    ($initiator: expr, $acceptor: expr, $value_sats: expr) => {{
        // 42 is the user channel id that `handle_funding_generation_ready!` later asserts on.
        // NOTE(review): 100 is presumably the pushed amount in msat — confirm against
        // `ChannelManager::create_channel`.
        $initiator.node.create_channel($acceptor.node.get_our_node_id(), $value_sats, 100, 42, None).unwrap();

        let open_msg = get_event_msg!($initiator, MessageSendEvent::SendOpenChannel, $acceptor.node.get_our_node_id());
        $acceptor.node.handle_open_channel(&$initiator.node.get_our_node_id(), &open_msg);

        let accept_msg = get_event_msg!($acceptor, MessageSendEvent::SendAcceptChannel, $initiator.node.get_our_node_id());
        $initiator.node.handle_accept_channel(&$acceptor.node.get_our_node_id(), &accept_msg);
    }}
}
992
/// Checks that `$event` is the `FundingGenerationReady` for a channel of `$value_sats` opened
/// with user channel id 42, and evaluates to `(temporary_channel_id, funding_tx)`.
///
/// The funding transaction has no inputs and a single output paying the channel value to the
/// script requested by the event. Any other event panics.
macro_rules! handle_funding_generation_ready {
    ($event: expr, $value_sats: expr) => {{
        match $event {
            Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                assert_eq!(channel_value_satoshis, $value_sats);
                assert_eq!(user_channel_id, 42);

                let funding_output = TxOut {
                    value: channel_value_satoshis,
                    script_pubkey: output_script.clone(),
                };
                let funding_tx = Transaction {
                    version: 1i32,
                    lock_time: PackedLockTime(0),
                    input: Vec::new(),
                    output: vec![funding_output],
                };
                (temporary_channel_id, funding_tx)
            },
            _ => panic!("Unexpected event"),
        }
    }}
}
1009
/// Second half of the handshake: `$initiator` is given the funding transaction `$funding_tx`
/// and the `funding_created` / `funding_signed` messages are delivered between the two nodes.
macro_rules! end_open_channel {
    ($initiator: expr, $acceptor: expr, $temp_chan_id: expr, $funding_tx: expr) => {{
        $initiator.node.funding_transaction_generated(&$temp_chan_id, &$acceptor.node.get_our_node_id(), $funding_tx.clone()).unwrap();

        let created_msg = get_event_msg!($initiator, MessageSendEvent::SendFundingCreated, $acceptor.node.get_our_node_id());
        $acceptor.node.handle_funding_created(&$initiator.node.get_our_node_id(), &created_msg);

        let signed_msg = get_event_msg!($acceptor, MessageSendEvent::SendFundingSigned, $initiator.node.get_our_node_id());
        $initiator.node.handle_funding_signed(&$acceptor.node.get_our_node_id(), &signed_msg);
    }}
}
1017
1018         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1019                 for i in 1..=depth {
1020                         let prev_blockhash = node.best_block.block_hash();
1021                         let height = node.best_block.height() + 1;
1022                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1023                         let txdata = vec![(0, tx)];
1024                         node.best_block = BestBlock::new(header.block_hash(), height);
1025                         match i {
1026                                 1 => {
1027                                         node.node.transactions_confirmed(&header, &txdata, height);
1028                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1029                                 },
1030                                 x if x == depth => {
1031                                         node.node.best_block_updated(&header, height);
1032                                         node.chain_monitor.best_block_updated(&header, height);
1033                                 },
1034                                 _ => {},
1035                         }
1036                 }
1037         }
1038         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1039                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1040         }
1041
#[test]
fn test_background_processor() {
    // Opening a channel must leave the ChannelManager needing persistence, and every later
    // update must both signal that it needs re-persistence and actually be re-persisted by the
    // background processor.
    let nodes = create_nodes(2, "test_background_processor".to_string());

    // `open_channel!` consumes events, so the channel has to be fully set up before the
    // BackgroundProcessor starts; otherwise the two would race over the same events.
    let tx = open_channel!(nodes[0], nodes[1], 100000);

    // Start the background processor for the first node.
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let event_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

    // Spins until the file at `$filepath` holds exactly the current serialization of `$node`.
    // The serialization is regenerated on every attempt because the object may still change.
    macro_rules! check_persisted_data {
        ($node: expr, $filepath: expr) => {
            let mut expected_bytes = Vec::new();
            loop {
                expected_bytes.clear();
                if let Err(e) = $node.write(&mut expected_bytes) {
                    panic!("Unexpected error: {}", e)
                }
                // A missing or stale file just means the processor has not caught up yet.
                if let Ok(bytes) = std::fs::read($filepath) {
                    if bytes == expected_bytes {
                        break
                    }
                }
            }
        }
    }

    // The initial channel manager state must reach disk.
    let manager_path = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
    check_persisted_data!(nodes[0].node, manager_path.clone());

    // Wait for the manager to stop reporting that it needs persistence.
    while nodes[0].node.get_persistence_condvar_value() {}

    // Force-close the channel.
    nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

    // The force-close must be persisted as well.
    check_persisted_data!(nodes[0].node, manager_path.clone());
    while nodes[0].node.get_persistence_condvar_value() {}

    // The network graph is persisted.
    let graph_path = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
    check_persisted_data!(nodes[0].network_graph, graph_path.clone());

    // The scorer is persisted.
    let scorer_path = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
    check_persisted_data!(nodes[0].scorer, scorer_path.clone());

    assert!(bg_processor.stop().is_ok());
}
1111
#[test]
fn test_timer_tick_called() {
    // Start a background processor for a single node and spin until the test logger has
    // recorded the `timer_tick_occurred` call for both the ChannelManager and the PeerManager.
    let nodes = create_nodes(1, "test_timer_tick_called".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // The logger lock is taken afresh on every poll and released when the closure returns.
    let both_ticks_logged = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let module = "lightning_background_processor";
        [
            "Calling ChannelManager's timer_tick_occurred",
            "Calling PeerManager's timer_tick_occurred",
        ]
        .iter()
        .all(|msg| lines.contains_key(&(module.to_string(), msg.to_string())))
    };
    while !both_ticks_logged() {}

    assert!(bg_processor.stop().is_ok());
}
1133
#[test]
fn test_channel_manager_persist_error() {
    // A persister configured to fail on manager writes must cause `join()` to return that
    // very error (kind `Other`, message "test").
    let nodes = create_nodes(2, "test_persist_error".to_string());
    open_channel!(nodes[0], nodes[1], 100000);

    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_manager_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.join() {
        Ok(_) => panic!("Expected error persisting manager"),
        Err(e) => e,
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1152
#[test]
fn test_network_graph_persist_error() {
    // A persister configured to fail on network graph writes must cause `stop()` to return
    // that error (kind `Other`, message "test"). Uses the P2P gossip sync variant.
    let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_graph_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting network graph"),
        Err(e) => e,
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1170
#[test]
fn test_scorer_persist_error() {
    // A persister configured to fail on scorer writes must cause `stop()` to return that
    // error (kind `Other`, message "test").
    let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_scorer_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting scorer"),
        Err(e) => e,
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1188
#[test]
fn test_background_event_handling() {
    // Runs two background processors back to back on node 0: the first must deliver a
    // FundingGenerationReady event to the handler while a channel is opened, the second must
    // deliver a SpendableOutputs event after that channel is force-closed and confirmed.
    let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
    let channel_value = 100000;
    let data_dir = nodes[0].persister.get_data_dir();
    let open_persister = Arc::new(Persister::new(data_dir.clone()));

    // Phase 1 handler: forward the result of handling FundingGenerationReady to the test
    // thread; tolerate ChannelReady; anything else is a failure.
    let (funding_sender, funding_receiver) = std::sync::mpsc::sync_channel(1);
    let funding_handler = move |event: Event| match event {
        Event::FundingGenerationReady { .. } => {
            funding_sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap()
        },
        Event::ChannelReady { .. } => {},
        _ => panic!("Unexpected event: {:?}", event),
    };

    let bg_processor = BackgroundProcessor::start(
        open_persister,
        funding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Begin the open and wait for the background handler to report the funding details.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let (temporary_channel_id, funding_tx) = funding_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("FundingGenerationReady not handled within deadline");
    end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

    // Confirm the funding transaction on both sides and exchange channel_ready messages.
    confirm_transaction(&mut nodes[0], &funding_tx);
    let as_funding = get_event_msg!(
        nodes[0],
        MessageSendEvent::SendChannelReady,
        nodes[1].node.get_our_node_id()
    );
    confirm_transaction(&mut nodes[1], &funding_tx);
    let bs_funding = get_event_msg!(
        nodes[1],
        MessageSendEvent::SendChannelReady,
        nodes[0].node.get_our_node_id()
    );
    nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
    let _as_channel_update = get_event_msg!(
        nodes[0],
        MessageSendEvent::SendChannelUpdate,
        nodes[1].node.get_our_node_id()
    );
    nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
    let _bs_channel_update = get_event_msg!(
        nodes[1],
        MessageSendEvent::SendChannelUpdate,
        nodes[0].node.get_our_node_id()
    );

    assert!(bg_processor.stop().is_ok());

    // Phase 2 handler: forward SpendableOutputs to the test thread; tolerate ChannelReady and
    // ChannelClosed; anything else is a failure.
    let (spendable_sender, spendable_receiver) = std::sync::mpsc::sync_channel(1);
    let spendable_handler = move |event: Event| match event {
        Event::SpendableOutputs { .. } => spendable_sender.send(event.clone()).unwrap(),
        Event::ChannelReady { .. } => {},
        Event::ChannelClosed { .. } => {},
        _ => panic!("Unexpected event: {:?}", event),
    };
    let close_persister = Arc::new(Persister::new(data_dir));
    let bg_processor = BackgroundProcessor::start(
        close_persister,
        spendable_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Force-close, then confirm the broadcast commitment transaction to BREAKDOWN_TIMEOUT depth.
    nodes[0]
        .node
        .force_close_broadcasting_latest_txn(
            &nodes[0].node.list_channels()[0].channel_id,
            &nodes[1].node.get_our_node_id(),
        )
        .unwrap();
    let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

    let event = spendable_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("Events not handled within deadline");
    if let Event::SpendableOutputs { .. } = event {
        // Expected.
    } else {
        panic!("Unexpected event: {:?}", event);
    }

    assert!(bg_processor.stop().is_ok());
}
1251
#[test]
fn test_scorer_persistence() {
    // Start a background processor and spin until the test logger has recorded a
    // "Persisting scorer" line from the background processor module.
    let nodes = create_nodes(2, "test_scorer_persistence".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // The logger lock is taken afresh on every poll and released when the closure returns.
    let scorer_persist_logged = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let key = ("lightning_background_processor".to_string(), "Persisting scorer".to_string());
        lines.contains_key(&key)
    };
    while !scorer_persist_logged() {}

    assert!(bg_processor.stop().is_ok());
}
1270
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
    // Seeds the graph with one partially-announced channel, runs the background processor in
    // rapid-gossip-sync mode for at least two loop iterations, then applies an RGS snapshot.
    // The channel count must be 3 right after the snapshot (the seeded channel is still there)
    // and 0 once the persister has signalled a graph persistence and the processor is stopped.
    let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
    let data_dir = nodes[0].persister.get_data_dir();
    let (persist_sender, persist_receiver) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(persist_sender));
    let network_graph = nodes[0].network_graph.clone();
    network_graph
        .add_channel_from_partial_announcement(
            42,
            53,
            ChannelFeatures::empty(),
            nodes[0].node.get_our_node_id(),
            nodes[1].node.get_our_node_id(),
        )
        .expect("Failed to update channel from partial announcement");
    let graph_description = network_graph.to_string();
    assert!(graph_description.contains("42: features: 0000, node_one:"));
    assert_eq!(network_graph.read_only().channels().len(), 1);

    let noop_handler = |_: _| {};
    let background_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].rapid_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Wait until the loop has gone around at least twice, using the logged tick count as the
    // loop counter.
    let ticked_more_than_once = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let tick_key = (
            "lightning_background_processor".to_string(),
            "Calling ChannelManager's timer_tick_occurred".to_string(),
        );
        lines.get(&tick_key).map_or(false, |count| *count > 1)
    };
    while !ticked_more_than_once() {}

    let initialization_input = vec![
        76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
        79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
        0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
        187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
        157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
        88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
        204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
        181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
        110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
        76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
        226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
        0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
        0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
    ];
    nodes[0]
        .rapid_gossip_sync
        .update_network_graph_no_std(&initialization_input[..], Some(1642291930))
        .unwrap();

    // The snapshot should have added two channels on top of the seeded one.
    assert_eq!(network_graph.read_only().channels().len(), 3);

    let prune_deadline = Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5);
    let _ = persist_receiver
        .recv_timeout(prune_deadline)
        .expect("Network graph not pruned within deadline");

    background_processor.stop().unwrap();

    // All channels should now be pruned.
    assert_eq!(network_graph.read_only().channels().len(), 0);
}
1328
#[test]
fn test_payment_path_scoring() {
    // Ensure that the scorer is updated when relevant events are processed: for each pushed
    // event the test scorer is first told which `TestResult` to expect, then the event is
    // pushed onto the ChannelManager and must arrive at the handler within the deadline.
    // NOTE(review): the original comment states the channel must be public to be scored;
    // that is not observable from this test -- confirm against the scoring code.

    // Background event handler that forwards payment-path and probe events to the test
    // thread; any other event is a failure.
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let event_handler = move |event: Event| match event {
        Event::PaymentPathFailed { .. }
        | Event::PaymentPathSuccessful { .. }
        | Event::ProbeSuccessful { .. }
        | Event::ProbeFailed { .. } => sender.send(event).unwrap(),
        _ => panic!("Unexpected event: {:?}", event),
    };

    let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Blocks for the next forwarded event, panicking with `deadline_msg` on timeout.
    let next_event = |deadline_msg: &str| {
        receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)).expect(deadline_msg)
    };

    let scored_scid = 4242;
    let secp_ctx = Secp256k1::new();
    let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
    let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

    // Single-hop path over the scored channel.
    let path = vec![RouteHop {
        pubkey: node_1_id,
        node_features: NodeFeatures::empty(),
        short_channel_id: scored_scid,
        channel_features: ChannelFeatures::empty(),
        fee_msat: 0,
        cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
    }];

    // A non-permanent path failure at a known channel is expected as PaymentFailure.
    nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure {
        path: path.clone(),
        short_channel_id: scored_scid,
    });
    nodes[0].node.push_pending_event(Event::PaymentPathFailed {
        payment_id: None,
        payment_hash: PaymentHash([42; 32]),
        payment_failed_permanently: false,
        network_update: None,
        all_paths_failed: true,
        path: path.clone(),
        short_channel_id: Some(scored_scid),
        retry: None,
    });
    if let Event::PaymentPathFailed { .. } = next_event("PaymentPathFailed not handled within deadline") {
        // Expected.
    } else {
        panic!("Unexpected event");
    }

    // Ensure we'll score payments that were explicitly failed back by the destination as
    // ProbeSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::PaymentPathFailed {
        payment_id: None,
        payment_hash: PaymentHash([42; 32]),
        payment_failed_permanently: true,
        network_update: None,
        all_paths_failed: true,
        path: path.clone(),
        short_channel_id: None,
        retry: None,
    });
    if let Event::PaymentPathFailed { .. } = next_event("PaymentPathFailed not handled within deadline") {
        // Expected.
    } else {
        panic!("Unexpected event");
    }

    // A successful payment path is expected as PaymentSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
        payment_id: PaymentId([42; 32]),
        payment_hash: None,
        path: path.clone(),
    });
    if let Event::PaymentPathSuccessful { .. } = next_event("PaymentPathSuccessful not handled within deadline") {
        // Expected.
    } else {
        panic!("Unexpected event");
    }

    // A successful probe is expected as ProbeSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::ProbeSuccessful {
        payment_id: PaymentId([42; 32]),
        payment_hash: PaymentHash([42; 32]),
        path: path.clone(),
    });
    if let Event::ProbeSuccessful { .. } = next_event("ProbeSuccessful not handled within deadline") {
        // Expected.
    } else {
        panic!("Unexpected event");
    }

    // A failed probe is expected as ProbeFailure.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
    nodes[0].node.push_pending_event(Event::ProbeFailed {
        payment_id: PaymentId([42; 32]),
        payment_hash: PaymentHash([42; 32]),
        path: path.clone(),
        short_channel_id: Some(scored_scid),
    });
    if let Event::ProbeFailed { .. } = next_event("ProbeFailure not handled within deadline") {
        // Expected.
    } else {
        panic!("Unexpected event");
    }

    assert!(bg_processor.stop().is_ok());
}
1448 }