9e2c7203e9e76bb55d65c976b3b5c1477250defb
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]) and handing the pruned
///   graph to the persister afterwards.
/// * Periodically handing the scorer, if one was provided, to the persister.
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// When the processing loop is asked to stop, the [`ChannelManager`], the scorer (if any) and
/// the [`NetworkGraph`] (if any) are each handed to the persister one final time.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shutdown flag shared with the background thread: set when stopping (or dropping) the
	// processor and checked by the thread once per loop iteration.
	stop_thread: Arc<AtomicBool>,
	// Handle of the background thread. It is taken out (leaving `None`) the first time the thread
	// is joined, so a later join attempt, e.g. from `Drop`, is a no-op.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// All timers below are expressed in whole seconds. The processing loop compares them against
// `Instant::elapsed().as_secs()` using a strict `>`, so an action fires once *more than* the
// given number of full seconds has passed since it last ran.

/// Interval between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Shortened interval used by tests.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Interval between calls to `PeerManager::timer_tick_occurred` in optimised builds.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Shortened interval used by tests.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Interval between periodic scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Shortened interval used by tests.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay before the very first network graph prune; afterwards `NETWORK_PRUNE_TIMER` applies.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Shortened delay used by tests.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
98 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
99 pub enum GossipSync<
100         P: Deref<Target = P2PGossipSync<G, A, L>>,
101         R: Deref<Target = RapidGossipSync<G, L>>,
102         G: Deref<Target = NetworkGraph<L>>,
103         A: Deref,
104         L: Deref,
105 >
106 where A::Target: chain::Access, L::Target: Logger {
107         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
108         P2P(P),
109         /// Rapid gossip sync from a trusted server.
110         Rapid(R),
111         /// No gossip sync.
112         None,
113 }
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
149 where
150         A::Target: chain::Access,
151         L::Target: Logger,
152 {
153         /// Initializes a new [`GossipSync::P2P`] variant.
154         pub fn p2p(gossip_sync: P) -> Self {
155                 GossipSync::P2P(gossip_sync)
156         }
157 }
158
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
161         GossipSync<
162                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
163                 R,
164                 G,
165                 &'a (dyn chain::Access + Send + Sync),
166                 L,
167         >
168 where
169         L::Target: Logger,
170 {
171         /// Initializes a new [`GossipSync::Rapid`] variant.
172         pub fn rapid(gossip_sync: R) -> Self {
173                 GossipSync::Rapid(gossip_sync)
174         }
175 }
176
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
178 impl<'a, L: Deref>
179         GossipSync<
180                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
182                 &'a NetworkGraph<L>,
183                 &'a (dyn chain::Access + Send + Sync),
184                 L,
185         >
186 where
187         L::Target: Logger,
188 {
189         /// Initializes a new [`GossipSync::None`] variant.
190         pub fn none() -> Self {
191                 GossipSync::None
192         }
193 }
194
195 fn handle_network_graph_update<L: Deref>(
196         network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198         if let Event::PaymentPathFailed { ref network_update, .. } = event {
199                 if let Some(network_update) = network_update {
200                         network_graph.handle_network_update(&network_update);
201                 }
202         }
203 }
204
205 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
206 struct DecoratingEventHandler<
207         'a,
208         E: EventHandler,
209         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
210         RGS: Deref<Target = RapidGossipSync<G, L>>,
211         G: Deref<Target = NetworkGraph<L>>,
212         A: Deref,
213         L: Deref,
214 >
215 where A::Target: chain::Access, L::Target: Logger {
216         event_handler: E,
217         gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
218 }
219
220 impl<
221         'a,
222         E: EventHandler,
223         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
224         RGS: Deref<Target = RapidGossipSync<G, L>>,
225         G: Deref<Target = NetworkGraph<L>>,
226         A: Deref,
227         L: Deref,
228 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
229 where A::Target: chain::Access, L::Target: Logger {
230         fn handle_event(&self, event: Event) {
231                 if let Some(network_graph) = self.gossip_sync.network_graph() {
232                         handle_network_graph_update(network_graph, &event)
233                 }
234                 self.event_handler.handle_event(event);
235         }
236 }
237
// Expands to the body of the background processing loop, shared by the threaded and the
// future-based entry points. The two `$process_*_events` expressions run the event handlers,
// `$await` waits for a persistable `ChannelManager` update and evaluates to whether one is
// available, and `$loop_exit_check` evaluates to whether the loop should terminate. The expansion
// evaluates to a `Result<(), std::io::Error>` and uses `?`, so it must sit in a context that
// returns such a `Result`.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// One reference point per periodic task, each reset whenever its task runs.
		let mut freshness_tick_at = Instant::now();
		let mut ping_tick_at = Instant::now();
		let mut graph_pruned_at = Instant::now();
		let mut scorer_persisted_at = Instant::now();
		let mut first_prune_done = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// PeerManager::process_events may block on ChannelManager's locks, which is why it
			// is deliberately the last step before waiting. Once the ChannelManager finishes
			// whatever it is doing we want to reach `persist_manager` as fast as possible,
			// without first going through the event processing above and calling into user
			// code.
			//
			// On an *extremely* slow machine the ChannelManager may begin processing a message
			// at practically any point in this loop. To keep the gap between that processing
			// finishing and the updated ChannelManager being persisted small, calls that block
			// on the ChannelManager are kept to a minimum and, where unavoidable, placed right
			// before persistence.
			$peer_manager.process_events();

			// The wait is meant to last at most 100ms. Measure how long it really took so that
			// having been suspended can be detected further down.
			let wait_began = Instant::now();
			let manager_needs_persist = $await;
			let waited = wait_began.elapsed();

			if manager_needs_persist {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Leave the loop once a stop has been requested.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if freshness_tick_at.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				freshness_tick_at = Instant::now();
			}
			let was_suspended = waited > Duration::from_secs(1);
			if was_suspended {
				// We can be starved of CPU time for a number of reasons: on iOS a backgrounded
				// app is paused entirely, and a desktop machine that went to sleep gives us no
				// cycles either.
				// This shows up as the at-most-100ms wait above lasting more than a full
				// second. In that case assume our sockets may have been killed (on some
				// platforms they appear to be, even after just one second).
				// Only the wait itself is measured, so slow user event handling at the top of
				// the loop does not land us here. The sample client, for instance, may call
				// Bitcoin Core RPCs while handling events, which very often takes more than a
				// handful of seconds and must not disconnect all of our peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				ping_tick_at = Instant::now();
			} else if ping_tick_at.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				ping_tick_at = Instant::now();
			}

			// The first graph prune happens shortly after startup (60 seconds outside of
			// tests) so that short-lived clients still get to prune their graph; every later
			// prune follows the regular hourly cadence.
			let prune_interval = if first_prune_done {
				NETWORK_PRUNE_TIMER
			} else {
				FIRST_NETWORK_PRUNE_TIMER
			};
			if graph_pruned_at.elapsed().as_secs() > prune_interval {
				// Pruning must wait while a rapid gossip sync has yet to complete.
				log_trace!($logger, "Assessing prunability of network graph");
				match $gossip_sync.prunable_network_graph() {
					Some(network_graph) => {
						network_graph.remove_stale_channels_and_tracking();

						if let Err(e) = $persister.persist_graph(network_graph) {
							log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
						}

						graph_pruned_at = Instant::now();
						first_prune_done = true;
					},
					None => {
						log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					},
				}
			}

			if scorer_persisted_at.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref score_tracker) = $scorer {
					log_trace!($logger, "Persisting scorer");
					if let Err(e) = $persister.persist_scorer(&score_tracker) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
					}
				}
				scorer_persisted_at = Instant::now();
			}
		}

		// Having left the loop, persist the ChannelManager one last time. This closes some races
		// in which the user quits while channel updates are in flight and ChannelMonitor
		// update(s) were persisted without the matching ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// The scorer is written out on exit as well.
		if let Some(ref score_tracker) = $scorer {
			$persister.persist_scorer(&score_tracker)?;
		}

		// And so is the network graph.
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
363
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// On every iteration the loop waits for whichever happens first: the [`ChannelManager`]
/// signalling that it has an update to persist, or the future returned by
/// `sleeper(Duration::from_millis(100))` completing. Only the latter can request an exit, so the
/// loop keeps running for as long as `sleeper`'s futures output false.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
///
/// # Errors
///
/// Completes with an error as soon as persisting the [`ChannelManager`] fails, or if persisting
/// the [`ChannelManager`], the scorer or the [`NetworkGraph`] fails after the loop has exited.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	Signer: 'static + Sign,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
	// This has to start out as `false`. It is only ever written by the sleeper arm of the
	// `select_biased!` below, and the persistable-update arm is polled first. Were it to start out
	// as `true`, a ChannelManager update being ready before the sleeper has completed even once
	// would make the exit check pass and terminate the loop although nobody asked for that.
	let mut should_break = false;
	// Shows each event to the network graph (if the gossip sync has one) before awaiting the
	// user's handler. The closure only captures references, so it can be handed to both
	// `process_pending_events_async` calls on every loop iteration.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			select_biased! {
				// A pending ChannelManager update wins and asks the loop to persist.
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				// Otherwise the sleeper decides whether processing should stop.
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
445
446 impl BackgroundProcessor {
447         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
448         /// documentation].
449         ///
450         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
451         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
452         /// either [`join`] or [`stop`].
453         ///
454         /// # Data Persistence
455         ///
456         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
457         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
458         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
459         /// provided implementation.
460         ///
461         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
462         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
463         /// See the `lightning-persister` crate for LDK's provided implementation.
464         ///
465         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
466         /// error or call [`join`] and handle any error that may arise. For the latter case,
467         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
468         ///
469         /// # Event Handling
470         ///
471         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
472         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
473         /// functionality implemented by other handlers.
474         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
475         ///
476         /// # Rapid Gossip Sync
477         ///
478         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
479         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
480         /// until the [`RapidGossipSync`] instance completes its first sync.
481         ///
482         /// [top-level documentation]: BackgroundProcessor
483         /// [`join`]: Self::join
484         /// [`stop`]: Self::stop
485         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
486         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
487         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
488         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
489         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
490         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
491         pub fn start<
492                 'a,
493                 Signer: 'static + Sign,
494                 CA: 'static + Deref + Send + Sync,
495                 CF: 'static + Deref + Send + Sync,
496                 CW: 'static + Deref + Send + Sync,
497                 T: 'static + Deref + Send + Sync,
498                 K: 'static + Deref + Send + Sync,
499                 F: 'static + Deref + Send + Sync,
500                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
501                 L: 'static + Deref + Send + Sync,
502                 P: 'static + Deref + Send + Sync,
503                 Descriptor: 'static + SocketDescriptor + Send + Sync,
504                 CMH: 'static + Deref + Send + Sync,
505                 OMH: 'static + Deref + Send + Sync,
506                 RMH: 'static + Deref + Send + Sync,
507                 EH: 'static + EventHandler + Send,
508                 PS: 'static + Deref + Send,
509                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
510                 CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
511                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
512                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
513                 UMH: 'static + Deref + Send + Sync,
514                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
515                 S: 'static + Deref<Target = SC> + Send + Sync,
516                 SC: WriteableScore<'a>,
517         >(
518                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
519                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
520         ) -> Self
521         where
522                 CA::Target: 'static + chain::Access,
523                 CF::Target: 'static + chain::Filter,
524                 CW::Target: 'static + chain::Watch<Signer>,
525                 T::Target: 'static + BroadcasterInterface,
526                 K::Target: 'static + KeysInterface<Signer = Signer>,
527                 F::Target: 'static + FeeEstimator,
528                 L::Target: 'static + Logger,
529                 P::Target: 'static + Persist<Signer>,
530                 CMH::Target: 'static + ChannelMessageHandler,
531                 OMH::Target: 'static + OnionMessageHandler,
532                 RMH::Target: 'static + RoutingMessageHandler,
533                 UMH::Target: 'static + CustomMessageHandler,
534                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
535         {
536                 let stop_thread = Arc::new(AtomicBool::new(false));
537                 let stop_thread_clone = stop_thread.clone();
538                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
539                         let event_handler = DecoratingEventHandler {
540                                 event_handler,
541                                 gossip_sync: &gossip_sync,
542                         };
543                         define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
544                                 channel_manager, channel_manager.process_pending_events(&event_handler),
545                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
546                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
547                 });
548                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
549         }
550
551         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
552         /// [`ChannelManager`].
553         ///
554         /// # Panics
555         ///
556         /// This function panics if the background thread has panicked such as while persisting or
557         /// handling events.
558         ///
559         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
560         pub fn join(mut self) -> Result<(), std::io::Error> {
561                 assert!(self.thread_handle.is_some());
562                 self.join_thread()
563         }
564
565         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
566         /// [`ChannelManager`].
567         ///
568         /// # Panics
569         ///
570         /// This function panics if the background thread has panicked such as while persisting or
571         /// handling events.
572         ///
573         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
574         pub fn stop(mut self) -> Result<(), std::io::Error> {
575                 assert!(self.thread_handle.is_some());
576                 self.stop_and_join_thread()
577         }
578
579         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
580                 self.stop_thread.store(true, Ordering::Release);
581                 self.join_thread()
582         }
583
584         fn join_thread(&mut self) -> Result<(), std::io::Error> {
585                 match self.thread_handle.take() {
586                         Some(handle) => handle.join().unwrap(),
587                         None => Ok(()),
588                 }
589         }
590 }
591
592 impl Drop for BackgroundProcessor {
593         fn drop(&mut self) {
594                 self.stop_and_join_thread().unwrap();
595         }
596 }
597
598 #[cfg(test)]
599 mod tests {
600         use bitcoin::blockdata::block::BlockHeader;
601         use bitcoin::blockdata::constants::genesis_block;
602         use bitcoin::blockdata::locktime::PackedLockTime;
603         use bitcoin::blockdata::transaction::{Transaction, TxOut};
604         use bitcoin::network::constants::Network;
605         use lightning::chain::{BestBlock, Confirm, chainmonitor};
606         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
607         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
608         use lightning::chain::transaction::OutPoint;
609         use lightning::get_event_msg;
610         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
611         use lightning::ln::features::ChannelFeatures;
612         use lightning::ln::msgs::{ChannelMessageHandler, Init};
613         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
614         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
615         use lightning::util::config::UserConfig;
616         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
617         use lightning::util::ser::Writeable;
618         use lightning::util::test_utils;
619         use lightning::util::persist::KVStorePersister;
620         use lightning_invoice::payment::{InvoicePayer, Retry};
621         use lightning_invoice::utils::DefaultRouter;
622         use lightning_persister::FilesystemPersister;
623         use std::fs;
624         use std::path::PathBuf;
625         use std::sync::{Arc, Mutex};
626         use std::sync::mpsc::SyncSender;
627         use std::time::Duration;
628         use bitcoin::hashes::Hash;
629         use bitcoin::TxMerkleNode;
630         use lightning::routing::scoring::{FixedPenaltyScorer};
631         use lightning_rapid_gossip_sync::RapidGossipSync;
632         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
633
        /// How long, in seconds, tests wait on a channel for the background processor to hand over
        /// an event (it is passed to `Duration::from_secs`). Five times `FRESHNESS_TIMER`.
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
635
        /// Stand-in socket descriptor used as the descriptor type of the test `PeerManager`; it
        /// performs no I/O.
        #[derive(Clone, Hash, PartialEq, Eq)]
        struct TestDescriptor{}
        impl SocketDescriptor for TestDescriptor {
                // Ignores the data and always returns 0.
                fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
                        0
                }

                // Nothing to tear down.
                fn disconnect_socket(&mut self) {}
        }
