Only import lightning::io if futures are enabled
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
15
16 #[cfg(any(test, feature = "std"))]
17 extern crate core;
18
19 #[cfg(not(feature = "std"))]
20 extern crate alloc;
21
22 #[macro_use] extern crate lightning;
23 extern crate lightning_rapid_gossip_sync;
24
25 use lightning::chain;
26 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
27 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
28 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
29 use lightning::ln::channelmanager::ChannelManager;
30 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
31 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
32 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
33 use lightning::routing::utxo::UtxoLookup;
34 use lightning::routing::router::Router;
35 use lightning::routing::scoring::{Score, WriteableScore};
36 use lightning::util::events::{Event, EventHandler, EventsProvider};
37 use lightning::util::logger::Logger;
38 use lightning::util::persist::Persister;
39 use lightning_rapid_gossip_sync::RapidGossipSync;
40
41 use core::ops::Deref;
42 use core::time::Duration;
43
44 #[cfg(feature = "std")]
45 use std::sync::Arc;
46 #[cfg(feature = "std")]
47 use core::sync::atomic::{AtomicBool, Ordering};
48 #[cfg(feature = "std")]
49 use std::thread::{self, JoinHandle};
50 #[cfg(feature = "std")]
51 use std::time::Instant;
52
53 #[cfg(feature = "futures")]
54 use futures_util::{select_biased, future::FutureExt, task};
55 #[cfg(not(feature = "std"))]
56 use alloc::vec::Vec;
57
/// `BackgroundProcessor` runs, on a dedicated thread, the housekeeping that (1) has to happen
/// periodically for Rust-Lightning to keep working properly and (2) can or should happen in the
/// background. It is responsible for:
/// * Handing [`Event`]s to a user-provided [`EventHandler`].
/// * Watching for the [`ChannelManager`] needing to be re-persisted and, when it does, writing
///   it to disk/backups through the callback supplied at startup, so that [`ChannelManager`]
///   persistence happens in the background.
/// * Invoking [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Invoking [`NetworkGraph::remove_stale_channels_and_tracking`] (only if a [`GossipSync`]
///   with a [`NetworkGraph`] was handed to [`BackgroundProcessor::start`]).
///
/// [`PeerManager::process_events`] is called periodically as well, but this shouldn't be relied
/// upon because doing so may result in high latency.
///
/// # Note
///
/// Should [`ChannelManager`] persistence fail and the persisted manager fall out-of-date, there
/// is a risk of channels force-closing on startup once the manager realizes it's outdated.
/// However, provided that [`ChannelMonitor`] backups are sound, no funds are at risk besides
/// those spent on unilateral chain closure fees.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[cfg(feature = "std")]
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        // Flag that the background thread's loop reads on every iteration to decide whether to
        // exit.
        stop_thread: Arc<AtomicBool>,
        // Handle of the spawned background thread; the thread's return value is the outcome of
        // the processing loop, including any persistence error that terminated it.
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
88
/// Seconds between calls to `ChannelManager::timer_tick_occurred` (shortened under test).
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;
/// Seconds between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;

/// Seconds between calls to `PeerManager::timer_tick_occurred` (shortened under test).
#[cfg(test)]
const PING_TIMER: u64 = 1;
/// Seconds between calls to `PeerManager::timer_tick_occurred` in optimised builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds between periodic scorer persistence runs (shortened under test).
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
/// Seconds between periodic scorer persistence runs.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;

