Commit: Fix `background-processor` doc build for `futures`
File: rust-lightning/lightning-background-processor/src/lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures::{select, future::FutureExt};
39
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag checked by the background thread each loop iteration; set to `true` by
	// `stop`/`drop` to request the thread exit.
	stop_thread: Arc<AtomicBool>,
	// Handle on the spawned thread; taken (set to `None`) once joined so `join`/`stop`/`drop`
	// can each safely attempt a join.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// All timer values below are in seconds. `cfg(test)` builds shorten every timer to one second
// so tests exercising the background loop complete quickly.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval between attempts to persist the scorer, if one was provided.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// The first graph prune runs sooner than the regular hourly cadence so that short-lived
// clients still prune their graph at least once (see the prune logic in `define_run_body!`).
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Passed to [`BackgroundProcessor::start`] (or [`process_events_async`] with the `futures`
/// feature) to tell the background loop which gossip source, if any, should receive events and
/// when its [`NetworkGraph`] may safely be pruned and persisted.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	///
	/// The unused rapid-sync type parameter is concretized to a reference here so callers
	/// need not name it when constructing the P2P variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
158
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	///
	/// The unused P2P-sync and chain-access type parameters are concretized here so callers
	/// need not name them when constructing the rapid variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
176
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	///
	/// All type parameters are concretized here since no gossip source is involved and callers
	/// should not have to name any of them.
	pub fn none() -> Self {
		GossipSync::None
	}
}
194
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	// The user-provided handler, invoked after any standard handling below.
	event_handler: E,
	// Gossip source whose [`NetworkGraph`] (if any) is also handed each event.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
209
210 impl<
211         'a,
212         E: EventHandler,
213         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
214         RGS: Deref<Target = RapidGossipSync<G, L>>,
215         G: Deref<Target = NetworkGraph<L>>,
216         A: Deref,
217         L: Deref,
218 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
219 where A::Target: chain::Access, L::Target: Logger {
220         fn handle_event(&self, event: &Event) {
221                 if let Some(network_graph) = self.gossip_sync.network_graph() {
222                         network_graph.handle_event(event);
223                 }
224                 self.event_handler.handle_event(event);
225         }
226 }
227
// Shared main-loop body used by both the threaded driver (`BackgroundProcessor::start`) and the
// async driver (`process_events_async`). `$loop_exit_check` is an expression evaluated once per
// iteration which, when true, exits the loop; `$await` is an expression which waits (up to
// ~100ms) and evaluates to true when the ChannelManager needs re-persisting. Expands to an
// expression of type `Result<(), std::io::Error>` (persistence errors propagate via `?`).
macro_rules! define_run_body {
	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		// Wrap the user handler so network-graph-relevant events also update the graph.
		let event_handler = DecoratingEventHandler {
			event_handler: $event_handler,
			gossip_sync: &$gossip_sync,
		};

		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Timestamps of the most recent run of each periodic task; compared against the
		// *_TIMER constants below to decide when each task is next due.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$channel_manager.process_pending_events(&event_handler);
			$chain_monitor.process_pending_events(&event_handler);

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				log_trace!($logger, "Assessing prunability of network graph");
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					network_graph.remove_stale_channels_and_tracking();

					// Graph persistence failure is logged, not fatal — we retry next cycle.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				} else {
					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// As with the graph, scorer persistence failure is non-fatal here.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
357
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should continue. Once `sleeper` returns a
/// future which outputs false, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	Signer: 'static + Sign,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	CUMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EH: 'static + EventHandler + Send,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
	// `should_continue` starts true and is only flipped by `sleeper`'s output; when the
	// sleeper future resolves first, its boolean decides whether the loop keeps running.
	let mut should_continue = true;
	define_run_body!(persister, event_handler, chain_monitor, channel_manager,
		gossip_sync, peer_manager, logger, scorer, should_continue, {
			// Race a persistable-update notification against the user-provided sleeper.
			// Winning on the update side yields `true` (persist the manager); winning on
			// the sleeper side yields `false` and records whether to keep looping.
			select! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				cont = sleeper(Duration::from_millis(100)).fuse() => {
					should_continue = cont;
					false
				}
			}
		})
}
427
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		// The spawned thread runs the shared loop body (see `define_run_body!`) until
		// `stop_thread` is set (by `stop`/`drop`, checked with Acquire against the Release
		// store in `stop_and_join_thread`) or persistence returns an error; its Result is
		// surfaced to the user via `join`/`stop`.
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			define_run_body!(persister, event_handler, chain_monitor, channel_manager,
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signals the background thread to exit (Release store paired with the Acquire load in
	// the loop's exit check), then joins it.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the background thread if still running, propagating its Result; a no-op Ok(())
	// if already joined. Panics (via unwrap) if the thread itself panicked.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
568
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Ensure the background thread is stopped and joined on drop; unwrap surfaces any
		// final persistence error (or background-thread panic) as a panic here.
		self.stop_and_join_thread().unwrap();
	}
}
574
575 #[cfg(test)]
576 mod tests {
577         use bitcoin::blockdata::block::BlockHeader;
578         use bitcoin::blockdata::constants::genesis_block;
579         use bitcoin::blockdata::locktime::PackedLockTime;
580         use bitcoin::blockdata::transaction::{Transaction, TxOut};
581         use bitcoin::network::constants::Network;
582         use lightning::chain::{BestBlock, Confirm, chainmonitor};
583         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
584         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
585         use lightning::chain::transaction::OutPoint;
586         use lightning::get_event_msg;
587         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
588         use lightning::ln::features::ChannelFeatures;
589         use lightning::ln::msgs::{ChannelMessageHandler, Init};
590         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
591         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
592         use lightning::util::config::UserConfig;
593         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
594         use lightning::util::ser::Writeable;
595         use lightning::util::test_utils;
596         use lightning::util::persist::KVStorePersister;
597         use lightning_invoice::payment::{InvoicePayer, Retry};
598         use lightning_invoice::utils::DefaultRouter;
599         use lightning_persister::FilesystemPersister;
600         use std::fs;
601         use std::path::PathBuf;
602         use std::sync::{Arc, Mutex};
603         use std::sync::mpsc::SyncSender;
604         use std::time::Duration;
605         use bitcoin::hashes::Hash;
606         use bitcoin::TxMerkleNode;
607         use lightning::routing::scoring::{FixedPenaltyScorer};
608         use lightning_rapid_gossip_sync::RapidGossipSync;
609         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
610
	// Seconds to wait for an expected event before a test fails; a multiple of
	// FRESHNESS_TIMER so several background-processor timer ticks can elapse first.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
612
	// No-op SocketDescriptor used only to satisfy PeerManager's type parameter in
	// tests; no real networking takes place.
	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Report that zero bytes were accepted; test peers never exchange wire data.
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}
622
	// Concrete ChainMonitor instantiation shared by all tests below.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthands for the two gossip-sync flavors exercised by these tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