645
        // Chain monitor instantiated with the test chain source, broadcaster, fee estimator and
        // logger, persisting through a `FilesystemPersister`.
        type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

        // Shared handles to the two gossip-sync flavours used by the tests (P2P and rapid).
        type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
        type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
650
        /// Everything a test needs for one node: the objects handed to
        /// `BackgroundProcessor::start` plus the test doubles the tests inspect afterwards.
        struct Node {
                // The node's channel manager.
                node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
                // Gossip-sync handles; wrapped into a `GossipSync` by the helper methods on `Node`.
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
                peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
                chain_monitor: Arc<ChainMonitor>,
                // Its data directory is removed when the `Node` is dropped.
                persister: Arc<FilesystemPersister>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
                network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                logger: Arc<test_utils::TestLogger>,
                // Chain tip as seen by the test helpers; advanced by `confirm_transaction_depth`.
                best_block: BestBlock,
                scorer: Arc<Mutex<FixedPenaltyScorer>>,
        }
664
665         impl Node {
666                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
667                         GossipSync::P2P(self.p2p_gossip_sync.clone())
668                 }
669
670                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
671                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
672                 }
673
674                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
675                         GossipSync::None
676                 }
677         }
678
679         impl Drop for Node {
680                 fn drop(&mut self) {
681                         let data_dir = self.persister.get_data_dir();
682                         match fs::remove_dir_all(data_dir.clone()) {
683                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
684                                 _ => {}
685                         }
686                 }
687         }
688
        /// Test persister that forwards to a `FilesystemPersister` but can be configured to fail
        /// for a given key and to signal whenever the network graph is persisted.
        struct Persister {
                // Error returned instead of persisting when the key is "network_graph".
                graph_error: Option<(std::io::ErrorKind, &'static str)>,
                // Receives a `()` each time a "network_graph" persist is attempted.
                graph_persistence_notifier: Option<SyncSender<()>>,
                // Error returned instead of persisting when the key is "manager".
                manager_error: Option<(std::io::ErrorKind, &'static str)>,
                // Error returned instead of persisting when the key is "scorer".
                scorer_error: Option<(std::io::ErrorKind, &'static str)>,
                // Backing store used whenever no error is injected.
                filesystem_persister: FilesystemPersister,
        }
696
697         impl Persister {
698                 fn new(data_dir: String) -> Self {
699                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
700                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
701                 }
702
703                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
704                         Self { graph_error: Some((error, message)), ..self }
705                 }
706
707                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
708                         Self { graph_persistence_notifier: Some(sender), ..self }
709                 }
710
711                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
712                         Self { manager_error: Some((error, message)), ..self }
713                 }
714
715                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
716                         Self { scorer_error: Some((error, message)), ..self }
717                 }
718         }
719
720         impl KVStorePersister for Persister {
721                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
722                         if key == "manager" {
723                                 if let Some((error, message)) = self.manager_error {
724                                         return Err(std::io::Error::new(error, message))
725                                 }
726                         }
727
728                         if key == "network_graph" {
729                                 if let Some(sender) = &self.graph_persistence_notifier {
730                                         sender.send(()).unwrap();
731                                 };
732
733                                 if let Some((error, message)) = self.graph_error {
734                                         return Err(std::io::Error::new(error, message))
735                                 }
736                         }
737
738                         if key == "scorer" {
739                                 if let Some((error, message)) = self.scorer_error {
740                                         return Err(std::io::Error::new(error, message))
741                                 }
742                         }
743
744                         self.filesystem_persister.persist(key, object)
745                 }
746         }
747
748         fn get_full_filepath(filepath: String, filename: String) -> String {
749                 let mut path = PathBuf::from(filepath);
750                 path.push(filename);
751                 path.to_str().unwrap().to_string()
752         }
753
754         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
755                 let mut nodes = Vec::new();
756                 for i in 0..num_nodes {
757                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
758                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
759                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
760                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
761                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
762                         let seed = [i as u8; 32];
763                         let network = Network::Testnet;
764                         let genesis_block = genesis_block(network);
765                         let now = Duration::from_secs(genesis_block.header.time as u64);
766                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
767                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
768                         let best_block = BestBlock::from_genesis(network);
769                         let params = ChainParameters { network, best_block };
770                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
771                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
772                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
773                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
774                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
775                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
776                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
777                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
778                         nodes.push(node);
779                 }
780
781                 for i in 0..num_nodes {
782                         for j in (i+1)..num_nodes {
783                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
784                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
785                         }
786                 }
787
788                 nodes
789         }
790
        // Runs the whole channel-open exchange between `$node_a` (initiator) and `$node_b` and
        // evaluates to the funding transaction.
        //
        // It drains `$node_a`'s pending events itself and asserts that exactly one is present, so
        // nothing else may be consuming that node's events while it runs.
        macro_rules! open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
                        begin_open_channel!($node_a, $node_b, $channel_value);
                        let events = $node_a.node.get_and_clear_pending_events();
                        assert_eq!(events.len(), 1);
                        let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
                        end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
                        tx
                }}
        }