/// Seconds after startup at which the first network graph prune is attempted (shortened under
/// test).
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
/// Seconds after startup at which the first network graph prune is attempted, before falling
/// back to the `NETWORK_PRUNE_TIMER` cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
116
117 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
118 pub enum GossipSync<
119         P: Deref<Target = P2PGossipSync<G, U, L>>,
120         R: Deref<Target = RapidGossipSync<G, L>>,
121         G: Deref<Target = NetworkGraph<L>>,
122         U: Deref,
123         L: Deref,
124 >
125 where U::Target: UtxoLookup, L::Target: Logger {
126         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
127         P2P(P),
128         /// Rapid gossip sync from a trusted server.
129         Rapid(R),
130         /// No gossip sync.
131         None,
132 }
133
134 impl<
135         P: Deref<Target = P2PGossipSync<G, U, L>>,
136         R: Deref<Target = RapidGossipSync<G, L>>,
137         G: Deref<Target = NetworkGraph<L>>,
138         U: Deref,
139         L: Deref,
140 > GossipSync<P, R, G, U, L>
141 where U::Target: UtxoLookup, L::Target: Logger {
142         fn network_graph(&self) -> Option<&G> {
143                 match self {
144                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
145                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
146                         GossipSync::None => None,
147                 }
148         }
149
150         fn prunable_network_graph(&self) -> Option<&G> {
151                 match self {
152                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
153                         GossipSync::Rapid(gossip_sync) => {
154                                 if gossip_sync.is_initial_sync_complete() {
155                                         Some(gossip_sync.network_graph())
156                                 } else {
157                                         None
158                                 }
159                         },
160                         GossipSync::None => None,
161                 }
162         }
163 }
164
165 /// (C-not exported) as the bindings concretize everything and have constructors for us
166 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
167         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
168 where
169         U::Target: UtxoLookup,
170         L::Target: Logger,
171 {
172         /// Initializes a new [`GossipSync::P2P`] variant.
173         pub fn p2p(gossip_sync: P) -> Self {
174                 GossipSync::P2P(gossip_sync)
175         }
176 }
177
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
180         GossipSync<
181                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
182                 R,
183                 G,
184                 &'a (dyn UtxoLookup + Send + Sync),
185                 L,
186         >
187 where
188         L::Target: Logger,
189 {
190         /// Initializes a new [`GossipSync::Rapid`] variant.
191         pub fn rapid(gossip_sync: R) -> Self {
192                 GossipSync::Rapid(gossip_sync)
193         }
194 }
195
196 /// (C-not exported) as the bindings concretize everything and have constructors for us
197 impl<'a, L: Deref>
198         GossipSync<
199                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
200                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
201                 &'a NetworkGraph<L>,
202                 &'a (dyn UtxoLookup + Send + Sync),
203                 L,
204         >
205 where
206         L::Target: Logger,
207 {
208         /// Initializes a new [`GossipSync::None`] variant.
209         pub fn none() -> Self {
210                 GossipSync::None
211         }
212 }
213
214 fn handle_network_graph_update<L: Deref>(
215         network_graph: &NetworkGraph<L>, event: &Event
216 ) where L::Target: Logger {
217         if let Event::PaymentPathFailed { ref network_update, .. } = event {
218                 if let Some(network_update) = network_update {
219                         network_graph.handle_network_update(&network_update);
220                 }
221         }
222 }
223
224 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
225         scorer: &'a S, event: &Event
226 ) {
227         let mut score = scorer.lock();
228         match event {
229                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
230                         let path = path.iter().collect::<Vec<_>>();
231                         score.payment_path_failed(&path, *scid);
232                 },
233                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
234                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
235                         // because the payment made it all the way to the destination with sufficient liquidity.
236                         let path = path.iter().collect::<Vec<_>>();
237                         score.probe_successful(&path);
238                 },
239                 Event::PaymentPathSuccessful { path, .. } => {
240                         let path = path.iter().collect::<Vec<_>>();
241                         score.payment_path_successful(&path);
242                 },
243                 Event::ProbeSuccessful { path, .. } => {
244                         let path = path.iter().collect::<Vec<_>>();
245                         score.probe_successful(&path);
246                 },
247                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
248                         let path = path.iter().collect::<Vec<_>>();
249                         score.probe_failed(&path, *scid);
250                 },
251                 _ => {},
252         }
253 }
254
// Expands to the processing loop shared by the threaded and the async background processors,
// followed by the final persistence pass, and evaluates to `Ok(())` on a clean exit (persistence
// errors are propagated with `?`, so the expansion must sit in a context returning `Result`).
//
// Arguments:
// * `$persister`, `$channel_manager`, `$gossip_sync`, `$peer_manager`, `$logger`, `$scorer`:
//   identifiers of the objects the loop drives. `$chain_monitor` is accepted but not referenced
//   by the expansion itself.
// * `$process_chain_monitor_events`, `$process_channel_manager_events`: expressions evaluated at
//   the top of every iteration to process pending events.
// * `$loop_exit_check`: expression evaluated once per iteration; the loop ends when it is true.
// * `$await`: expression that waits for a `ChannelManager` update and evaluates to `true` if the
//   manager needs to be persisted.
// * `$get_timer`: callable taking a number of seconds and returning a fresh timer.
// * `$timer_elapsed`: callable taking `&mut` timer and a number of seconds, returning whether
//   the timer has elapsed.
macro_rules! define_run_body {
        ($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
         $channel_manager: ident, $process_channel_manager_events: expr,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
        => { {
                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();

                let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
                let mut last_ping_call = $get_timer(PING_TIMER);
                let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
                let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                let mut have_pruned = false;

                loop {
                        $process_channel_manager_events;
                        $process_chain_monitor_events;

                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
                        // we want to ensure we get into `persist_manager` as quickly as we can, especially
                        // without running the normal event processing above and handing events to users.
                        //
                        // Specifically, on an *extremely* slow machine, we may see ChannelManager start
                        // processing a message effectively at any point during this loop. In order to
                        // minimize the time between such processing completing and persisting the updated
                        // ChannelManager, we want to minimize methods blocking on a ChannelManager
                        // generally, and as a fallback place such blocking only immediately before
                        // persistence.
                        $peer_manager.process_events();

                        // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                        // see `await_start`'s use below.
                        let mut await_start = $get_timer(1);
                        let updates_available = $await;
                        let await_slow = $timer_elapsed(&mut await_start, 1);

                        if updates_available {
                                log_trace!($logger, "Persisting ChannelManager...");
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        // Exit the loop if the background processor was requested to stop.
                        if $loop_exit_check {
                                log_trace!($logger, "Terminating background processor.");
                                break;
                        }
                        if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                                $channel_manager.timer_tick_occurred();
                                last_freshness_call = $get_timer(FRESHNESS_TIMER);
                        }
                        if await_slow {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                // Similarly, if we're on a desktop platform and the device has been asleep, we
                                // may not get any cycles.
                                // We detect this by checking if our max-100ms-sleep, above, ran longer than a
                                // full second, at which point we assume sockets may have been killed (they
                                // appear to be at least on some platforms, even if it has only been a second).
                                // Note that we have to take care to not get here just because user event
                                // processing was slow at the top of the loop. For example, the sample client
                                // may call Bitcoin Core RPCs during event handling, which very often takes
                                // more than a handful of seconds to complete, and shouldn't disconnect all our
                                // peers.
                                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                                $peer_manager.disconnect_all_peers();
                                last_ping_call = $get_timer(PING_TIMER);
                        } else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
                                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                                $peer_manager.timer_tick_occurred();
                                last_ping_call = $get_timer(PING_TIMER);
                        }

                        // Note that we want to run a graph prune once not long after startup before
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
                        if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
                                // The network graph must not be pruned while rapid sync completion is pending
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        #[cfg(feature = "std")] {
                                                log_trace!($logger, "Pruning and persisting network graph.");
                                                network_graph.remove_stale_channels_and_tracking();
                                        }
                                        #[cfg(not(feature = "std"))] {
                                                log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
                                                log_trace!($logger, "Persisting network graph.");
                                        }

                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }

                                        have_pruned = true;
                                }
                                // Always re-arm the timer once it has fired, even if the graph could not be
                                // pruned yet. Timers may be futures, and a future must not be polled again
                                // after it has completed. Until the first prune succeeds we retry on the
                                // short startup interval rather than waiting a full prune period.
                                let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
                                last_prune_call = $get_timer(prune_timer);
                        }

                        if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
                                if let Some(ref scorer) = $scorer {
                                        log_trace!($logger, "Persisting scorer");
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
                                }
                                last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
                        }
                }

                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                $persister.persist_manager(&*$channel_manager)?;

                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
                        $persister.persist_scorer(&scorer)?;
                }

                // Persist NetworkGraph on exit
                if let Some(network_graph) = $gossip_sync.network_graph() {
                        $persister.persist_graph(network_graph)?;
                }

                Ok(())
        } }
}
384
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
/// A pending [`ChannelManager`] persistence on its own never causes the loop to exit.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// Requires the `futures` feature. Note that while this method is available without the `std`
/// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
/// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
/// manually instead.
///
/// # Errors
///
/// Returns an error if persisting the [`ChannelManager`] fails, or if the final persistence of
/// the scorer or the network graph on exit fails.
#[cfg(feature = "futures")]
pub async fn process_events_async<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        ES: 'static + Deref + Send + Sync,
        NS: 'static + Deref + Send + Sync,
        SP: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        R: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        Descriptor: 'static + SocketDescriptor + Send + Sync,
        CMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
        Sleeper: Fn(Duration) -> SleepFuture
