938deee9bd7a0bc61f47387b18e77fa5a5b7c42b
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures::{select, future::FutureExt};
39
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 ///   writing it to disk/backups by invoking the callback given to it at startup.
46 ///   [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 ///   at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
51 ///
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
54 ///
55 /// # Note
56 ///
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
61 ///
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        // Shutdown flag shared with the background thread: `stop_and_join_thread` stores `true`
        // into it and the thread's processing loop reads it as its exit check.
        stop_thread: Arc<AtomicBool>,
        // Handle of the thread spawned by `start`. It is taken (leaving `None`) the first time the
        // thread is joined, so later join attempts (e.g. from `Drop`) are no-ops.
        thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// Number of whole seconds that must be exceeded since the previous call before the processing
// loop calls `ChannelManager::timer_tick_occurred` again. Shortened to 1 second under test.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Number of whole seconds that must be exceeded since the previous call before the processing
// loop calls `PeerManager::timer_tick_occurred` again. The value depends on the build profile.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Number of whole seconds that must be exceeded between periodic scorer persistence attempts in
// the processing loop. Shortened to 1 second under test.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay, in seconds, used instead of `NETWORK_PRUNE_TIMER` until the network graph has been
// pruned for the first time, so that the first prune happens shortly after startup.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// The [`GossipSync::None`] variant may be used when no gossip sync is available, in which case no
/// [`NetworkGraph`] is reachable through this enum.
///
/// Type parameters:
/// * `P` dereferences to the [`P2PGossipSync`] held by [`GossipSync::P2P`].
/// * `R` dereferences to the [`RapidGossipSync`] held by [`GossipSync::Rapid`].
/// * `G` dereferences to the [`NetworkGraph`] shared by either gossip sync.
/// * `A` dereferences to the [`chain::Access`] implementation used by the [`P2PGossipSync`].
/// * `L` dereferences to the [`Logger`] implementation.
pub enum GossipSync<
        P: Deref<Target = P2PGossipSync<G, A, L>>,
        R: Deref<Target = RapidGossipSync<G, L>>,
        G: Deref<Target = NetworkGraph<L>>,
        A: Deref,
        L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
        /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
        P2P(P),
        /// Rapid gossip sync from a trusted server.
        Rapid(R),
        /// No gossip sync.
        None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
149 where
150         A::Target: chain::Access,
151         L::Target: Logger,
152 {
153         /// Initializes a new [`GossipSync::P2P`] variant.
154         pub fn p2p(gossip_sync: P) -> Self {
155                 GossipSync::P2P(gossip_sync)
156         }
157 }
158
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
161         GossipSync<
162                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
163                 R,
164                 G,
165                 &'a (dyn chain::Access + Send + Sync),
166                 L,
167         >
168 where
169         L::Target: Logger,
170 {
171         /// Initializes a new [`GossipSync::Rapid`] variant.
172         pub fn rapid(gossip_sync: R) -> Self {
173                 GossipSync::Rapid(gossip_sync)
174         }
175 }
176
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
178 impl<'a, L: Deref>
179         GossipSync<
180                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
182                 &'a NetworkGraph<L>,
183                 &'a (dyn chain::Access + Send + Sync),
184                 L,
185         >
186 where
187         L::Target: Logger,
188 {
189         /// Initializes a new [`GossipSync::None`] variant.
190         pub fn none() -> Self {
191                 GossipSync::None
192         }
193 }
194
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
///
/// Each event is first offered to the [`NetworkGraph`] reachable through `gossip_sync` (if any)
/// and then forwarded to the wrapped `event_handler`.
struct DecoratingEventHandler<
        'a,
        E: EventHandler,
        PGS: Deref<Target = P2PGossipSync<G, A, L>>,
        RGS: Deref<Target = RapidGossipSync<G, L>>,
        G: Deref<Target = NetworkGraph<L>>,
        A: Deref,
        L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
        // The wrapped handler; it sees every event after the network graph has.
        event_handler: E,
        // Borrowed gossip sync whose network graph, when present, handles each event first.
        gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