801
        // First half of a channel open: `$node_a` creates the channel, its `SendOpenChannel`
        // message is delivered to `$node_b`, and `$node_b`'s `SendAcceptChannel` reply is delivered
        // back to `$node_a`.
        macro_rules! begin_open_channel {
                ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
                        // NOTE(review): the literal 42 is presumably the user channel id that
                        // `handle_funding_generation_ready!` asserts on -- confirm against `create_channel`.
                        $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
                        $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
                        $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
                }}
        }
809
        // Checks that `$event` is a `FundingGenerationReady` for `$channel_value` with user channel
        // id 42, and evaluates to `(temporary_channel_id, tx)`, where `tx` is a funding transaction
        // with no inputs and a single output paying the channel value to the event's output script.
        // Panics on any other event.
        macro_rules! handle_funding_generation_ready {
                ($event: expr, $channel_value: expr) => {{
                        match $event {
                                Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                                        assert_eq!(channel_value_satoshis, $channel_value);
                                        assert_eq!(user_channel_id, 42);

                                        let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
                                                value: channel_value_satoshis, script_pubkey: output_script.clone(),
                                        }]};
                                        (temporary_channel_id, tx)
                                },
                                _ => panic!("Unexpected event"),
                        }
                }}
        }
826
        // Second half of a channel open: hands the funding transaction `$tx` to `$node_a`, delivers
        // its `SendFundingCreated` message to `$node_b`, and delivers `$node_b`'s
        // `SendFundingSigned` reply back to `$node_a`.
        macro_rules! end_open_channel {
                ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
                        $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
                        $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
                        $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
                }}
        }
