055b5ca6f9937d70cc5b3e76babd9b8778a6b7cc
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 ///   writing it to disk/backups by invoking the callback given to it at startup.
46 ///   [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 ///   at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
51 ///
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
54 ///
55 /// # Note
56 ///
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
61 ///
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
64 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
65 pub struct BackgroundProcessor {
        // Flag shared with the background thread; set to `true` (with `Ordering::Release`) by
        // `stop`/`drop` to request that the processing loop exit.
66         stop_thread: Arc<AtomicBool>,
        // Join handle for the spawned processing thread, carrying any persistence error it hit.
        // `Some` while the thread may still be running; taken (left `None`) once joined so the
        // thread is never joined twice.
67         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
68 }
69
70 #[cfg(not(test))]
// Seconds between calls to `ChannelManager::timer_tick_occurred` in the run loop.
71 const FRESHNESS_TIMER: u64 = 60;
72 #[cfg(test)]
73 const FRESHNESS_TIMER: u64 = 1;
74
75 #[cfg(all(not(test), not(debug_assertions)))]
76 const PING_TIMER: u64 = 10;
77 /// Signature operations take a lot longer without compiler optimisations.
78 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
79 /// timeout is reached.
80 #[cfg(all(not(test), debug_assertions))]
81 const PING_TIMER: u64 = 30;
82 #[cfg(test)]
83 const PING_TIMER: u64 = 1;
84
85 /// Prune the network graph of stale entries hourly.
86 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
87
88 #[cfg(not(test))]
// Seconds between persist calls for the scorer, when one was provided.
89 const SCORER_PERSIST_TIMER: u64 = 30;
90 #[cfg(test)]
91 const SCORER_PERSIST_TIMER: u64 = 1;
92
93 #[cfg(not(test))]
// Seconds after startup before the first network-graph prune. Kept short (relative to
// NETWORK_PRUNE_TIMER) so short-lived clients still prune at least once; see the comment in
// `define_run_body!` above the prune check.
94 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
95 #[cfg(test)]
96 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
99 pub enum GossipSync<
        // `P` derefs to a P2P gossip sync and `R` to a rapid gossip sync; both are backed by
        // the same network-graph type `G`, chain-access type `A` and logger type `L`.