>(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper,
) -> Result<(), lightning::io::Error>
where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
        CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
        T::Target: 'static + BroadcasterInterface,
        ES::Target: 'static + EntropySource,
        NS::Target: 'static + NodeSigner,
        SP::Target: 'static + SignerProvider,
        F::Target: 'static + FeeEstimator,
        R::Target: 'static + Router,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
        // Only the sleeper may request an exit. This must start out `false`: if the very first
        // wait below completes through the ChannelManager's persistable-update future, the
        // sleeper branch never runs and the flag keeps its initial value.
        let mut should_break = false;
        // Wraps the user's handler so that the network graph and the scorer observe every event
        // before it is handed to the user.
        let async_event_handler = |event| {
                let network_graph = gossip_sync.network_graph();
                let event_handler = &event_handler;
                let scorer = &scorer;
                async move {
                        if let Some(network_graph) = network_graph {
                                handle_network_graph_update(network_graph, &event)
                        }
                        if let Some(ref scorer) = scorer {
                                update_scorer(scorer, &event);
                        }
                        event_handler(event).await;
                }
        };
        define_run_body!(persister,
                chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
                gossip_sync, peer_manager, logger, scorer, should_break, {
                        // Wait for whichever comes first: a ChannelManager update that needs
                        // persisting (checked first), or the 100ms sleep, whose output tells us
                        // whether to exit.
                        select_biased! {
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
                                exit = sleeper(Duration::from_millis(100)).fuse() => {
                                        should_break = exit;
                                        false
                                }
                        }
                }, |t| sleeper(Duration::from_secs(t)),
                // A timer is a sleeper future; it has elapsed once a single non-blocking poll
                // reports it ready.
                |fut: &mut SleepFuture, _| {
                        let mut waker = task::noop_waker();
                        let mut ctx = task::Context::from_waker(&mut waker);
                        core::pin::Pin::new(fut).poll(&mut ctx).is_ready()
                })
}
485
486 #[cfg(feature = "std")]
487 impl BackgroundProcessor {
488         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
489         /// documentation].
490         ///
491         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
492         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
493         /// either [`join`] or [`stop`].
494         ///
495         /// # Data Persistence
496         ///
497         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
498         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
499         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
500         /// provided implementation.
501         ///
502         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
503         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
504         /// See the `lightning-persister` crate for LDK's provided implementation.
505         ///
506         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
507         /// error or call [`join`] and handle any error that may arise. For the latter case,
508         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
509         ///
510         /// # Event Handling
511         ///
512         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
513         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
514         /// functionality implemented by other handlers.
515         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
516         ///
517         /// # Rapid Gossip Sync
518         ///
519         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
520         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
521         /// until the [`RapidGossipSync`] instance completes its first sync.
522         ///
523         /// [top-level documentation]: BackgroundProcessor
524         /// [`join`]: Self::join
525         /// [`stop`]: Self::stop
526         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
527         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
528         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
529         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
530         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
531         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
532         pub fn start<
533                 'a,
534                 UL: 'static + Deref + Send + Sync,
535                 CF: 'static + Deref + Send + Sync,
536                 CW: 'static + Deref + Send + Sync,
537                 T: 'static + Deref + Send + Sync,
538                 ES: 'static + Deref + Send + Sync,
539                 NS: 'static + Deref + Send + Sync,
540                 SP: 'static + Deref + Send + Sync,
541                 F: 'static + Deref + Send + Sync,
542                 R: 'static + Deref + Send + Sync,
543                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
544                 L: 'static + Deref + Send + Sync,
545                 P: 'static + Deref + Send + Sync,
546                 Descriptor: 'static + SocketDescriptor + Send + Sync,
547                 CMH: 'static + Deref + Send + Sync,
548                 OMH: 'static + Deref + Send + Sync,
549                 RMH: 'static + Deref + Send + Sync,
550                 EH: 'static + EventHandler + Send,
551                 PS: 'static + Deref + Send,
552                 M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
553                 CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
554                 PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
555                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
556                 UMH: 'static + Deref + Send + Sync,
557                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH, NS>> + Send + Sync,
558                 S: 'static + Deref<Target = SC> + Send + Sync,
559                 SC: for <'b> WriteableScore<'b>,
560         >(
561                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
562                 gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
563         ) -> Self
564         where
565                 UL::Target: 'static + UtxoLookup,
566                 CF::Target: 'static + chain::Filter,
567                 CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
568                 T::Target: 'static + BroadcasterInterface,
569                 ES::Target: 'static + EntropySource,
570                 NS::Target: 'static + NodeSigner,
571                 SP::Target: 'static + SignerProvider,
572                 F::Target: 'static + FeeEstimator,
573                 R::Target: 'static + Router,
574                 L::Target: 'static + Logger,
575                 P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
576                 CMH::Target: 'static + ChannelMessageHandler,
577                 OMH::Target: 'static + OnionMessageHandler,
578                 RMH::Target: 'static + RoutingMessageHandler,
579                 UMH::Target: 'static + CustomMessageHandler,
580                 PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
581         {
582                 let stop_thread = Arc::new(AtomicBool::new(false));
583                 let stop_thread_clone = stop_thread.clone();
584                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
585                         let event_handler = |event| {
586                                 let network_graph = gossip_sync.network_graph();
587                                 if let Some(network_graph) = network_graph {
588                                         handle_network_graph_update(network_graph, &event)
589                                 }
590                                 if let Some(ref scorer) = scorer {
591                                         update_scorer(scorer, &event);
592                                 }
593                                 event_handler.handle_event(event);
594                         };
595                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
596                                 channel_manager, channel_manager.process_pending_events(&event_handler),
597                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
598                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
599                                 |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
600                 });
601                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
602         }
603
604         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
605         /// [`ChannelManager`].
606         ///
607         /// # Panics
608         ///
609         /// This function panics if the background thread has panicked such as while persisting or
610         /// handling events.
611         ///
612         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
613         pub fn join(mut self) -> Result<(), std::io::Error> {
614                 assert!(self.thread_handle.is_some());
615                 self.join_thread()
616         }
617
618         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
619         /// [`ChannelManager`].
620         ///
621         /// # Panics
622         ///
623         /// This function panics if the background thread has panicked such as while persisting or
624         /// handling events.
625         ///
626         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
627         pub fn stop(mut self) -> Result<(), std::io::Error> {
628                 assert!(self.thread_handle.is_some());
629                 self.stop_and_join_thread()
630         }
631
632         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
633                 self.stop_thread.store(true, Ordering::Release);
634                 self.join_thread()
635         }
636
637         fn join_thread(&mut self) -> Result<(), std::io::Error> {
638                 match self.thread_handle.take() {
639                         Some(handle) => handle.join().unwrap(),
640                         None => Ok(()),
641                 }
642         }
643 }
644
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
        /// Stops and joins the background thread when the processor is dropped.
        ///
        /// Panics if the thread returned an error or panicked. After `join`/`stop` have already
        /// taken the handle this is a no-op that yields `Ok(())`.
        fn drop(&mut self) {
                let shutdown_result = self.stop_and_join_thread();
                shutdown_result.unwrap();
        }
}
651
652 #[cfg(all(feature = "std", test))]
653 mod tests {
654         use bitcoin::blockdata::block::BlockHeader;
655         use bitcoin::blockdata::constants::genesis_block;
656         use bitcoin::blockdata::locktime::PackedLockTime;
657         use bitcoin::blockdata::transaction::{Transaction, TxOut};
658         use bitcoin::network::constants::Network;
659         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
660         use lightning::chain::{BestBlock, Confirm, chainmonitor};
661         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
662         use lightning::chain::keysinterface::{InMemorySigner, KeysManager};
663         use lightning::chain::transaction::OutPoint;
664         use lightning::get_event_msg;
665         use lightning::ln::PaymentHash;
666         use lightning::ln::channelmanager;
667         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
668         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
669         use lightning::ln::msgs::{ChannelMessageHandler, Init};
670         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
671         use lightning::routing::gossip::{NetworkGraph, NodeId, P2PGossipSync};
672         use lightning::routing::router::{DefaultRouter, RouteHop};
673         use lightning::routing::scoring::{ChannelUsage, Score};
674         use lightning::util::config::UserConfig;
675         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
676         use lightning::util::ser::Writeable;
677         use lightning::util::test_utils;
678         use lightning::util::persist::KVStorePersister;
679         use lightning_persister::FilesystemPersister;
680         use std::collections::VecDeque;
681         use std::fs;
682         use std::path::PathBuf;
683         use std::sync::{Arc, Mutex};
684         use std::sync::mpsc::SyncSender;
685         use std::time::Duration;
686         use bitcoin::hashes::Hash;
687         use bitcoin::TxMerkleNode;
688         use lightning_rapid_gossip_sync::RapidGossipSync;
689         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
690
691         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
692
693         #[derive(Clone, Hash, PartialEq, Eq)]
694         struct TestDescriptor{}
695         impl SocketDescriptor for TestDescriptor {
696                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
697                         0
698                 }
699
700                 fn disconnect_socket(&mut self) {}
701         }
702
703         type ChannelManager = channelmanager::ChannelManager<Arc<ChainMonitor>, Arc<test_utils::TestBroadcaster>, Arc<KeysManager>, Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestFeeEstimator>, Arc<DefaultRouter< Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<Mutex<TestScorer>>>>, Arc<test_utils::TestLogger>>;
704
705         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
706
707         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
708         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
709
710         struct Node {
711                 node: Arc<ChannelManager>,
712                 p2p_gossip_sync: PGS,
713                 rapid_gossip_sync: RGS,
714                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
715                 chain_monitor: Arc<ChainMonitor>,
716                 persister: Arc<FilesystemPersister>,
717                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
718                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
719                 logger: Arc<test_utils::TestLogger>,
720                 best_block: BestBlock,
721                 scorer: Arc<Mutex<TestScorer>>,
722         }
723
724         impl Node {
725                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
726                         GossipSync::P2P(self.p2p_gossip_sync.clone())
727                 }
728
729                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
730                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
731                 }
732
733                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
734                         GossipSync::None
735                 }
736         }
737
738         impl Drop for Node {
739                 fn drop(&mut self) {
740                         let data_dir = self.persister.get_data_dir();
741                         match fs::remove_dir_all(data_dir.clone()) {
742                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
743                                 _ => {}
744                         }
745                 }
746         }
747
748         struct Persister {
749                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
750                 graph_persistence_notifier: Option<SyncSender<()>>,
751                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
752                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
753                 filesystem_persister: FilesystemPersister,
754         }
755
756         impl Persister {
757                 fn new(data_dir: String) -> Self {
758                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
759                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
760                 }
761
762                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
763                         Self { graph_error: Some((error, message)), ..self }
764                 }
765
766                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
767                         Self { graph_persistence_notifier: Some(sender), ..self }
768                 }
769
770                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
771                         Self { manager_error: Some((error, message)), ..self }
772                 }
773
774                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
775                         Self { scorer_error: Some((error, message)), ..self }
776                 }
777         }
778
779         impl KVStorePersister for Persister {
780                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
781                         if key == "manager" {
782                                 if let Some((error, message)) = self.manager_error {
783                                         return Err(std::io::Error::new(error, message))
784                                 }
785                         }
786
787                         if key == "network_graph" {
788                                 if let Some(sender) = &self.graph_persistence_notifier {
789                                         sender.send(()).unwrap();
790                                 };
791
792                                 if let Some((error, message)) = self.graph_error {
793                                         return Err(std::io::Error::new(error, message))
794                                 }
795                         }
796
797                         if key == "scorer" {
798                                 if let Some((error, message)) = self.scorer_error {
799                                         return Err(std::io::Error::new(error, message))
800                                 }
801                         }
802
803                         self.filesystem_persister.persist(key, object)
804                 }
805         }
806
807         struct TestScorer {
808                 event_expectations: Option<VecDeque<TestResult>>,
809         }
810
811         #[derive(Debug)]
812         enum TestResult {
813                 PaymentFailure { path: Vec<RouteHop>, short_channel_id: u64 },
814                 PaymentSuccess { path: Vec<RouteHop> },
815                 ProbeFailure { path: Vec<RouteHop> },
816                 ProbeSuccess { path: Vec<RouteHop> },
817         }
818
819         impl TestScorer {
820                 fn new() -> Self {
821                         Self { event_expectations: None }
822                 }
823
824                 fn expect(&mut self, expectation: TestResult) {
825                         self.event_expectations.get_or_insert_with(|| VecDeque::new()).push_back(expectation);
826                 }
827         }
828
829         impl lightning::util::ser::Writeable for TestScorer {
830                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
831         }
832
833         impl Score for TestScorer {
834                 fn channel_penalty_msat(
835                         &self, _short_channel_id: u64, _source: &NodeId, _target: &NodeId, _usage: ChannelUsage
836                 ) -> u64 { unimplemented!(); }
837
838                 fn payment_path_failed(&mut self, actual_path: &[&RouteHop], actual_short_channel_id: u64) {
839                         if let Some(expectations) = &mut self.event_expectations {
840                                 match expectations.pop_front().unwrap() {
841                                         TestResult::PaymentFailure { path, short_channel_id } => {
842                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
843                                                 assert_eq!(actual_short_channel_id, short_channel_id);
844                                         },
845                                         TestResult::PaymentSuccess { path } => {
846                                                 panic!("Unexpected successful payment path: {:?}", path)
847                                         },
848                                         TestResult::ProbeFailure { path } => {
849                                                 panic!("Unexpected probe failure: {:?}", path)
850                                         },
851                                         TestResult::ProbeSuccess { path } => {
852                                                 panic!("Unexpected probe success: {:?}", path)
853                                         }
854                                 }
855                         }
856                 }
857
858                 fn payment_path_successful(&mut self, actual_path: &[&RouteHop]) {
859                         if let Some(expectations) = &mut self.event_expectations {
860                                 match expectations.pop_front().unwrap() {
861                                         TestResult::PaymentFailure { path, .. } => {
862                                                 panic!("Unexpected payment path failure: {:?}", path)
863                                         },
864                                         TestResult::PaymentSuccess { path } => {
865                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
866                                         },
867                                         TestResult::ProbeFailure { path } => {
868                                                 panic!("Unexpected probe failure: {:?}", path)
869                                         },
870                                         TestResult::ProbeSuccess { path } => {
871                                                 panic!("Unexpected probe success: {:?}", path)
872                                         }
873                                 }
874                         }
875                 }
876
877                 fn probe_failed(&mut self, actual_path: &[&RouteHop], _: u64) {
878                         if let Some(expectations) = &mut self.event_expectations {
879                                 match expectations.pop_front().unwrap() {
880                                         TestResult::PaymentFailure { path, .. } => {
881                                                 panic!("Unexpected payment path failure: {:?}", path)
882                                         },
883                                         TestResult::PaymentSuccess { path } => {
884                                                 panic!("Unexpected payment path success: {:?}", path)
885                                         },
886                                         TestResult::ProbeFailure { path } => {
887                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
888                                         },
889                                         TestResult::ProbeSuccess { path } => {
890                                                 panic!("Unexpected probe success: {:?}", path)
891                                         }
892                                 }
893                         }
894                 }
895                 fn probe_successful(&mut self, actual_path: &[&RouteHop]) {
896                         if let Some(expectations) = &mut self.event_expectations {
897                                 match expectations.pop_front().unwrap() {
898                                         TestResult::PaymentFailure { path, .. } => {
899                                                 panic!("Unexpected payment path failure: {:?}", path)
900                                         },
901                                         TestResult::PaymentSuccess { path } => {
902                                                 panic!("Unexpected payment path success: {:?}", path)
903                                         },
904                                         TestResult::ProbeFailure { path } => {
905                                                 panic!("Unexpected probe failure: {:?}", path)
906                                         },
907                                         TestResult::ProbeSuccess { path } => {
908                                                 assert_eq!(actual_path, &path.iter().collect::<Vec<_>>()[..]);
909                                         }
910                                 }
911                         }
912                 }
913         }
914
915         impl Drop for TestScorer {
916                 fn drop(&mut self) {
917                         if std::thread::panicking() {
918                                 return;
919                         }
920
921                         if let Some(event_expectations) = &self.event_expectations {
922                                 if !event_expectations.is_empty() {
923                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
924                                 }
925                         }
926                 }
927         }
928
929         fn get_full_filepath(filepath: String, filename: String) -> String {
930                 let mut path = PathBuf::from(filepath);
931                 path.push(filename);
932                 path.to_str().unwrap().to_string()
933         }
934
935         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
936                 let mut nodes = Vec::new();
937                 for i in 0..num_nodes {
938                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
939                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
940                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
941                         let network = Network::Testnet;
942                         let genesis_block = genesis_block(network);
943                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
944                         let scorer = Arc::new(Mutex::new(TestScorer::new()));
945                         let seed = [i as u8; 32];
946                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
947                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
948                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
949                         let now = Duration::from_secs(genesis_block.header.time as u64);
950                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
951                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
952                         let best_block = BestBlock::from_genesis(network);
953                         let params = ChainParameters { network, best_block };
954                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
955                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
956                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
957                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
958                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), IgnoringMessageHandler{}, keys_manager.clone()));
959                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
960                         nodes.push(node);
961                 }
962
963                 for i in 0..num_nodes {
964                         for j in (i+1)..num_nodes {
965                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }, true).unwrap();
966                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }, false).unwrap();
967                         }
968                 }
969
970                 nodes
971         }
972
973         macro_rules! open_channel {
974                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
975                         begin_open_channel!($node_a, $node_b, $channel_value);
976                         let events = $node_a.node.get_and_clear_pending_events();
977                         assert_eq!(events.len(), 1);
978                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
979                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
980                         tx
981                 }}
982         }
983
984         macro_rules! begin_open_channel {
985                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
986                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
987                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
988                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
989                 }}
990         }
991
992         macro_rules! handle_funding_generation_ready {
993                 ($event: expr, $channel_value: expr) => {{
994                         match $event {
995                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
996                                         assert_eq!(channel_value_satoshis, $channel_value);
997                                         assert_eq!(user_channel_id, 42);
998
999                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
1000                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1001                                         }]};
1002                                         (temporary_channel_id, tx)
1003                                 },
1004                                 _ => panic!("Unexpected event"),
1005                         }
1006                 }}
1007         }
1008
1009         macro_rules! end_open_channel {
1010                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
1011                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
1012                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1013                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1014                 }}
1015         }
1016
1017         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1018                 for i in 1..=depth {
1019                         let prev_blockhash = node.best_block.block_hash();
1020                         let height = node.best_block.height() + 1;
1021                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
1022                         let txdata = vec![(0, tx)];
1023                         node.best_block = BestBlock::new(header.block_hash(), height);
1024                         match i {
1025                                 1 => {
1026                                         node.node.transactions_confirmed(&header, &txdata, height);
1027                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1028                                 },
1029                                 x if x == depth => {
1030                                         node.node.best_block_updated(&header, height);
1031                                         node.chain_monitor.best_block_updated(&header, height);
1032                                 },
1033                                 _ => {},
1034                         }
1035                 }
1036         }
1037         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1038                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1039         }
1040
1041         #[test]
1042         fn test_background_processor() {
1043                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1044                 // updates. Also test that when new updates are available, the manager signals that it needs
1045                 // re-persistence and is successfully re-persisted.
1046                 let nodes = create_nodes(2, "test_background_processor".to_string());
1047
1048                 // Go through the channel creation process so that each node has something to persist. Since
1049                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1050                 // avoid a race with processing events.
1051                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1052
1053                 // Initiate the background processors to watch each node.
1054                 let data_dir = nodes[0].persister.get_data_dir();
1055                 let persister = Arc::new(Persister::new(data_dir));
1056                 let event_handler = |_: _| {};
1057                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1058
1059                 macro_rules! check_persisted_data {
1060                         ($node: expr, $filepath: expr) => {
1061                                 let mut expected_bytes = Vec::new();
1062                                 loop {
1063                                         expected_bytes.clear();
1064                                         match $node.write(&mut expected_bytes) {
1065                                                 Ok(()) => {
1066                                                         match std::fs::read($filepath) {
1067                                                                 Ok(bytes) => {
1068                                                                         if bytes == expected_bytes {
1069                                                                                 break
1070                                                                         } else {
1071                                                                                 continue
1072                                                                         }
1073                                                                 },
1074                                                                 Err(_) => continue
1075                                                         }
1076                                                 },
1077                                                 Err(e) => panic!("Unexpected error: {}", e)
1078                                         }
1079                                 }
1080                         }
1081                 }
1082
1083                 // Check that the initial channel manager data is persisted as expected.
1084                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
1085                 check_persisted_data!(nodes[0].node, filepath.clone());
1086
1087                 loop {
1088                         if !nodes[0].node.get_persistence_condvar_value() { break }
1089                 }
1090
1091                 // Force-close the channel.
1092                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
1093
1094                 // Check that the force-close updates are persisted.
1095                 check_persisted_data!(nodes[0].node, filepath.clone());
1096                 loop {
1097                         if !nodes[0].node.get_persistence_condvar_value() { break }
1098                 }
1099
1100                 // Check network graph is persisted
1101                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
1102                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1103
1104                 // Check scorer is persisted
1105                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
1106                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1107
1108                 assert!(bg_processor.stop().is_ok());
1109         }
1110
#[test]
fn test_timer_tick_called() {
    // The background processor is expected to call both ChannelManager's and PeerManager's
    // `timer_tick_occurred` every `FRESHNESS_TIMER`. Each call is observed here through the
    // line it leaves in the test logger.
    let nodes = create_nodes(1, "test_timer_tick_called".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Poll the logger until both tick messages have been recorded. The logger lock is taken
    // once per poll and released again before the next poll starts.
    let both_ticks_logged = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let module = "lightning_background_processor";
        [
            "Calling ChannelManager's timer_tick_occurred",
            "Calling PeerManager's timer_tick_occurred",
        ]
        .iter()
        .all(|msg| lines.contains_key(&(module.to_string(), (*msg).to_string())))
    };
    while !both_ticks_logged() {}

    assert!(bg_processor.stop().is_ok());
}
1132
#[test]
fn test_channel_manager_persist_error() {
    // A failure while persisting the channel manager must be reported to the caller:
    // `join` has to return the error the persister was configured to produce.
    let nodes = create_nodes(2, "test_persist_error".to_string());
    open_channel!(nodes[0], nodes[1], 100000);

    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_manager_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.join() {
        Err(err) => err,
        Ok(_) => panic!("Expected error persisting manager"),
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1151
#[test]
fn test_network_graph_persist_error() {
    // A failure while persisting the network graph must come back as the error returned by
    // `stop`, carrying the kind and message the persister was configured with.
    let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_graph_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.stop() {
        Err(err) => err,
        Ok(_) => panic!("Expected error persisting network graph"),
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1169
#[test]
fn test_scorer_persist_error() {
    // A failure while persisting the scorer must come back as the error returned by `stop`,
    // carrying the kind and message the persister was configured with.
    let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
    let failing_persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_scorer_error(std::io::ErrorKind::Other, "test"),
    );
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        failing_persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let persist_err = match bg_processor.stop() {
        Err(err) => err,
        Ok(_) => panic!("Expected error persisting scorer"),
    };
    assert_eq!(persist_err.kind(), std::io::ErrorKind::Other);
    assert_eq!(persist_err.get_ref().unwrap().to_string(), "test");
}
1187
#[test]
fn test_background_event_handling() {
    // Runs two background processors back to back against the same node and checks that the
    // user-supplied event handler is invoked: first for FundingGenerationReady while a channel
    // is opened, then for SpendableOutputs after that channel is force-closed.
    let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
    let channel_value = 100000;
    let data_dir = nodes[0].persister.get_data_dir();
    let funding_persister = Arc::new(Persister::new(data_dir.clone()));

    // Handler for the funding phase: hands the result of handling FundingGenerationReady
    // back to the test thread over a channel.
    let (funding_sender, funding_receiver) = std::sync::mpsc::sync_channel(1);
    let funding_handler = move |event: Event| {
        match event {
            Event::FundingGenerationReady { .. } => {
                funding_sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap()
            },
            Event::ChannelReady { .. } => {},
            _ => panic!("Unexpected event: {:?}", event),
        }
    };

    let funding_processor = BackgroundProcessor::start(
        funding_persister,
        funding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Start opening a channel; the handler above must deliver the funding details in time.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let (temporary_channel_id, funding_tx) = funding_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("FundingGenerationReady not handled within deadline");
    end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

    // Confirm the funding transaction on both nodes and exchange the resulting messages.
    confirm_transaction(&mut nodes[0], &funding_tx);
    let ready_from_a = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
    confirm_transaction(&mut nodes[1], &funding_tx);
    let ready_from_b = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
    nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &ready_from_b);
    let _update_from_a = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
    nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &ready_from_a);
    let _update_from_b = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

    assert!(funding_processor.stop().is_ok());

    // Handler for the close phase: forwards SpendableOutputs events to the test thread.
    let (outputs_sender, outputs_receiver) = std::sync::mpsc::sync_channel(1);
    let outputs_handler = move |event: Event| {
        match event {
            Event::SpendableOutputs { .. } => outputs_sender.send(event.clone()).unwrap(),
            Event::ChannelReady { .. } => {},
            Event::ChannelClosed { .. } => {},
            _ => panic!("Unexpected event: {:?}", event),
        }
    };
    let outputs_persister = Arc::new(Persister::new(data_dir));
    let outputs_processor = BackgroundProcessor::start(
        outputs_persister,
        outputs_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Force close the channel, confirm the broadcast commitment transaction, and wait for
    // the SpendableOutputs event to reach the handler.
    nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
    let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

    let received = outputs_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("Events not handled within deadline");
    match received {
        Event::SpendableOutputs { .. } => {},
        other => panic!("Unexpected event: {:?}", other),
    }

    assert!(outputs_processor.stop().is_ok());
}
1250
#[test]
fn test_scorer_persistence() {
    // Starts a background processor with a scorer attached and waits until the
    // "Persisting scorer" line shows up in the test logger.
    let nodes = create_nodes(2, "test_scorer_persistence".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let noop_handler = |_: _| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Poll the logger; the lock is taken once per poll and released before the next one.
    let scorer_persist_logged = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let key = ("lightning_background_processor".to_string(), "Persisting scorer".to_string());
        lines.contains_key(&key)
    };
    while !scorer_persist_logged() {}

    assert!(bg_processor.stop().is_ok());
}
1269
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
    // Seeds the network graph with one channel, runs the background processor with rapid
    // gossip sync, applies a gossip snapshot once the processor has ticked at least twice,
    // and then checks the channel counts before and after the graph persistence
    // notification arrives.
    let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
    let (graph_persisted_sender, graph_persisted_receiver) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(
        Persister::new(nodes[0].persister.get_data_dir())
            .with_graph_persistence_notifier(graph_persisted_sender),
    );
    let network_graph = nodes[0].network_graph.clone();
    network_graph
        .add_channel_from_partial_announcement(
            42,
            53,
            ChannelFeatures::empty(),
            nodes[0].node.get_our_node_id(),
            nodes[1].node.get_our_node_id(),
        )
        .expect("Failed to update channel from partial announcement");
    let graph_before_sync = network_graph.to_string();
    assert!(graph_before_sync.contains("42: features: 0000, node_one:"));
    assert_eq!(network_graph.read_only().channels().len(), 1);

    let noop_handler = |_: _| {};
    let background_processor = BackgroundProcessor::start(
        persister,
        noop_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].rapid_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Wait until the loop has gone around at least twice, using the count recorded for the
    // ChannelManager tick log line.
    let ticked_more_than_once = || {
        let lines = nodes[0].logger.lines.lock().unwrap();
        let key = (
            "lightning_background_processor".to_string(),
            "Calling ChannelManager's timer_tick_occurred".to_string(),
        );
        lines.get(&key).map_or(false, |count| *count > 1)
    };
    while !ticked_more_than_once() {}

    let initialization_input = vec![
        76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
        79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
        0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
        187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
        157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
        88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
        204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
        181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
        110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
        76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
        226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
        0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
        0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
    ];
    nodes[0].rapid_gossip_sync.update_network_graph_no_std(&initialization_input[..], Some(1642291930)).unwrap();

    // The snapshot should have added two channels on top of the seeded one.
    assert_eq!(network_graph.read_only().channels().len(), 3);

    let _ = graph_persisted_receiver
        .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
        .expect("Network graph not pruned within deadline");

    background_processor.stop().unwrap();

    // By now every channel should have been pruned.
    assert_eq!(network_graph.read_only().channels().len(), 0);
}
1327
#[test]
fn test_payment_path_scoring() {
    // Ensure that we update the scorer when relevant events are processed (note that the
    // channel must be public or else we won't score it). Each scenario below queues the
    // `TestResult` the scorer should see, pushes the corresponding event onto the node, and
    // waits for the background event handler to pass that event back.

    // Background event handler that forwards payment-path and probe events to the test
    // thread and rejects everything else.
    let (sender, receiver) = std::sync::mpsc::sync_channel(1);
    let event_handler = move |event: Event| match event {
        Event::PaymentPathFailed { .. }
        | Event::PaymentPathSuccessful { .. }
        | Event::ProbeSuccessful { .. }
        | Event::ProbeFailed { .. } => sender.send(event).unwrap(),
        _ => panic!("Unexpected event: {:?}", event),
    };

    let nodes = create_nodes(1, "test_payment_path_scoring".to_string());
    let data_dir = nodes[0].persister.get_data_dir();
    let persister = Arc::new(Persister::new(data_dir.clone()));
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Single-hop path over `scored_scid`, reused by every scenario.
    let scored_scid = 4242;
    let secp_ctx = Secp256k1::new();
    let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
    let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

    let path = vec![RouteHop {
        pubkey: node_1_id,
        node_features: NodeFeatures::empty(),
        short_channel_id: scored_scid,
        channel_features: ChannelFeatures::empty(),
        fee_msat: 0,
        cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
    }];

    // A path failure attributed to a specific channel is expected as a PaymentFailure.
    nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
    nodes[0].node.push_pending_event(Event::PaymentPathFailed {
        payment_id: None,
        payment_hash: PaymentHash([42; 32]),
        payment_failed_permanently: false,
        network_update: None,
        all_paths_failed: true,
        path: path.clone(),
        short_channel_id: Some(scored_scid),
        retry: None,
    });
    match receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("PaymentPathFailed not handled within deadline")
    {
        Event::PaymentPathFailed { .. } => {},
        _ => panic!("Unexpected event"),
    }

    // Ensure we'll score payments that were explicitly failed back by the destination as
    // ProbeSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::PaymentPathFailed {
        payment_id: None,
        payment_hash: PaymentHash([42; 32]),
        payment_failed_permanently: true,
        network_update: None,
        all_paths_failed: true,
        path: path.clone(),
        short_channel_id: None,
        retry: None,
    });
    match receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("PaymentPathFailed not handled within deadline")
    {
        Event::PaymentPathFailed { .. } => {},
        _ => panic!("Unexpected event"),
    }

    // A successful payment path is expected as a PaymentSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::PaymentSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
        payment_id: PaymentId([42; 32]),
        payment_hash: None,
        path: path.clone(),
    });
    match receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("PaymentPathSuccessful not handled within deadline")
    {
        Event::PaymentPathSuccessful { .. } => {},
        _ => panic!("Unexpected event"),
    }

    // A successful probe is expected as a ProbeSuccess.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeSuccess { path: path.clone() });
    nodes[0].node.push_pending_event(Event::ProbeSuccessful {
        payment_id: PaymentId([42; 32]),
        payment_hash: PaymentHash([42; 32]),
        path: path.clone(),
    });
    match receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("ProbeSuccessful not handled within deadline")
    {
        Event::ProbeSuccessful { .. } => {},
        _ => panic!("Unexpected event"),
    }

    // A failed probe is expected as a ProbeFailure.
    nodes[0].scorer.lock().unwrap().expect(TestResult::ProbeFailure { path: path.clone() });
    nodes[0].node.push_pending_event(Event::ProbeFailed {
        payment_id: PaymentId([42; 32]),
        payment_hash: PaymentHash([42; 32]),
        path: path.clone(),
        short_channel_id: Some(scored_scid),
    });
    match receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("ProbeFailure not handled within deadline")
    {
        Event::ProbeFailed { .. } => {},
        _ => panic!("Unexpected event"),
    }

    assert!(bg_processor.stop().is_ok());
}
1447 }