209
210 impl<
211         'a,
212         E: EventHandler,
213         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214         RGS: Deref<Target = RapidGossipSync<G, L>>,
215         G: Deref<Target = NetworkGraph<L>>,
216         A: Deref,
217         L: Deref,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
220         fn handle_event(&self, event: &Event) {
221                 if let Some(network_graph) = self.gossip_sync.network_graph() {
222                         network_graph.handle_event(event);
223                 }
224                 self.event_handler.handle_event(event);
225         }
226 }
227
// Expands to the body of the background processing loop, shared by the threaded and the
// future-based entry points.
//
// Arguments:
// * `$persister`, `$event_handler`, `$chain_monitor`, `$channel_manager`, `$gossip_sync`,
//   `$peer_manager`, `$logger`, `$scorer`: identifiers of the corresponding values in the
//   caller's scope. `$scorer` must be an `Option`.
// * `$loop_exit_check`: boolean expression evaluated once per iteration, after the wait; the
//   loop terminates when it evaluates to `true`.
// * `$await`: expression that waits for a persistable `ChannelManager` update and evaluates to
//   `true` if the manager should be persisted now.
//
// The expansion uses `?` on persister results and ends in `Ok(())`, so it must be placed in a
// context returning a compatible `Result`.
macro_rules! define_run_body {
        ($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
         $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
         $loop_exit_check: expr, $await: expr)
        => { {
                // Wrap the user's handler so the network graph (if any) sees every event too.
                let event_handler = DecoratingEventHandler {
                        event_handler: $event_handler,
                        gossip_sync: &$gossip_sync,
                };

                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
                $channel_manager.timer_tick_occurred();

                // Timestamps of the last time each periodic task ran (or of startup).
                let mut last_freshness_call = Instant::now();
                let mut last_ping_call = Instant::now();
                let mut last_prune_call = Instant::now();
                let mut last_scorer_persist_call = Instant::now();
                // Whether the network graph has been pruned at least once since startup; selects
                // between the short first-prune delay and the regular prune interval.
                let mut have_pruned = false;

                loop {
                        $channel_manager.process_pending_events(&event_handler);
                        $chain_monitor.process_pending_events(&event_handler);

                        // Note that the PeerManager::process_events may block on ChannelManager's locks,
                        // hence it comes last here. When the ChannelManager finishes whatever it's doing,
                        // we want to ensure we get into `persist_manager` as quickly as we can, especially
                        // without running the normal event processing above and handing events to users.
                        //
                        // Specifically, on an *extremely* slow machine, we may see ChannelManager start
                        // processing a message effectively at any point during this loop. In order to
                        // minimize the time between such processing completing and persisting the updated
                        // ChannelManager, we want to minimize methods blocking on a ChannelManager
                        // generally, and as a fallback place such blocking only immediately before
                        // persistence.
                        $peer_manager.process_events();

                        // We wait up to 100ms, but track how long it takes to detect being put to sleep,
                        // see `await_start`'s use below.
                        let await_start = Instant::now();
                        let updates_available = $await;
                        let await_time = await_start.elapsed();

                        if updates_available {
                                log_trace!($logger, "Persisting ChannelManager...");
                                // A persistence failure aborts the whole loop via `?`.
                                $persister.persist_manager(&*$channel_manager)?;
                                log_trace!($logger, "Done persisting ChannelManager.");
                        }
                        // Exit the loop if the background processor was requested to stop.
                        if $loop_exit_check {
                                log_trace!($logger, "Terminating background processor.");
                                break;
                        }
                        if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
                                log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
                                $channel_manager.timer_tick_occurred();
                                last_freshness_call = Instant::now();
                        }
                        if await_time > Duration::from_secs(1) {
                                // On various platforms, we may be starved of CPU cycles for several reasons.
                                // E.g. on iOS, if we've been in the background, we will be entirely paused.
                                // Similarly, if we're on a desktop platform and the device has been asleep, we
                                // may not get any cycles.
                                // We detect this by checking if our max-100ms-sleep, above, ran longer than a
                                // full second, at which point we assume sockets may have been killed (they
                                // appear to be at least on some platforms, even if it has only been a second).
                                // Note that we have to take care to not get here just because user event
                                // processing was slow at the top of the loop. For example, the sample client
                                // may call Bitcoin Core RPCs during event handling, which very often takes
                                // more than a handful of seconds to complete, and shouldn't disconnect all our
                                // peers.
                                log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
                                $peer_manager.disconnect_all_peers();
                                last_ping_call = Instant::now();
                        } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
                                log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
                                $peer_manager.timer_tick_occurred();
                                last_ping_call = Instant::now();
                        }

                        // Note that we want to run a graph prune once not long after startup before
                        // falling back to our usual hourly prunes. This avoids short-lived clients never
                        // pruning their network graph. We run once 60 seconds after startup before
                        // continuing our normal cadence.
                        if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
                                // The network graph must not be pruned while rapid sync completion is pending
                                log_trace!($logger, "Assessing prunability of network graph");
                                if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
                                        network_graph.remove_stale_channels_and_tracking();

                                        // Graph persistence failures are only logged; they do not stop the loop.
                                        if let Err(e) = $persister.persist_graph(network_graph) {
                                                log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
                                        }

                                        last_prune_call = Instant::now();
                                        have_pruned = true;
                                } else {
                                        // `last_prune_call` is deliberately left untouched here, so the
                                        // prunability check is repeated on the following iterations.
                                        log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
                                }
                        }

                        if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
                                if let Some(ref scorer) = $scorer {
                                        log_trace!($logger, "Persisting scorer");
                                        // Scorer persistence failures are only logged; they do not stop the loop.
                                        if let Err(e) = $persister.persist_scorer(&scorer) {
                                                log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
                                        }
                                }
                                last_scorer_persist_call = Instant::now();
                        }
                }

                // After we exit, ensure we persist the ChannelManager one final time - this avoids
                // some races where users quit while channel updates were in-flight, with
                // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
                $persister.persist_manager(&*$channel_manager)?;

                // Persist Scorer on exit
                if let Some(ref scorer) = $scorer {
                        $persister.persist_scorer(&scorer)?;
                }

                // Persist NetworkGraph on exit
                if let Some(network_graph) = $gossip_sync.network_graph() {
                        $persister.persist_graph(network_graph)?;
                }

                Ok(())
        } }
}
357
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
/// future which outputs false, the loop will exit and this function's future will complete.
///
/// On each iteration the loop waits for whichever completes first: a persistable
/// [`ChannelManager`] update or the 100ms `sleeper` future. Only the latter can request an exit.
///
/// # Errors
///
/// Returns the error produced by the persister if persisting the [`ChannelManager`] fails at any
/// point, or if persisting the scorer or the [`NetworkGraph`] fails during the final persistence
/// pass performed on exit.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
        'a,
        Signer: 'static + Sign,
        CA: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        CW: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        K: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        Descriptor: 'static + SocketDescriptor + Send + Sync,
        CMH: 'static + Deref + Send + Sync,
        OMH: 'static + Deref + Send + Sync,
        RMH: 'static + Deref + Send + Sync,
        EH: 'static + EventHandler + Send,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        UMH: 'static + Deref + Send + Sync,
        PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: WriteableScore<'a>,
        SleepFuture: core::future::Future<Output = bool>,
        Sleeper: Fn(Duration) -> SleepFuture