100         P: Deref<Target = P2PGossipSync<G, A, L>>,
101         R: Deref<Target = RapidGossipSync<G, L>>,
102         G: Deref<Target = NetworkGraph<L>>,
103         A: Deref,
104         L: Deref,
105 >
106 where A::Target: chain::Access, L::Target: Logger {
107         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
108         P2P(P),
109         /// Rapid gossip sync from a trusted server.
110         Rapid(R),
111         /// No gossip sync.
112         None,
113 }
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
        // Returns the network graph backing this gossip sync, if there is one.
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
        // Returns the network graph only if it is currently safe to prune it: for rapid gossip
        // sync the graph must not be pruned until the initial sync completes, so `Rapid` yields
        // `None` until `is_initial_sync_complete()` is true.
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
// The unused rapid-gossip-sync type parameter is pinned to a concrete reference type here so
// callers constructing the `P2P` variant need not name it.
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
149 where
150         A::Target: chain::Access,
151         L::Target: Logger,
152 {
153         /// Initializes a new [`GossipSync::P2P`] variant.
154         pub fn p2p(gossip_sync: P) -> Self {
155                 GossipSync::P2P(gossip_sync)
156         }
157 }
158
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
// The unused P2P-gossip-sync and chain-access type parameters are pinned to concrete types
// here so callers constructing the `Rapid` variant need not name them.
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
161         GossipSync<
162                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
163                 R,
164                 G,
165                 &'a (dyn chain::Access + Send + Sync),
166                 L,
167         >
168 where
169         L::Target: Logger,
170 {
171         /// Initializes a new [`GossipSync::Rapid`] variant.
172         pub fn rapid(gossip_sync: R) -> Self {
173                 GossipSync::Rapid(gossip_sync)
174         }
175 }
176
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
// All unused type parameters are pinned to concrete types here so callers constructing the
// `None` variant need not name any of them.
178 impl<'a, L: Deref>
179         GossipSync<
180                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
182                 &'a NetworkGraph<L>,
183                 &'a (dyn chain::Access + Send + Sync),
184                 L,
185         >
186 where
187         L::Target: Logger,
188 {
189         /// Initializes a new [`GossipSync::None`] variant.
190         pub fn none() -> Self {
191                 GossipSync::None
192         }
193 }
194
195 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
196 struct DecoratingEventHandler<
197         'a,
198         E: EventHandler,
199         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
200         RGS: Deref<Target = RapidGossipSync<G, L>>,
201         G: Deref<Target = NetworkGraph<L>>,
202         A: Deref,
203         L: Deref,
204 >
205 where A::Target: chain::Access, L::Target: Logger {
        // The user-provided handler, invoked after the network graph has seen the event.
206         event_handler: E,
        // Borrowed gossip sync whose network graph (if any) also handles each event.
207         gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
208 }
209
210 impl<
211         'a,
212         E: EventHandler,
213         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214         RGS: Deref<Target = RapidGossipSync<G, L>>,
215         G: Deref<Target = NetworkGraph<L>>,
216         A: Deref,
217         L: Deref,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
220         fn handle_event(&self, event: &Event) {
                // Forward the event to the network graph first (if we have one), then to the
                // user-provided handler.
221                 if let Some(network_graph) = self.gossip_sync.network_graph() {
222                         network_graph.handle_event(event);
223                 }
224                 self.event_handler.handle_event(event);
225         }
226 }
227
// Shared body of the background-processing loop, used by both the threaded driver
// (`BackgroundProcessor::start`) and the async driver (`process_events_async`).
//
// `$loop_exit_check` is an expression re-evaluated once per loop iteration; the loop exits as
// soon as it evaluates to `true`. `$await` is an expression which waits (up to roughly 100ms)
// for a ChannelManager persistence notification and evaluates to `true` when the
// ChannelManager needs to be re-persisted. All other arguments are identifiers for the
// handles the loop operates on.
228 macro_rules! define_run_body {
229         ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
230          $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
231          $loop_exit_check: expr, $await: expr)
232         => { {
233                 let event_handler = DecoratingEventHandler {
234                         event_handler: $event_handler,
235                         gossip_sync: &$gossip_sync,
236                 };
237
238                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
239                 $channel_manager.timer_tick_occurred();
240
                // Timestamps of the last run of each periodic task, used to decide below when
                // each of the respective timers has elapsed.
241                 let mut last_freshness_call = Instant::now();
242                 let mut last_ping_call = Instant::now();
243                 let mut last_prune_call = Instant::now();
244                 let mut last_scorer_persist_call = Instant::now();
245                 let mut have_pruned = false;
246
247                 loop {
248                         $channel_manager.process_pending_events(&event_handler);
249                         $chain_monitor.process_pending_events(&event_handler);
250
251                         // Note that the PeerManager::process_events may block on ChannelManager's locks,
252                         // hence it comes last here. When the ChannelManager finishes whatever it's doing,
253                         // we want to ensure we get into `persist_manager` as quickly as we can, especially
254                         // without running the normal event processing above and handing events to users.
255                         //
256                         // Specifically, on an *extremely* slow machine, we may see ChannelManager start
257                         // processing a message effectively at any point during this loop. In order to
258                         // minimize the time between such processing completing and persisting the updated
259                         // ChannelManager, we want to minimize methods blocking on a ChannelManager
260                         // generally, and as a fallback place such blocking only immediately before
261                         // persistence.
262                         $peer_manager.process_events();
263
264                         // We wait up to 100ms, but track how long it takes to detect being put to sleep,
265                         // see `await_start`'s use below.
266                         let await_start = Instant::now();
267                         let updates_available = $await;
268                         let await_time = await_start.elapsed();
269
270                         if updates_available {
271                                 log_trace!($logger, "Persisting ChannelManager...");
272                                 $persister.persist_manager(&*$channel_manager)?;
273                                 log_trace!($logger, "Done persisting ChannelManager.");
274                         }
275                         // Exit the loop if the background processor was requested to stop.
276                         if $loop_exit_check {
277                                 log_trace!($logger, "Terminating background processor.");
278                                 break;
279                         }
280                         if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
281                                 log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
282                                 $channel_manager.timer_tick_occurred();
283                                 last_freshness_call = Instant::now();
284                         }
285                         if await_time > Duration::from_secs(1) {
286                                 // On various platforms, we may be starved of CPU cycles for several reasons.
287                                 // E.g. on iOS, if we've been in the background, we will be entirely paused.
288                                 // Similarly, if we're on a desktop platform and the device has been asleep, we
289                                 // may not get any cycles.
290                                 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
291                                 // full second, at which point we assume sockets may have been killed (they
292                                 // appear to be at least on some platforms, even if it has only been a second).
293                                 // Note that we have to take care to not get here just because user event
294                                 // processing was slow at the top of the loop. For example, the sample client
295                                 // may call Bitcoin Core RPCs during event handling, which very often takes
296                                 // more than a handful of seconds to complete, and shouldn't disconnect all our
297                                 // peers.
298                                 log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
299                                 $peer_manager.disconnect_all_peers();
300                                 last_ping_call = Instant::now();
301                         } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
302                                 log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
303                                 $peer_manager.timer_tick_occurred();
304                                 last_ping_call = Instant::now();
305                         }
306
307                         // Note that we want to run a graph prune once not long after startup before
308                         // falling back to our usual hourly prunes. This avoids short-lived clients never
309                         // pruning their network graph. We run once 60 seconds after startup before
310                         // continuing our normal cadence.
311                         if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
312                                 // The network graph must not be pruned while rapid sync completion is pending
313                                 log_trace!($logger, "Assessing prunability of network graph");
314                                 if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
315                                         network_graph.remove_stale_channels_and_tracking();
316
317                                         if let Err(e) = $persister.persist_graph(network_graph) {
318                                                 log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
319                                         }
320
321                                         last_prune_call = Instant::now();
322                                         have_pruned = true;
323                                 } else {
324                                         log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
325                                 }
326                         }
327
328                         if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
329                                 if let Some(ref scorer) = $scorer {
330                                         log_trace!($logger, "Persisting scorer");
331                                         if let Err(e) = $persister.persist_scorer(&scorer) {
332                                                 log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
333                                         }
334                                 }
335                                 last_scorer_persist_call = Instant::now();
336                         }
337                 }
338
339                 // After we exit, ensure we persist the ChannelManager one final time - this avoids
340                 // some races where users quit while channel updates were in-flight, with
341                 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
342                 $persister.persist_manager(&*$channel_manager)?;
343
344                 // Persist Scorer on exit
345                 if let Some(ref scorer) = $scorer {
346                         $persister.persist_scorer(&scorer)?;
347                 }
348
349                 // Persist NetworkGraph on exit
350                 if let Some(network_graph) = $gossip_sync.network_graph() {
351                         $persister.persist_graph(network_graph)?;
352                 }
353
354                 Ok(())
355         } }
356 }
357
358 /// Processes background events in a future.
359 ///
360 /// `sleeper` should return a future which completes in the given amount of time and returns a
361 /// boolean indicating whether the background processing should continue. Once `sleeper` returns a
362 /// future which outputs false, the loop will exit and this function's future will complete.
363 ///
364 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
365 #[cfg(feature = "futures")]
366 pub async fn process_events_async<
367         'a,
368         Signer: 'static + Sign,
369         CA: 'static + Deref + Send + Sync,
370         CF: 'static + Deref + Send + Sync,
371         CW: 'static + Deref + Send + Sync,
372         T: 'static + Deref + Send + Sync,
373         K: 'static + Deref + Send + Sync,
374         F: 'static + Deref + Send + Sync,
375         G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
376         L: 'static + Deref + Send + Sync,
377         P: 'static + Deref + Send + Sync,
378         Descriptor: 'static + SocketDescriptor + Send + Sync,
379         CMH: 'static + Deref + Send + Sync,
380         RMH: 'static + Deref + Send + Sync,
381         OMH: 'static + Deref + Send + Sync,
382         EH: 'static + EventHandler + Send,
383         PS: 'static + Deref + Send,
384         M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
385         CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
386         PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
387         RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
388         UMH: 'static + Deref + Send + Sync,
389         PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
390         S: 'static + Deref<Target = SC> + Send + Sync,
391         SC: WriteableScore<'a>,
392         SleepFuture: core::future::Future<Output = bool>,
393         Sleeper: Fn(Duration) -> SleepFuture
394 >(
395         persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
396         gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
397         sleeper: Sleeper,
398 ) -> Result<(), std::io::Error>
399 where
400         CA::Target: 'static + chain::Access,
401         CF::Target: 'static + chain::Filter,
402         CW::Target: 'static + chain::Watch<Signer>,
403         T::Target: 'static + BroadcasterInterface,
404         K::Target: 'static + KeysInterface<Signer = Signer>,
405         F::Target: 'static + FeeEstimator,
406         L::Target: 'static + Logger,
407         P::Target: 'static + Persist<Signer>,
408         CMH::Target: 'static + ChannelMessageHandler,
409         OMH::Target: 'static + OnionMessageHandler,
410         RMH::Target: 'static + RoutingMessageHandler,
411         UMH::Target: 'static + CustomMessageHandler,
412         PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
413 {
414         let mut should_continue = true;
415         define_run_body!(persister, event_handler, chain_monitor, channel_manager,
416                 gossip_sync, peer_manager, logger, scorer, should_continue, {
417                         select_biased! {
418                                 _ = channel_manager.get_persistable_update_future().fuse() => true,
419                                 cont = sleeper(Duration::from_millis(100)).fuse() => {
420                                         should_continue = cont;
421                                         false
422                                 }
423                         }
424                 })
425 }
426
427 impl BackgroundProcessor {
428         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
429         /// documentation].
430         ///
431         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
432         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
433         /// either [`join`] or [`stop`].
434         ///
435         /// # Data Persistence
436         ///
437         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
438         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
439         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
440         /// provided implementation.
441         ///
442         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
443         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
444         /// See the `lightning-persister` crate for LDK's provided implementation.
445         ///
446         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
447         /// error or call [`join`] and handle any error that may arise. For the latter case,
448         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
449         ///
450         /// # Event Handling
451         ///
452         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
453         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
454         /// functionality implemented by other handlers.
455         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
456         ///
457         /// # Rapid Gossip Sync
458         ///
459         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
460         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
461         /// until the [`RapidGossipSync`] instance completes its first sync.
462         ///
463         /// [top-level documentation]: BackgroundProcessor
464         /// [`join`]: Self::join
465         /// [`stop`]: Self::stop
466         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
467         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
468         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
469         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
470         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
471         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
472         pub fn start<
473                 'a,
474                 Signer: 'static + Sign,
475                 CA: 'static + Deref + Send + Sync,
476                 CF: 'static + Deref + Send + Sync,
477                 CW: 'static + Deref + Send + Sync,
478                 T: 'static + Deref + Send + Sync,
479                 K: 'static + Deref + Send + Sync,
480                 F: 'static + Deref + Send + Sync,
481                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
482                 L: 'static + Deref + Send + Sync,
483                 P: 'static + Deref + Send + Sync,
484                 Descriptor: 'static + SocketDescriptor + Send + Sync,
485                 CMH: 'static + Deref + Send + Sync,
486                 OMH: 'static + Deref + Send + Sync,
487                 RMH: 'static + Deref + Send + Sync,
488                 EH: 'static + EventHandler + Send,
489                 PS: 'static + Deref + Send,
490                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
491                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
492                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
493                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
494                 UMH: 'static + Deref + Send + Sync,
495                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
496                 S: 'static + Deref<Target = SC> + Send + Sync,
497                 SC: WriteableScore<'a>,
498         >(
499                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
500                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
501         ) -> Self
502         where
503                 CA::Target: 'static + chain::Access,
504                 CF::Target: 'static + chain::Filter,
505                 CW::Target: 'static + chain::Watch<Signer>,
506                 T::Target: 'static + BroadcasterInterface,
507                 K::Target: 'static + KeysInterface<Signer = Signer>,
508                 F::Target: 'static + FeeEstimator,
509                 L::Target: 'static + Logger,
510                 P::Target: 'static + Persist<Signer>,
511                 CMH::Target: 'static + ChannelMessageHandler,
512                 OMH::Target: 'static + OnionMessageHandler,
513                 RMH::Target: 'static + RoutingMessageHandler,
514                 UMH::Target: 'static + CustomMessageHandler,
515                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
516         {
                // The stop flag is shared between this object and the spawned thread; the
                // `Ordering::Acquire` load in the loop's exit check pairs with the
                // `Ordering::Release` store in `stop_and_join_thread`.
517                 let stop_thread = Arc::new(AtomicBool::new(false));
518                 let stop_thread_clone = stop_thread.clone();
                // All handles are moved into the background thread, which runs the shared loop
                // until the stop flag is set, blocking up to 100ms at a time waiting for a
                // ChannelManager persistence notification.
519                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
520                         define_run_body!(persister, event_handler, chain_monitor, channel_manager,
521                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
522                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
523                 });
524                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
525         }
526
527         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
528         /// [`ChannelManager`].
529         ///
530         /// # Panics
531         ///
532         /// This function panics if the background thread has panicked such as while persisting or
533         /// handling events.
534         ///
535         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
536         pub fn join(mut self) -> Result<(), std::io::Error> {
537                 assert!(self.thread_handle.is_some());
538                 self.join_thread()
539         }
540
541         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
542         /// [`ChannelManager`].
543         ///
544         /// # Panics
545         ///
546         /// This function panics if the background thread has panicked such as while persisting or
547         /// handling events.
548         ///
549         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
550         pub fn stop(mut self) -> Result<(), std::io::Error> {
551                 assert!(self.thread_handle.is_some());
552                 self.stop_and_join_thread()
553         }
554
        // Signals the background thread to exit (Release store, paired with the Acquire load in
        // `start`'s loop) and then joins it.