834
835         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
836                 for i in 1..=depth {
837                         let prev_blockhash = node.best_block.block_hash();
838                         let height = node.best_block.height() + 1;
839                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
840                         let txdata = vec![(0, tx)];
841                         node.best_block = BestBlock::new(header.block_hash(), height);
842                         match i {
843                                 1 => {
844                                         node.node.transactions_confirmed(&header, &txdata, height);
845                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
846                                 },
847                                 x if x == depth => {
848                                         node.node.best_block_updated(&header, height);
849                                         node.chain_monitor.best_block_updated(&header, height);
850                                 },
851                                 _ => {},
852                         }
853                 }
854         }
        /// Confirms `tx` on `node` to a depth of `ANTI_REORG_DELAY` blocks; see
        /// `confirm_transaction_depth`.
        fn confirm_transaction(node: &mut Node, tx: &Transaction) {
                confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
        }
858
859         #[test]
860         fn test_background_processor() {
861                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
862                 // updates. Also test that when new updates are available, the manager signals that it needs
863                 // re-persistence and is successfully re-persisted.
864                 let nodes = create_nodes(2, "test_background_processor".to_string());
865
866                 // Go through the channel creation process so that each node has something to persist. Since
867                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
868                 // avoid a race with processing events.
869                 let tx = open_channel!(nodes[0], nodes[1], 100000);
870
871                 // Initiate the background processors to watch each node.
872                 let data_dir = nodes[0].persister.get_data_dir();
873                 let persister = Arc::new(Persister::new(data_dir));
874                 let event_handler = |_: _| {};
875                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
876
877                 macro_rules! check_persisted_data {
878                         ($node: expr, $filepath: expr) => {
879                                 let mut expected_bytes = Vec::new();
880                                 loop {
881                                         expected_bytes.clear();
882                                         match $node.write(&mut expected_bytes) {
883                                                 Ok(()) => {
884                                                         match std::fs::read($filepath) {
885                                                                 Ok(bytes) => {
886                                                                         if bytes == expected_bytes {
887                                                                                 break
888                                                                         } else {
889                                                                                 continue
890                                                                         }
891                                                                 },
892                                                                 Err(_) => continue
893                                                         }
894                                                 },
895                                                 Err(e) => panic!("Unexpected error: {}", e)
896                                         }
897                                 }
898                         }
899                 }
900
901                 // Check that the initial channel manager data is persisted as expected.
902                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
903                 check_persisted_data!(nodes[0].node, filepath.clone());
904
905                 loop {
906                         if !nodes[0].node.get_persistence_condvar_value() { break }
907                 }
908
909                 // Force-close the channel.
910                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
911
912                 // Check that the force-close updates are persisted.
913                 check_persisted_data!(nodes[0].node, filepath.clone());
914                 loop {
915                         if !nodes[0].node.get_persistence_condvar_value() { break }
916                 }
917
918                 // Check network graph is persisted
919                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
920                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
921
922                 // Check scorer is persisted
923                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
924                 check_persisted_data!(nodes[0].scorer, filepath.clone());
925
926                 assert!(bg_processor.stop().is_ok());
927         }
928
929         #[test]
930         fn test_timer_tick_called() {
931                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
932                 // `FRESHNESS_TIMER`.
933                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
934                 let data_dir = nodes[0].persister.get_data_dir();
935                 let persister = Arc::new(Persister::new(data_dir));
936                 let event_handler = |_: _| {};
937                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
938                 loop {
939                         let log_entries = nodes[0].logger.lines.lock().unwrap();
940                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
941                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
942                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
943                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
944                                 break
945                         }
946                 }
947
948                 assert!(bg_processor.stop().is_ok());
949         }
950
951         #[test]
952         fn test_channel_manager_persist_error() {
953                 // Test that if we encounter an error during manager persistence, the thread panics.
954                 let nodes = create_nodes(2, "test_persist_error".to_string());
955                 open_channel!(nodes[0], nodes[1], 100000);
956
957                 let data_dir = nodes[0].persister.get_data_dir();
958                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
959                 let event_handler = |_: _| {};
960                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
961                 match bg_processor.join() {
962                         Ok(_) => panic!("Expected error persisting manager"),
963                         Err(e) => {
964                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
965                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
966                         },
967                 }
968         }
969
970         #[test]
971         fn test_network_graph_persist_error() {
972                 // Test that if we encounter an error during network graph persistence, an error gets returned.
973                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
974                 let data_dir = nodes[0].persister.get_data_dir();
975                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
976                 let event_handler = |_: _| {};
977                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
978
979                 match bg_processor.stop() {
980                         Ok(_) => panic!("Expected error persisting network graph"),
981                         Err(e) => {
982                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
983                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
984                         },
985                 }
986         }
987
988         #[test]
989         fn test_scorer_persist_error() {
990                 // Test that if we encounter an error during scorer persistence, an error gets returned.
991                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
992                 let data_dir = nodes[0].persister.get_data_dir();
993                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
994                 let event_handler = |_: _| {};
995                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
996
997                 match bg_processor.stop() {
998                         Ok(_) => panic!("Expected error persisting scorer"),
999                         Err(e) => {
1000                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1001                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1002                         },
1003                 }
1004         }
1005
        #[test]
        fn test_background_event_handling() {
                // Checks that events are delivered to the handler given to `BackgroundProcessor::start`:
                // first a FundingGenerationReady during a channel open, then, with a second processor, a
                // SpendableOutputs after a force-close. Each handler forwards the event of interest over a
                // channel so the test thread can wait for it with a deadline.
                let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
                let channel_value = 100000;
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir.clone()));

                // Set up a background event handler for FundingGenerationReady events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: Event| match event {
                        Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
                        Event::ChannelReady { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                };

                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

                // Open a channel and check that the FundingGenerationReady event was handled.
                begin_open_channel!(nodes[0], nodes[1], channel_value);
                let (temporary_channel_id, funding_tx) = receiver
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
                        .expect("FundingGenerationReady not handled within deadline");
                end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

                // Confirm the funding transaction.
                confirm_transaction(&mut nodes[0], &funding_tx);
                let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
                confirm_transaction(&mut nodes[1], &funding_tx);
                let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
                // Exchange the channel_ready messages; each side then queues a channel update.
                nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
                let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
                nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
                let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

                // Stop the first processor before a second one, with a different handler, is started.
                assert!(bg_processor.stop().is_ok());

                // Set up a background event handler for SpendableOutputs events.
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let event_handler = move |event: Event| match event {
                        Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
                        Event::ChannelReady { .. } => {},
                        Event::ChannelClosed { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                };
                let persister = Arc::new(Persister::new(data_dir));
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

                // Force close the channel and check that the SpendableOutputs event was handled.
                nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
                let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
                confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

                let event = receiver
                        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
                        .expect("Events not handled within deadline");
                match event {
                        Event::SpendableOutputs { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                }

                assert!(bg_processor.stop().is_ok());
        }
1068
1069         #[test]
1070         fn test_scorer_persistence() {
1071                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1072                 let data_dir = nodes[0].persister.get_data_dir();
1073                 let persister = Arc::new(Persister::new(data_dir));
1074                 let event_handler = |_: _| {};
1075                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1076
1077                 loop {
1078                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1079                         let expected_log = "Persisting scorer".to_string();
1080                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1081                                 break
1082                         }
1083                 }
1084
1085                 assert!(bg_processor.stop().is_ok());
1086         }
1087
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
        // Checks that the background processor leaves the network graph unpruned while a rapid
        // gossip sync is still pending, and that the graph is pruned once the sync data has
        // been applied.
        let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
        let node = &nodes[0];
        let data_dir = node.persister.get_data_dir();
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        // `data_dir` is not needed afterwards, so it is moved into the persister rather than cloned.
        let persister = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender));

        // Seed the graph with a single partially-announced channel between the two nodes.
        let network_graph = node.network_graph.clone();
        let features = ChannelFeatures::empty();
        network_graph.add_channel_from_partial_announcement(42, 53, features, node.node.get_our_node_id(), nodes[1].node.get_our_node_id())
                .expect("Failed to update channel from partial announcement");
        let original_graph_description = network_graph.to_string();
        assert!(original_graph_description.contains("42: features: 0000, node_one:"));
        assert_eq!(network_graph.read_only().channels().len(), 1);

        let event_handler = |_: _| {};
        let background_processor = BackgroundProcessor::start(
                persister,
                event_handler,
                node.chain_monitor.clone(),
                node.node.clone(),
                node.rapid_gossip_sync(),
                node.peer_manager.clone(),
                node.logger.clone(),
                Some(node.scorer.clone()),
        );

        // Wait until the logger has recorded both that prunability was assessed and that
        // pruning was skipped. The keys are loop-invariant, so they are built once; the lock
        // guard lives only for one iteration so the background thread can keep logging.
        let assessing_key = (
                "lightning_background_processor".to_string(),
                "Assessing prunability of network graph".to_string(),
        );
        let not_pruning_key = (
                "lightning_background_processor".to_string(),
                "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string(),
        );
        loop {
                let log_entries = node.logger.lines.lock().unwrap();
                if log_entries.get(&assessing_key).is_some() && log_entries.get(&not_pruning_key).is_some() {
                        break
                }
        }

        // Serialized rapid-gossip-sync payload fed to the syncer below.
        let initialization_input = vec![
                76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
                79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
                0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
                187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
                157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
                88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
                204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
                181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
                110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
                76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
                226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
                0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
                0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
        ];
        node.rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

        // this should have added two channels
        assert_eq!(network_graph.read_only().channels().len(), 3);

        // Block until the persister signals through the graph-persistence notifier channel.
        let _ = receiver
                .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
                .expect("Network graph not pruned within deadline");

        background_processor.stop().unwrap();

        // all channels should now be pruned
        assert_eq!(network_graph.read_only().channels().len(), 0);
}
1144
#[test]
fn test_invoice_payer() {
        // Verifies that an `Arc<InvoicePayer>` can be handed to the background processor as
        // its event handler, and that the processor then stops cleanly.
        let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
        let random_seed_bytes = keys_manager.get_secure_random_bytes();
        let nodes = create_nodes(2, "test_invoice_payer".to_string());
        let node = &nodes[0];

        // Build the persister, router and invoice payer for the first node.
        let persister = Arc::new(Persister::new(node.persister.get_data_dir()));
        let router = DefaultRouter::new(
                Arc::clone(&node.network_graph),
                Arc::clone(&node.logger),
                random_seed_bytes,
                Arc::clone(&node.scorer),
        );
        let invoice_payer = Arc::new(InvoicePayer::new(
                Arc::clone(&node.node),
                router,
                Arc::clone(&node.logger),
                |_: _| {},
                Retry::Attempts(2),
        ));

        // Start a single background processor for the first node, using a clone of the
        // invoice payer handle as the event handler.
        let bg_processor = BackgroundProcessor::start(
                persister,
                Arc::clone(&invoice_payer),
                node.chain_monitor.clone(),
                node.node.clone(),
                node.no_gossip_sync(),
                node.peer_manager.clone(),
                node.logger.clone(),
                Some(node.scorer.clone()),
        );
        assert!(bg_processor.stop().is_ok());
}
1160 }