>(
        persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
        gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
        CA::Target: 'static + chain::Access,
        CF::Target: 'static + chain::Filter,
        CW::Target: 'static + chain::Watch<Signer>,
        T::Target: 'static + BroadcasterInterface,
        K::Target: 'static + KeysInterface<Signer = Signer>,
        F::Target: 'static + FeeEstimator,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<Signer>,
        CMH::Target: 'static + ChannelMessageHandler,
        OMH::Target: 'static + OnionMessageHandler,
        RMH::Target: 'static + RoutingMessageHandler,
        UMH::Target: 'static + CustomMessageHandler,
        PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
        // Updated with the output of every completed `sleeper` future.
        let mut should_continue = true;
        // `define_run_body!` leaves its loop when the exit check evaluates to `true`, so the
        // check must be the negation of `should_continue`.
        define_run_body!(persister, event_handler, chain_monitor, channel_manager,
                gossip_sync, peer_manager, logger, scorer, !should_continue, {
                        select! {
                                // A persistable update is available: request persistence.
                                _ = channel_manager.get_persistable_update_future().fuse() => true,
                                // The sleep elapsed first: record whether to keep going, nothing to persist.
                                cont = sleeper(Duration::from_millis(100)).fuse() => {
                                        should_continue = cont;
                                        false
                                }
                        }
                })
}
424
425 impl BackgroundProcessor {
426         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
427         /// documentation].
428         ///
429         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
430         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
431         /// either [`join`] or [`stop`].
432         ///
433         /// # Data Persistence
434         ///
435         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
436         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
437         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
438         /// provided implementation.
439         ///
440         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
441         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
442         /// See the `lightning-persister` crate for LDK's provided implementation.
443         ///
444         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
445         /// error or call [`join`] and handle any error that may arise. For the latter case,
446         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
447         ///
448         /// # Event Handling
449         ///
450         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
451         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
452         /// functionality implemented by other handlers.
453         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
454         ///
455         /// # Rapid Gossip Sync
456         ///
457         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
458         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
459         /// until the [`RapidGossipSync`] instance completes its first sync.
460         ///
461         /// [top-level documentation]: BackgroundProcessor
462         /// [`join`]: Self::join
463         /// [`stop`]: Self::stop
464         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
465         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
466         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
467         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
468         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
469         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
470         pub fn start<
471                 'a,
472                 Signer: 'static + Sign,
473                 CA: 'static + Deref + Send + Sync,
474                 CF: 'static + Deref + Send + Sync,
475                 CW: 'static + Deref + Send + Sync,
476                 T: 'static + Deref + Send + Sync,
477                 K: 'static + Deref + Send + Sync,
478                 F: 'static + Deref + Send + Sync,
479                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
480                 L: 'static + Deref + Send + Sync,
481                 P: 'static + Deref + Send + Sync,
482                 Descriptor: 'static + SocketDescriptor + Send + Sync,
483                 CMH: 'static + Deref + Send + Sync,
484                 OMH: 'static + Deref + Send + Sync,
485                 RMH: 'static + Deref + Send + Sync,
486                 EH: 'static + EventHandler + Send,
487                 PS: 'static + Deref + Send,
488                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
489                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
490                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
491                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
492                 UMH: 'static + Deref + Send + Sync,
493                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
494                 S: 'static + Deref<Target = SC> + Send + Sync,
495                 SC: WriteableScore<'a>,
496         >(
497                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
498                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
499         ) -> Self
500         where
501                 CA::Target: 'static + chain::Access,
502                 CF::Target: 'static + chain::Filter,
503                 CW::Target: 'static + chain::Watch<Signer>,
504                 T::Target: 'static + BroadcasterInterface,
505                 K::Target: 'static + KeysInterface<Signer = Signer>,
506                 F::Target: 'static + FeeEstimator,
507                 L::Target: 'static + Logger,
508                 P::Target: 'static + Persist<Signer>,
509                 CMH::Target: 'static + ChannelMessageHandler,
510                 OMH::Target: 'static + OnionMessageHandler,
511                 RMH::Target: 'static + RoutingMessageHandler,
512                 UMH::Target: 'static + CustomMessageHandler,
513                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
514         {
515                 let stop_thread = Arc::new(AtomicBool::new(false));
516                 let stop_thread_clone = stop_thread.clone();
517                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
518                         define_run_body!(persister, event_handler, chain_monitor, channel_manager,
519                                 gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
520                                 channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
521                 });
522                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
523         }
524
        /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
        /// [`ChannelManager`].
        ///
        /// Unlike [`stop`], this does not ask the background thread to terminate first; it blocks
        /// until the thread has exited by itself.
        ///
        /// # Panics
        ///
        /// This function panics if the background thread has panicked such as while persisting or
        /// handling events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`stop`]: Self::stop
        pub fn join(mut self) -> Result<(), std::io::Error> {
                // The handle is only taken by a join, and joining consumes `self`.
                assert!(self.thread_handle.is_some());
                self.join_thread()
        }