555         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
556                 self.stop_thread.store(true, Ordering::Release);
557                 self.join_thread()
558         }
559
        // Joins the background thread if it has not already been joined, returning its
        // persistence result. The `.unwrap()` re-raises any panic from the background thread.
560         fn join_thread(&mut self) -> Result<(), std::io::Error> {
561                 match self.thread_handle.take() {
562                         Some(handle) => handle.join().unwrap(),
563                         None => Ok(()),
564                 }
565         }
566 }
567
568 impl Drop for BackgroundProcessor {
        // Stops and joins the background thread on drop. Note the `unwrap()`: dropping without
        // calling `stop`/`join` first will panic if the thread panicked or if the final
        // persistence pass returned an error.
569         fn drop(&mut self) {
570                 self.stop_and_join_thread().unwrap();
571         }
572 }
573
574 #[cfg(test)]
575 mod tests {
576         use bitcoin::blockdata::block::BlockHeader;
577         use bitcoin::blockdata::constants::genesis_block;
578         use bitcoin::blockdata::locktime::PackedLockTime;
579         use bitcoin::blockdata::transaction::{Transaction, TxOut};
580         use bitcoin::network::constants::Network;
581         use lightning::chain::{BestBlock, Confirm, chainmonitor};
582         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
583         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
584         use lightning::chain::transaction::OutPoint;
585         use lightning::get_event_msg;
586         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
587         use lightning::ln::features::ChannelFeatures;
588         use lightning::ln::msgs::{ChannelMessageHandler, Init};
589         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
590         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
591         use lightning::util::config::UserConfig;
592         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
593         use lightning::util::ser::Writeable;
594         use lightning::util::test_utils;
595         use lightning::util::persist::KVStorePersister;
596         use lightning_invoice::payment::{InvoicePayer, Retry};
597         use lightning_invoice::utils::DefaultRouter;
598         use lightning_persister::FilesystemPersister;
599         use std::fs;
600         use std::path::PathBuf;
601         use std::sync::{Arc, Mutex};
602         use std::sync::mpsc::SyncSender;
603         use std::time::Duration;
604         use bitcoin::hashes::Hash;
605         use bitcoin::TxMerkleNode;
606         use lightning::routing::scoring::{FixedPenaltyScorer};
607         use lightning_rapid_gossip_sync::RapidGossipSync;
608         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
609
610         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
611
	// No-op socket descriptor used purely to satisfy `PeerManager`'s type parameters in these
	// tests; no real network I/O ever happens through it.
	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			// Claim zero bytes were written; the tests never exercise actual sends.
			0
		}

		fn disconnect_socket(&mut self) {}
	}
