//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.

// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
#![deny(broken_intra_doc_links)]
#![deny(private_intra_doc_links)]

#![deny(missing_docs)]
#![deny(unsafe_code)]

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
///   is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon, as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
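/// # Stop/join mechanics
///
/// Internally this is a standard stop-flag-plus-join-handle pattern. A minimal std-only sketch
/// of that pattern (illustration only, not part of the LDK API):
///
/// ```
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicBool, Ordering};
/// use std::thread;
/// use std::time::Duration;
///
/// let stop = Arc::new(AtomicBool::new(false));
/// let stop_clone = Arc::clone(&stop);
/// let handle = thread::spawn(move || {
///     while !stop_clone.load(Ordering::Acquire) {
///         // Periodic background work would happen here.
///         thread::sleep(Duration::from_millis(10));
///     }
/// });
/// stop.store(true, Ordering::Release);
/// handle.join().unwrap();
/// ```
///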
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations. Increasing the ping
/// timer allows for this, but slower devices will be disconnected if the timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
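///
/// A hedged sketch of selecting a variant (construction of the underlying sync objects is
/// elided and the variable names are hypothetical; the constructors are defined below):
///
/// ```ignore
/// // With P2P gossip sync:
/// let gossip = GossipSync::p2p(Arc::clone(&p2p_gossip_sync));
/// // With rapid gossip sync from a trusted server:
/// // let gossip = GossipSync::rapid(Arc::clone(&rapid_gossip_sync));
/// // With no gossip sync at all:
/// // let gossip = GossipSync::none();
/// ```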
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
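///
/// A minimal std-only sketch of the decorator pattern used here (illustration only; `Handler`
/// is a stand-in, not the LDK `EventHandler` trait):
///
/// ```
/// trait Handler { fn handle(&self, event: &str); }
///
/// struct Inner;
/// impl Handler for Inner {
///     fn handle(&self, event: &str) { println!("inner handled: {}", event); }
/// }
///
/// struct Decorator<H: Handler> { inner: H }
/// impl<H: Handler> Handler for Decorator<H> {
///     fn handle(&self, event: &str) {
///         // Shared pre-processing would happen here, then the call is delegated.
///         self.inner.handle(event);
///     }
/// }
///
/// Decorator { inner: Inner }.handle("payment_failed");
/// ```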
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	event_handler: E,
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}

impl<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: &Event) {
		if let Some(network_graph) = self.gossip_sync.network_graph() {
			network_graph.handle_event(event);
		}
		self.event_handler.handle_event(event);
	}
}

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is
	/// retrieved by calling either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to
	/// disk, and/or uploading to one or more backup services. See [`ChannelManager::write`] for
	/// writing out a [`ChannelManager`]. See the `lightning-persister` crate for LDK's provided
	/// implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
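	/// # Example
	///
	/// A hedged sketch of typical wiring (construction of the objects passed in is elided and the
	/// variable names are hypothetical; see the `lightning` crate docs for building each one):
	///
	/// ```ignore
	/// let background_processor = BackgroundProcessor::start(
	///     persister, event_handler, chain_monitor, channel_manager,
	///     GossipSync::p2p(gossip_sync), peer_manager, logger, Some(scorer),
	/// );
	/// // ... later, on shutdown, retrieve any persistence error:
	/// background_processor.stop().unwrap();
	/// ```
	///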
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = DecoratingEventHandler {
				event_handler,
				gossip_sync: &gossip_sync,
			};

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.timer_tick_occurred();

			let mut last_freshness_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut have_pruned = false;

			loop {
				channel_manager.process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);

				// Note that the PeerManager::process_events may block on ChannelManager's locks,
				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
				// we want to ensure we get into `persist_manager` as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
				//
				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
				// processing a message effectively at any point during this loop. In order to
				// minimize the time between such processing completing and persisting the updated
				// ChannelManager, we want to minimize methods blocking on a ChannelManager
				// generally, and as a fallback place such blocking only immediately before
				// persistence.
				peer_manager.process_events();

				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
				// see `await_start`'s use below.
				let await_start = Instant::now();
				let updates_available =
					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
				let await_time = await_start.elapsed();

				if updates_available {
					log_trace!(logger, "Persisting ChannelManager...");
					persister.persist_manager(&*channel_manager)?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}
				// Exit the loop if the background processor was requested to stop.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if await_time > Duration::from_secs(1) {
					// On various platforms, we may be starved of CPU cycles for several reasons.
					// E.g. on iOS, if we've been in the background, we will be entirely paused.
					// Similarly, if we're on a desktop platform and the device has been asleep, we
					// may not get any cycles.
					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
					// full second, at which point we assume sockets may have been killed (they
					// appear to be at least on some platforms, even if it has only been a second).
					// Note that we have to take care to not get here just because user event
					// processing was slow at the top of the loop. For example, the sample client
					// may call Bitcoin Core RPCs during event handling, which very often takes
					// more than a handful of seconds to complete, and shouldn't disconnect all our
					// peers.
					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
					peer_manager.disconnect_all_peers();
					last_ping_call = Instant::now();
				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.timer_tick_occurred();
					last_ping_call = Instant::now();
				}

				// Note that we want to run a graph prune once not long after startup before
				// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once 60 seconds after startup before
				// continuing our normal cadence.
				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
					// The network graph must not be pruned while rapid sync completion is pending.
					log_trace!(logger, "Assessing prunability of network graph");
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						network_graph.remove_stale_channels();

						if let Err(e) = persister.persist_graph(network_graph) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
						}

						last_prune_call = Instant::now();
						have_pruned = true;
					} else {
						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					}
				}

				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						log_trace!(logger, "Persisting scorer");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
						}
					}
					last_scorer_persist_call = Instant::now();
				}
			}

			// After we exit, ensure we persist the ChannelManager one final time - this avoids
			// some races where users quit while channel updates were in-flight, with
			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
			persister.persist_manager(&*channel_manager)?;

			// Persist Scorer on exit
			if let Some(ref scorer) = scorer {
				persister.persist_scorer(&scorer)?;
			}

			// Persist NetworkGraph on exit
			if let Some(network_graph) = gossip_sync.network_graph() {
				persister.persist_graph(network_graph)?;
			}

			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::block::BlockHeader;
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain::{BestBlock, Confirm, chainmonitor};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::{ChannelFeatures, InitFeatures};
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning::util::persist::KVStorePersister;
	use lightning_invoice::payment::{InvoicePayer, Retry};
	use lightning_invoice::utils::DefaultRouter;
	use lightning_persister::FilesystemPersister;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::sync::mpsc::SyncSender;
	use std::time::Duration;
	use lightning::routing::scoring::FixedPenaltyScorer;
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;

	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}

	impl Node {
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}

	impl Drop for Node {
		fn drop(&mut self) {
			let data_dir = self.persister.get_data_dir();
			if let Err(e) = fs::remove_dir_all(data_dir.clone()) {
				println!("Failed to remove test persister directory {}: {}", data_dir, e);
			}
		}
	}

	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}

	impl Persister {
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		}

		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		}

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		}

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		}

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
		}
	}

	impl KVStorePersister for Persister {
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "scorer" {
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.filesystem_persister.persist(key, object)
		}
	}

	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.push(filename);
		path.to_str().unwrap().to_string()
	}

	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()) };
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}

	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}

	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}

	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}

	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}

	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}

	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}

	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check that the network graph is persisted.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check that the scorer is persisted.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the error is returned to
		// the caller of `join`.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
894
895         #[test]
896         fn test_background_event_handling() {
897                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
898                 let channel_value = 100000;
899                 let data_dir = nodes[0].persister.get_data_dir();
900                 let persister = Arc::new(Persister::new(data_dir.clone()));
901
902                 // Set up a background event handler for FundingGenerationReady events.
903                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
904                 let event_handler = move |event: &Event| {
905                         sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
906                 };
907                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
908
909                 // Open a channel and check that the FundingGenerationReady event was handled.
910                 begin_open_channel!(nodes[0], nodes[1], channel_value);
911                 let (temporary_channel_id, funding_tx) = receiver
912                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
913                         .expect("FundingGenerationReady not handled within deadline");
914                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
915
916                 // Confirm the funding transaction.
917                 confirm_transaction(&mut nodes[0], &funding_tx);
918                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
919                 confirm_transaction(&mut nodes[1], &funding_tx);
920                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
921                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
922                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
923                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
924                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
925
926                 assert!(bg_processor.stop().is_ok());
927
928                 // Set up a background event handler for SpendableOutputs events.
929                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
930                 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
931                 let persister = Arc::new(Persister::new(data_dir));
932                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
933
934                 // Force close the channel and check that the SpendableOutputs event was handled.
935                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
936                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
937                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
938                 let event = receiver
939                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
940                         .expect("SpendableOutputs not handled within deadline");
        match event {
            Event::SpendableOutputs { .. } => {},
            Event::ChannelClosed { .. } => {},
            _ => panic!("Unexpected event: {:?}", event),
        }

        assert!(bg_processor.stop().is_ok());
    }

    #[test]
    fn test_scorer_persistence() {
        let nodes = create_nodes(2, "test_scorer_persistence".to_string());
        let data_dir = nodes[0].persister.get_data_dir();
        let persister = Arc::new(Persister::new(data_dir));
        let event_handler = |_: &_| {};
        let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

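        // Busy-wait until the background processor logs that it has persisted the scorer at
        // least once; the logger lock is re-acquired (and dropped) on each iteration.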
        loop {
            let log_entries = nodes[0].logger.lines.lock().unwrap();
            let expected_log = "Persisting scorer".to_string();
            if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
                break
            }
        }

        assert!(bg_processor.stop().is_ok());
    }

    #[test]
    fn test_not_pruning_network_graph_until_graph_sync_completion() {
        let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
        let data_dir = nodes[0].persister.get_data_dir();
        let (sender, receiver) = std::sync::mpsc::sync_channel(1);
        let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
        let network_graph = nodes[0].network_graph.clone();
        let features = ChannelFeatures::empty();
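        // Seed the graph with a channel known only from a partial announcement. Having no
        // channel updates, it is considered stale and becomes eligible for pruning once pruning
        // is permitted.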
        network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
            .expect("Failed to update channel from partial announcement");
        let original_graph_description = network_graph.to_string();
        assert!(original_graph_description.contains("42: features: 0000, node_one:"));
        assert_eq!(network_graph.read_only().channels().len(), 1);

        let event_handler = |_: &_| {};
        let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

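        // While the rapid gossip sync has not yet completed, the background processor should
        // assess prunability but decline to prune; wait for the log lines confirming both.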
        loop {
            let log_entries = nodes[0].logger.lines.lock().unwrap();
            let expected_log_a = "Assessing prunability of network graph".to_string();
            let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
            if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
                log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
                break
            }
        }

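        // A serialized rapid gossip sync snapshot: the `LDK` magic bytes (76, 68, 75) and a
        // version byte, followed by announcements for two channels.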
        let initialization_input = vec![
            76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
            79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
            0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
            187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
            157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
            88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
            204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
            181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
            110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
            76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
            226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
            0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
            0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
        ];
        nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

        // The snapshot should have added two channels on top of the one seeded above.
        assert_eq!(network_graph.read_only().channels().len(), 3);

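        // With the sync applied, the next prune timer tick may prune and persist the graph; the
        // notifier registered on the persister fires once that persistence happens.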
        let _ = receiver
            .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
            .expect("Network graph not pruned within deadline");

        background_processor.stop().unwrap();

        // All channels, including the seeded one, should now be pruned.
        assert_eq!(network_graph.read_only().channels().len(), 0);
    }

    #[test]
    fn test_invoice_payer() {
        let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
        let random_seed_bytes = keys_manager.get_secure_random_bytes();
        let nodes = create_nodes(2, "test_invoice_payer".to_string());

        // Set up a background processor for nodes[0] with an InvoicePayer as its event handler.
        let data_dir = nodes[0].persister.get_data_dir();
        let persister = Arc::new(Persister::new(data_dir));
        let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
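        // InvoicePayer implements EventHandler, so handing it to the background processor lets
        // failed payment paths be retried (here, up to two attempts) on the background thread.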
        let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
        let event_handler = Arc::clone(&invoice_payer);
        let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
        assert!(bg_processor.stop().is_ok());
    }
}