538
        /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
        /// [`ChannelManager`].
        ///
        /// This sets the shared stop flag and then blocks until the background thread has exited.
        ///
        /// # Panics
        ///
        /// This function panics if the background thread has panicked such as while persisting or
        /// handling events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        pub fn stop(mut self) -> Result<(), std::io::Error> {
                // The handle is only taken by a join, and joining consumes `self`.
                assert!(self.thread_handle.is_some());
                self.stop_and_join_thread()
        }
552
        // Signals the background thread to terminate and waits for it to finish, returning the
        // thread's result (or `Ok(())` if it was already joined).
        fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
                // `Release` here pairs with the `Acquire` load performed by the background thread.
                self.stop_thread.store(true, Ordering::Release);
                self.join_thread()
        }
557
558         fn join_thread(&mut self) -> Result<(), std::io::Error> {
559                 match self.thread_handle.take() {
560                         Some(handle) => handle.join().unwrap(),
561                         None => Ok(()),
562                 }
563         }
564 }
565
566 impl Drop for BackgroundProcessor {
567         fn drop(&mut self) {
568                 self.stop_and_join_thread().unwrap();
569         }
570 }
571
572 #[cfg(test)]
573 mod tests {
574         use bitcoin::blockdata::block::BlockHeader;
575         use bitcoin::blockdata::constants::genesis_block;
576         use bitcoin::blockdata::locktime::PackedLockTime;
577         use bitcoin::blockdata::transaction::{Transaction, TxOut};
578         use bitcoin::network::constants::Network;
579         use lightning::chain::{BestBlock, Confirm, chainmonitor};
580         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
581         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
582         use lightning::chain::transaction::OutPoint;
583         use lightning::get_event_msg;
584         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
585         use lightning::ln::features::ChannelFeatures;
586         use lightning::ln::msgs::{ChannelMessageHandler, Init};
587         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
588         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
589         use lightning::util::config::UserConfig;
590         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
591         use lightning::util::ser::Writeable;
592         use lightning::util::test_utils;
593         use lightning::util::persist::KVStorePersister;
594         use lightning_invoice::payment::{InvoicePayer, Retry};
595         use lightning_invoice::utils::DefaultRouter;
596         use lightning_persister::FilesystemPersister;
597         use std::fs;
598         use std::path::PathBuf;
599         use std::sync::{Arc, Mutex};
600         use std::sync::mpsc::SyncSender;
601         use std::time::Duration;
602         use bitcoin::hashes::Hash;
603         use bitcoin::TxMerkleNode;
604         use lightning::routing::scoring::{FixedPenaltyScorer};
605         use lightning_rapid_gossip_sync::RapidGossipSync;
606         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
607
608         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
609
610         #[derive(Clone, Eq, Hash, PartialEq)]
611         struct TestDescriptor{}
612         impl SocketDescriptor for TestDescriptor {
613                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
614                         0
615                 }
616
617                 fn disconnect_socket(&mut self) {}
618         }
619
620         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
621
622         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
623         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
624
625         struct Node {
626                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
627                 p2p_gossip_sync: PGS,
628                 rapid_gossip_sync: RGS,
629                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
630                 chain_monitor: Arc<ChainMonitor>,
631                 persister: Arc<FilesystemPersister>,
632                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
633                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
634                 logger: Arc<test_utils::TestLogger>,
635                 best_block: BestBlock,
636                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
637         }
638
639         impl Node {
640                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
641                         GossipSync::P2P(self.p2p_gossip_sync.clone())
642                 }
643
644                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
645                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
646                 }
647
648                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
649                         GossipSync::None
650                 }
651         }
652
653         impl Drop for Node {
654                 fn drop(&mut self) {
655                         let data_dir = self.persister.get_data_dir();
656                         match fs::remove_dir_all(data_dir.clone()) {
657                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
658                                 _ => {}
659                         }
660                 }
661         }
662
663         struct Persister {
664                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
665                 graph_persistence_notifier: Option<SyncSender<()>>,
666                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
667                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
668                 filesystem_persister: FilesystemPersister,
669         }
670
671         impl Persister {
672                 fn new(data_dir: String) -> Self {
673                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
674                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
675                 }
676
677                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
678                         Self { graph_error: Some((error, message)), ..self }
679                 }
680
681                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
682                         Self { graph_persistence_notifier: Some(sender), ..self }
683                 }
684
685                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
686                         Self { manager_error: Some((error, message)), ..self }
687                 }
688
689                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
690                         Self { scorer_error: Some((error, message)), ..self }
691                 }
692         }
693
694         impl KVStorePersister for Persister {
695                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
696                         if key == "manager" {
697                                 if let Some((error, message)) = self.manager_error {
698                                         return Err(std::io::Error::new(error, message))
699                                 }
700                         }
701
702                         if key == "network_graph" {
703                                 if let Some(sender) = &self.graph_persistence_notifier {
704                                         sender.send(()).unwrap();
705                                 };
706
707                                 if let Some((error, message)) = self.graph_error {
708                                         return Err(std::io::Error::new(error, message))
709                                 }
710                         }
711
712                         if key == "scorer" {
713                                 if let Some((error, message)) = self.scorer_error {
714                                         return Err(std::io::Error::new(error, message))
715                                 }
716                         }
717
718                         self.filesystem_persister.persist(key, object)
719                 }
720         }
721
722         fn get_full_filepath(filepath: String, filename: String) -> String {
723                 let mut path = PathBuf::from(filepath);
724                 path.push(filename);
725                 path.to_str().unwrap().to_string()
726         }
727
728         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
729                 let mut nodes = Vec::new();
730                 for i in 0..num_nodes {
731                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
732                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
733                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
734                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
735                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
736                         let seed = [i as u8; 32];
737                         let network = Network::Testnet;
738                         let genesis_block = genesis_block(network);
739                         let now = Duration::from_secs(genesis_block.header.time as u64);
740                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
741                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
742                         let best_block = BestBlock::from_genesis(network);
743                         let params = ChainParameters { network, best_block };
744                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
745                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
746                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
747                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
748                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
749                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
750                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
751                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
752                         nodes.push(node);
753                 }
754
755                 for i in 0..num_nodes {
756                         for j in (i+1)..num_nodes {
757                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
758                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
759                         }
760                 }
761
762                 nodes
763         }
764
765         macro_rules! open_channel {
766                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
767                         begin_open_channel!($node_a, $node_b, $channel_value);
768                         let events = $node_a.node.get_and_clear_pending_events();
769                         assert_eq!(events.len(), 1);
770                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
771                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
772                         tx
773                 }}
774         }
775
776         macro_rules! begin_open_channel {
777                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
778                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
779                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
780                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
781                 }}
782         }
783
784         macro_rules! handle_funding_generation_ready {
785                 ($event: expr, $channel_value: expr) => {{
786                         match $event {
787                                 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
788                                         assert_eq!(channel_value_satoshis, $channel_value);
789                                         assert_eq!(user_channel_id, 42);
790
791                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
792                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
793                                         }]};
794                                         (temporary_channel_id, tx)
795                                 },
796                                 _ => panic!("Unexpected event"),
797                         }
798                 }}
799         }
800
801         macro_rules! end_open_channel {
802                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
803                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
804                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
805                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
806                 }}
807         }
808
809         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
810                 for i in 1..=depth {
811                         let prev_blockhash = node.best_block.block_hash();
812                         let height = node.best_block.height() + 1;
813                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
814                         let txdata = vec![(0, tx)];
815                         node.best_block = BestBlock::new(header.block_hash(), height);
816                         match i {
817                                 1 => {
818                                         node.node.transactions_confirmed(&header, &txdata, height);
819                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
820                                 },
821                                 x if x == depth => {
822                                         node.node.best_block_updated(&header, height);
823                                         node.chain_monitor.best_block_updated(&header, height);
824                                 },
825                                 _ => {},
826                         }
827                 }
828         }
829         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
830                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
831         }
832
833         #[test]
834         fn test_background_processor() {
835                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
836                 // updates. Also test that when new updates are available, the manager signals that it needs
837                 // re-persistence and is successfully re-persisted.
838                 let nodes = create_nodes(2, "test_background_processor".to_string());
839
840                 // Go through the channel creation process so that each node has something to persist. Since
841                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
842                 // avoid a race with processing events.
843                 let tx = open_channel!(nodes[0], nodes[1], 100000);
844
845                 // Initiate the background processors to watch each node.
846                 let data_dir = nodes[0].persister.get_data_dir();
847                 let persister = Arc::new(Persister::new(data_dir));
848                 let event_handler = |_: &_| {};
849                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
850
851                 macro_rules! check_persisted_data {
852                         ($node: expr, $filepath: expr) => {
853                                 let mut expected_bytes = Vec::new();
854                                 loop {
855                                         expected_bytes.clear();
856                                         match $node.write(&mut expected_bytes) {
857                                                 Ok(()) => {
858                                                         match std::fs::read($filepath) {
859                                                                 Ok(bytes) => {
860                                                                         if bytes == expected_bytes {
861                                                                                 break
862                                                                         } else {
863                                                                                 continue
864                                                                         }
865                                                                 },
866                                                                 Err(_) => continue
867                                                         }
868                                                 },
869                                                 Err(e) => panic!("Unexpected error: {}", e)
870                                         }
871                                 }
872                         }
873                 }
874
875                 // Check that the initial channel manager data is persisted as expected.
876                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
877                 check_persisted_data!(nodes[0].node, filepath.clone());
878
879                 loop {
880                         if !nodes[0].node.get_persistence_condvar_value() { break }
881                 }
882
883                 // Force-close the channel.
884                 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
885
886                 // Check that the force-close updates are persisted.
887                 check_persisted_data!(nodes[0].node, filepath.clone());
888                 loop {
889                         if !nodes[0].node.get_persistence_condvar_value() { break }
890                 }
891
892                 // Check network graph is persisted
893                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
894                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
895
896                 // Check scorer is persisted
897                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
898                 check_persisted_data!(nodes[0].scorer, filepath.clone());
899
900                 assert!(bg_processor.stop().is_ok());
901         }
902
903         #[test]
904         fn test_timer_tick_called() {
905                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
906                 // `FRESHNESS_TIMER`.
907                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
908                 let data_dir = nodes[0].persister.get_data_dir();
909                 let persister = Arc::new(Persister::new(data_dir));
910                 let event_handler = |_: &_| {};
911                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
912                 loop {
913                         let log_entries = nodes[0].logger.lines.lock().unwrap();
914                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
915                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
916                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
917                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
918                                 break
919                         }
920                 }
921
922                 assert!(bg_processor.stop().is_ok());
923         }
924
925         #[test]
926         fn test_channel_manager_persist_error() {
927                 // Test that if we encounter an error during manager persistence, the thread panics.
928                 let nodes = create_nodes(2, "test_persist_error".to_string());
929                 open_channel!(nodes[0], nodes[1], 100000);
930
931                 let data_dir = nodes[0].persister.get_data_dir();
932                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
933                 let event_handler = |_: &_| {};
934                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
935                 match bg_processor.join() {
936                         Ok(_) => panic!("Expected error persisting manager"),
937                         Err(e) => {
938                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
939                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
940                         },
941                 }
942         }
943
944         #[test]
945         fn test_network_graph_persist_error() {
946                 // Test that if we encounter an error during network graph persistence, an error gets returned.
947                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
948                 let data_dir = nodes[0].persister.get_data_dir();
949                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
950                 let event_handler = |_: &_| {};
951                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
952
953                 match bg_processor.stop() {
954                         Ok(_) => panic!("Expected error persisting network graph"),
955                         Err(e) => {
956                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
957                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
958                         },
959                 }
960         }
961
962         #[test]
963         fn test_scorer_persist_error() {
964                 // Test that if we encounter an error during scorer persistence, an error gets returned.
965                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
966                 let data_dir = nodes[0].persister.get_data_dir();
967                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
968                 let event_handler = |_: &_| {};
969                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
970
971                 match bg_processor.stop() {
972                         Ok(_) => panic!("Expected error persisting scorer"),
973                         Err(e) => {
974                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
975                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
976                         },
977                 }
978         }
979
980         #[test]
981         fn test_background_event_handling() {
982                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
983                 let channel_value = 100000;
984                 let data_dir = nodes[0].persister.get_data_dir();
985                 let persister = Arc::new(Persister::new(data_dir.clone()));
986
987                 // Set up a background event handler for FundingGenerationReady events.
988                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
989                 let event_handler = move |event: &Event| {
990                         sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
991                 };
992                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
993
994                 // Open a channel and check that the FundingGenerationReady event was handled.
995                 begin_open_channel!(nodes[0], nodes[1], channel_value);
996                 let (temporary_channel_id, funding_tx) = receiver
997                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
998                         .expect("FundingGenerationReady not handled within deadline");
999                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
1000
1001                 // Confirm the funding transaction.
1002                 confirm_transaction(&mut nodes[0], &funding_tx);
1003                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1004                 confirm_transaction(&mut nodes[1], &funding_tx);
1005                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1006                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1007                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1008                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1009                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
1010
1011                 assert!(bg_processor.stop().is_ok());
1012
1013                 // Set up a background event handler for SpendableOutputs events.
1014                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1015                 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
1016                 let persister = Arc::new(Persister::new(data_dir));
1017                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1018
1019                 // Force close the channel and check that the SpendableOutputs event was handled.
1020                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1021                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1022                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1023                 let event = receiver
1024                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1025                         .expect("SpendableOutputs not handled within deadline");
1026                 match event {
1027                         Event::SpendableOutputs { .. } => {},
1028                         Event::ChannelClosed { .. } => {},
1029                         _ => panic!("Unexpected event: {:?}", event),
1030                 }
1031
1032                 assert!(bg_processor.stop().is_ok());
1033         }
1034
1035         #[test]
1036         fn test_scorer_persistence() {
1037                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1038                 let data_dir = nodes[0].persister.get_data_dir();
1039                 let persister = Arc::new(Persister::new(data_dir));
1040                 let event_handler = |_: &_| {};
1041                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1042
1043                 loop {
1044                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1045                         let expected_log = "Persisting scorer".to_string();
1046                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1047                                 break
1048                         }
1049                 }
1050
1051                 assert!(bg_processor.stop().is_ok());
1052         }
1053
1054         #[test]
1055         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1056                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1057                 let data_dir = nodes[0].persister.get_data_dir();
1058                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1059                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1060                 let network_graph = nodes[0].network_graph.clone();
1061                 let features = ChannelFeatures::empty();
1062                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1063                         .expect("Failed to update channel from partial announcement");
1064                 let original_graph_description = network_graph.to_string();
1065                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1066                 assert_eq!(network_graph.read_only().channels().len(), 1);
1067
1068                 let event_handler = |_: &_| {};
1069                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1070
1071                 loop {
1072                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1073                         let expected_log_a = "Assessing prunability of network graph".to_string();
1074                         let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1075                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1076                                 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
1077                                 break
1078                         }
1079                 }
1080
1081                 let initialization_input = vec![
1082                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1083                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1084                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1085                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1086                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1087                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1088                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1089                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1090                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1091                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1092                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1093                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1094                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1095                 ];
1096                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1097
1098                 // this should have added two channels
1099                 assert_eq!(network_graph.read_only().channels().len(), 3);
1100
1101                 let _ = receiver
1102                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1103                         .expect("Network graph not pruned within deadline");
1104
1105                 background_processor.stop().unwrap();
1106
1107                 // all channels should now be pruned
1108                 assert_eq!(network_graph.read_only().channels().len(), 0);
1109         }
1110
1111         #[test]
1112         fn test_invoice_payer() {
1113                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1114                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1115                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1116
1117                 // Initiate the background processors to watch each node.
1118                 let data_dir = nodes[0].persister.get_data_dir();
1119                 let persister = Arc::new(Persister::new(data_dir));
1120                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1121                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1122                 let event_handler = Arc::clone(&invoice_payer);
1123                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1124                 assert!(bg_processor.stop().is_ok());
1125         }
1126 }