621
	// Concrete `ChainMonitor` instantiation over the test utilities and the filesystem persister.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthands for the two gossip-sync flavors exercised by these tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
626
	// Bundle of everything a single simulated node needs in these tests. Fields largely mirror
	// the arguments `BackgroundProcessor::start` expects.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked locally so `confirm_transaction_depth` can mine consecutive blocks.
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
640
	impl Node {
		// Wraps this node's P2P gossip sync in the `GossipSync` enum expected by
		// `BackgroundProcessor::start`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		// Wraps this node's rapid gossip sync in the `GossipSync` enum.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		// `GossipSync::None` for tests that don't exercise gossip at all.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
654
655         impl Drop for Node {
656                 fn drop(&mut self) {
657                         let data_dir = self.persister.get_data_dir();
658                         match fs::remove_dir_all(data_dir.clone()) {
659                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
660                                 _ => {}
661                         }
662                 }
663         }
664
	// Test persister that wraps a `FilesystemPersister` and can be configured to fail (or
	// signal) persistence of specific keys, letting tests inject errors into the background
	// processor's persistence path.
	struct Persister {
		// When set, persisting "network_graph" returns this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, a `()` is sent on this channel each time "network_graph" is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting "manager" returns this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting "scorer" returns this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
672
673         impl Persister {
674                 fn new(data_dir: String) -> Self {
675                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
676                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
677                 }
678
679                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
680                         Self { graph_error: Some((error, message)), ..self }
681                 }
682
683                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
684                         Self { graph_persistence_notifier: Some(sender), ..self }
685                 }
686
687                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
688                         Self { manager_error: Some((error, message)), ..self }
689                 }
690
691                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
692                         Self { scorer_error: Some((error, message)), ..self }
693                 }
694         }
695
696         impl KVStorePersister for Persister {
697                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
698                         if key == "manager" {
699                                 if let Some((error, message)) = self.manager_error {
700                                         return Err(std::io::Error::new(error, message))
701                                 }
702                         }
703
704                         if key == "network_graph" {
705                                 if let Some(sender) = &self.graph_persistence_notifier {
706                                         sender.send(()).unwrap();
707                                 };
708
709                                 if let Some((error, message)) = self.graph_error {
710                                         return Err(std::io::Error::new(error, message))
711                                 }
712                         }
713
714                         if key == "scorer" {
715                                 if let Some((error, message)) = self.scorer_error {
716                                         return Err(std::io::Error::new(error, message))
717                                 }
718                         }
719
720                         self.filesystem_persister.persist(key, object)
721                 }
722         }
723
724         fn get_full_filepath(filepath: String, filename: String) -> String {
725                 let mut path = PathBuf::from(filepath);
726                 path.push(filename);
727                 path.to_str().unwrap().to_string()
728         }
729
	// Builds `num_nodes` fully-wired test nodes, each persisting under
	// `{persist_dir}_persister_{i}`, then connects every pair of nodes at the peer level.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Per-node deterministic seed so keys differ between nodes but are stable
			// across runs.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected (in both directions) at the ChannelManager
		// level so channels can be opened between them.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