627
	// Everything needed to run and observe one node under test; fields mirror the
	// arguments handed to BackgroundProcessor::start.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked manually so tests can connect fresh blocks via
		// confirm_transaction_depth.
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
641
	impl Node {
		// Wraps this node's P2P gossip sync in the GossipSync enum expected by
		// BackgroundProcessor::start.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		// Wraps this node's rapid gossip sync in the GossipSync enum.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		// A GossipSync variant that performs no gossip syncing at all.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
655
656         impl Drop for Node {
657                 fn drop(&mut self) {
658                         let data_dir = self.persister.get_data_dir();
659                         match fs::remove_dir_all(data_dir.clone()) {
660                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
661                                 _ => {}
662                         }
663                 }
664         }
665
	// A KVStorePersister wrapping FilesystemPersister that can be configured (via the
	// with_* builders below) to fail persistence of specific keys and/or to notify a
	// channel whenever the network graph is persisted.
	struct Persister {
		// If set, persisting the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, a unit message is sent each time "network_graph" is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// If set, persisting the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, persisting the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
673
674         impl Persister {
675                 fn new(data_dir: String) -> Self {
676                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
677                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
678                 }
679
680                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
681                         Self { graph_error: Some((error, message)), ..self }
682                 }
683
684                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
685                         Self { graph_persistence_notifier: Some(sender), ..self }
686                 }
687
688                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
689                         Self { manager_error: Some((error, message)), ..self }
690                 }
691
692                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
693                         Self { scorer_error: Some((error, message)), ..self }
694                 }
695         }
696
697         impl KVStorePersister for Persister {
698                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
699                         if key == "manager" {
700                                 if let Some((error, message)) = self.manager_error {
701                                         return Err(std::io::Error::new(error, message))
702                                 }
703                         }
704
705                         if key == "network_graph" {
706                                 if let Some(sender) = &self.graph_persistence_notifier {
707                                         sender.send(()).unwrap();
708                                 };
709
710                                 if let Some((error, message)) = self.graph_error {
711                                         return Err(std::io::Error::new(error, message))
712                                 }
713                         }
714
715                         if key == "scorer" {
716                                 if let Some((error, message)) = self.scorer_error {
717                                         return Err(std::io::Error::new(error, message))
718                                 }
719                         }
720
721                         self.filesystem_persister.persist(key, object)
722                 }
723         }
724
725         fn get_full_filepath(filepath: String, filename: String) -> String {
726                 let mut path = PathBuf::from(filepath);
727                 path.push(filename);
728                 path.to_str().unwrap().to_string()
729         }
730
	// Builds `num_nodes` fully-wired test nodes, each persisting under
	// "{persist_dir}_persister_{i}", and connects every pair of them as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node seed keeps key derivation reproducible across runs.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers (in both directions) so they can
		// exchange channel messages in the tests.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