766
	// Drives a full channel-open flow between `$node_a` and `$node_b` for `$channel_value`
	// sats, handling the FundingGenerationReady event inline, and evaluates to the funding
	// transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
777
	// Performs the open_channel/accept_channel message exchange; afterwards `$node_a` has a
	// pending FundingGenerationReady event. Uses push_msat 100 and user_channel_id 42 (the
	// latter is asserted by handle_funding_generation_ready!).
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
785
	// Destructures a FundingGenerationReady event, sanity-checks its channel value and
	// user_channel_id (42, set by begin_open_channel!), builds a matching funding transaction
	// and evaluates to `(temporary_channel_id, tx)`. Panics on any other event.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
802
	// Completes the channel-open handshake: hands the funding transaction to `$node_a` and
	// exchanges funding_created/funding_signed between the two nodes.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
810
811         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
812                 for i in 1..=depth {
813                         let prev_blockhash = node.best_block.block_hash();
814                         let height = node.best_block.height() + 1;
815                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
816                         let txdata = vec![(0, tx)];
817                         node.best_block = BestBlock::new(header.block_hash(), height);
818                         match i {
819                                 1 => {
820                                         node.node.transactions_confirmed(&header, &txdata, height);
821                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
822                                 },
823                                 x if x == depth => {
824                                         node.node.best_block_updated(&header, height);
825                                         node.chain_monitor.best_block_updated(&header, height);
826                                 },
827                                 _ => {},
828                         }
829                 }
830         }
	// Confirms `tx` to ANTI_REORG_DELAY depth, i.e. deep enough that the monitor treats the
	// confirmation as final.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
834
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the on-disk bytes at `$filepath` match a fresh serialization of
		// `$node`. Retries (rather than failing) on mismatch or read error because the
		// background thread persists asynchronously.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager no longer signals that it needs persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
904
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Busy-wait until both tick log lines have been emitted by the background thread.
		// The lock on the test logger is re-acquired (and released) each iteration.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
926
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		// Open a channel so the manager has something that needs persisting.
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// The injected manager persistence error should surface verbatim from `join()`.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
945
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		// P2P gossip sync is required so that the background processor attempts graph persistence.
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// The injected graph persistence error should surface verbatim from `stop()`.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
963
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// The injected scorer persistence error should surface verbatim from `stop()`.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
981
	#[test]
	fn test_background_event_handling() {
		// End-to-end check that the background processor forwards events to the supplied
		// event handler: first FundingGenerationReady during channel open, then
		// SpendableOutputs/ChannelClosed after a force-close.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// Either event is acceptable here; ordering between ChannelClosed and
		// SpendableOutputs is not what this test pins down.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1036
	#[test]
	fn test_scorer_persistence() {
		// Test that the background processor periodically persists the scorer, observed via
		// the "Persisting scorer" log line.
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait for the log line emitted when the scorer is persisted.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1055
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// Test that the background processor declines to prune the network graph while a rapid
		// gossip sync is still pending, and that it prunes (and persists) the graph once the
		// sync has been applied.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// `sender` fires when the test persister is asked to persist the network graph, letting
		// us observe the post-prune persistence pass below.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with one partially-announced channel so there is something prunable.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait until the background thread has logged both that it assessed prunability and
		// that it skipped pruning — i.e. it saw the rapid gossip sync as still incomplete. The
		// logger mutex is re-acquired (and released) on every iteration.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// A serialized rapid-gossip-sync snapshot announcing two channels; applying it marks the
		// rapid sync as complete. NOTE(review): the blob must remain byte-for-byte identical —
		// presumably its embedded timestamp is far enough in the past that the announced channels
		// are immediately prunable; confirm against the RGS format if editing.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels (on top of the one seeded above)
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// With the sync complete the processor should now prune and persist the graph; block
		// (with a generous multiple of the first prune timer as a deadline) until the persister
		// notifies us.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1112
1113         #[test]
1114         fn test_invoice_payer() {
1115                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1116                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1117                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1118
1119                 // Initiate the background processors to watch each node.
1120                 let data_dir = nodes[0].persister.get_data_dir();
1121                 let persister = Arc::new(Persister::new(data_dir));
1122                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1123                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1124                 let event_handler = Arc::clone(&invoice_payer);
1125                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1126                 assert!(bg_processor.stop().is_ok());
1127         }
1128 }