767
	// Drives a complete channel-open handshake from $node_a to $node_b and evaluates
	// to the funding transaction. Consumes $node_a's pending FundingGenerationReady
	// event in the process.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
778
	// Performs the open_channel/accept_channel message exchange, leaving $node_a with
	// a pending FundingGenerationReady event. Uses user_channel_id 42, which
	// handle_funding_generation_ready! asserts on.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
786
	// Matches a FundingGenerationReady event, asserting it carries the expected
	// channel value and the user_channel_id (42) set by begin_open_channel!, and
	// evaluates to (temporary_channel_id, funding_tx) with a single-output funding
	// transaction paying the requested script.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
803
	// Completes channel establishment: hands the funding transaction to $node_a and
	// exchanges the funding_created/funding_signed messages with $node_b.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
811
812         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
813                 for i in 1..=depth {
814                         let prev_blockhash = node.best_block.block_hash();
815                         let height = node.best_block.height() + 1;
816                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
817                         let txdata = vec![(0, tx)];
818                         node.best_block = BestBlock::new(header.block_hash(), height);
819                         match i {
820                                 1 => {
821                                         node.node.transactions_confirmed(&header, &txdata, height);
822                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
823                                 },
824                                 x if x == depth => {
825                                         node.node.best_block_updated(&header, height);
826                                         node.chain_monitor.best_block_updated(&header, height);
827                                 },
828                                 _ => {},
829                         }
830                 }
831         }
	// Connects ANTI_REORG_DELAY blocks so `tx` is considered irrevocably confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
835
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spins until the bytes on disk match $node's current serialization (the
		// background thread persists asynchronously, so we retry until they agree).
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait until the manager no longer signals a pending persist before making
		// new updates, so the next check observes a fresh re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
905
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Spin until both timer-tick log lines have been emitted by the background
		// thread; the test only completes once both are observed.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
927
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		// Opening a channel guarantees the manager has something to persist.
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// The injected persistence error should surface as the background thread's
		// exit result via join().
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
946
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		// Use the P2P gossip sync so the network graph is among the objects persisted.
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// The injected graph persistence error should be reported by stop().
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
964
965         #[test]
966         fn test_scorer_persist_error() {
967                 // Test that if we encounter an error during scorer persistence, an error gets returned.
968                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
969                 let data_dir = nodes[0].persister.get_data_dir();
970                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
971                 let event_handler = |_: &_| {};
972                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
973
974                 match bg_processor.stop() {
975                         Ok(_) => panic!("Expected error persisting scorer"),
976                         Err(e) => {
977                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
978                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
979                         },
980                 }
981         }
982
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready messages so the channel becomes usable on both ends.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// Either event may arrive first on the channel; both are acceptable here.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1037
	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spin until the background thread logs that it persisted the scorer at least
		// once.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1056
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// Verifies the background processor refuses to prune the network graph while a
		// rapid gossip sync is still pending, then prunes once the sync data is applied.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// The persister notifies this channel on network-graph persistence, which the test
		// uses below to detect (with a deadline) that pruning has taken place.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with a single channel built from a partial announcement so there
		// is something present before the rapid sync runs.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		// Start the processor with the node's rapid gossip sync attached, so graph pruning
		// is gated on that sync's completion.
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait until the processor has logged both that it assessed the graph and
		// that it declined to prune (the rapid sync registered above is still pending).
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// Raw serialized rapid-gossip-sync payload; applying it completes the pending sync
		// and (per the assertion below) adds two channels to the graph.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// With the sync complete the processor should now prune; block (bounded by a
		// generous multiple of the prune timer) until the persister reports the graph
		// was persisted, which signals pruning ran.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1113
1114         #[test]
1115         fn test_invoice_payer() {
1116                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1117                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1118                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1119
1120                 // Initiate the background processors to watch each node.
1121                 let data_dir = nodes[0].persister.get_data_dir();
1122                 let persister = Arc::new(Persister::new(data_dir));
1123                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1124                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1125                 let event_handler = Arc::clone(&invoice_payer);
1126                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1127                 assert!(bg_processor.stop().is_ok());
1128         }
1129 }