9600c462fb3b43f1fab6c7f9c72619b698896bf2
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::{Score, MultiThreadedLockableScore};
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 ///   writing it to disk/backups by invoking the callback given to it at startup.
40 ///   [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 ///   at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 ///   is provided to [`BackgroundProcessor::start`]).
45 ///
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
48 ///
49 /// # Note
50 ///
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
55 ///
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
60         stop_thread: Arc<AtomicBool>,
61         thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
62 }
63
64 #[cfg(not(test))]
65 const FRESHNESS_TIMER: u64 = 60;
66 #[cfg(test)]
67 const FRESHNESS_TIMER: u64 = 1;
68
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
76 #[cfg(test)]
77 const PING_TIMER: u64 = 1;
78
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
81
82 #[cfg(not(test))]
83 const SCORER_PERSIST_TIMER: u64 = 30;
84 #[cfg(test)]
85 const SCORER_PERSIST_TIMER: u64 = 1;
86
87 #[cfg(not(test))]
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
89 #[cfg(test)]
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
93 pub enum GossipSync<
94         P: Deref<Target = P2PGossipSync<G, A, L>>,
95         R: Deref<Target = RapidGossipSync<G, L>>,
96         G: Deref<Target = NetworkGraph<L>>,
97         A: Deref,
98         L: Deref,
99 >
100 where A::Target: chain::Access, L::Target: Logger {
101         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
102         P2P(P),
103         /// Rapid gossip sync from a trusted server.
104         Rapid(R),
105         /// No gossip sync.
106         None,
107 }
108
109 impl<
110         P: Deref<Target = P2PGossipSync<G, A, L>>,
111         R: Deref<Target = RapidGossipSync<G, L>>,
112         G: Deref<Target = NetworkGraph<L>>,
113         A: Deref,
114         L: Deref,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117         fn network_graph(&self) -> Option<&G> {
118                 match self {
119                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121                         GossipSync::None => None,
122                 }
123         }
124
125         fn prunable_network_graph(&self) -> Option<&G> {
126                 match self {
127                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::Rapid(gossip_sync) => {
129                                 if gossip_sync.is_initial_sync_complete() {
130                                         Some(gossip_sync.network_graph())
131                                 } else {
132                                         None
133                                 }
134                         },
135                         GossipSync::None => None,
136                 }
137         }
138 }
139
140 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
141 struct DecoratingEventHandler<
142         'a,
143         E: EventHandler,
144         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
145         RGS: Deref<Target = RapidGossipSync<G, L>>,
146         G: Deref<Target = NetworkGraph<L>>,
147         A: Deref,
148         L: Deref,
149 >
150 where A::Target: chain::Access, L::Target: Logger {
151         event_handler: E,
152         gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
153 }
154
155 impl<
156         'a,
157         E: EventHandler,
158         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
159         RGS: Deref<Target = RapidGossipSync<G, L>>,
160         G: Deref<Target = NetworkGraph<L>>,
161         A: Deref,
162         L: Deref,
163 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
164 where A::Target: chain::Access, L::Target: Logger {
165         fn handle_event(&self, event: &Event) {
166                 if let Some(network_graph) = self.gossip_sync.network_graph() {
167                         network_graph.handle_event(event);
168                 }
169                 self.event_handler.handle_event(event);
170         }
171 }
172
173 impl BackgroundProcessor {
174         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
175         /// documentation].
176         ///
177         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
178         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
179         /// either [`join`] or [`stop`].
180         ///
181         /// # Data Persistence
182         ///
183         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
184         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
185         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
186         /// provided implementation.
187         ///
188         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
189         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
190         /// See the `lightning-persister` crate for LDK's provided implementation.
191         ///
192         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
193         /// error or call [`join`] and handle any error that may arise. For the latter case,
194         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
195         ///
196         /// # Event Handling
197         ///
198         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
199         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
200         /// functionality implemented by other handlers.
201         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
202         ///
203         /// # Rapid Gossip Sync
204         ///
205         /// If rapid gossip sync is meant to run at startup, pass a [`RapidGossipSync`] to `gossip_sync`
206         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
207         /// until the [`RapidGossipSync`] instance completes its first sync.
208         ///
209         /// [top-level documentation]: BackgroundProcessor
210         /// [`join`]: Self::join
211         /// [`stop`]: Self::stop
212         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
213         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
214         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
215         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
216         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
217         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
218         pub fn start<
219                 'a,
220                 Signer: 'static + Sign,
221                 CA: 'static + Deref + Send + Sync,
222                 CF: 'static + Deref + Send + Sync,
223                 CW: 'static + Deref + Send + Sync,
224                 T: 'static + Deref + Send + Sync,
225                 K: 'static + Deref + Send + Sync,
226                 F: 'static + Deref + Send + Sync,
227                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
228                 L: 'static + Deref + Send + Sync,
229                 P: 'static + Deref + Send + Sync,
230                 Descriptor: 'static + SocketDescriptor + Send + Sync,
231                 CMH: 'static + Deref + Send + Sync,
232                 RMH: 'static + Deref + Send + Sync,
233                 EH: 'static + EventHandler + Send,
234                 PS: 'static + Deref + Send,
235                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
236                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
237                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
238                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
239                 UMH: 'static + Deref + Send + Sync,
240                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
241                 SC: Score + Send,
242         >(
243                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
244                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<&'static MultiThreadedLockableScore<SC>>,
245         ) -> Self
246         where
247                 CA::Target: 'static + chain::Access,
248                 CF::Target: 'static + chain::Filter,
249                 CW::Target: 'static + chain::Watch<Signer>,
250                 T::Target: 'static + BroadcasterInterface,
251                 K::Target: 'static + KeysInterface<Signer = Signer>,
252                 F::Target: 'static + FeeEstimator,
253                 L::Target: 'static + Logger,
254                 P::Target: 'static + Persist<Signer>,
255                 CMH::Target: 'static + ChannelMessageHandler,
256                 RMH::Target: 'static + RoutingMessageHandler,
257                 UMH::Target: 'static + CustomMessageHandler,
258                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
259         {
260                 let stop_thread = Arc::new(AtomicBool::new(false));
261                 let stop_thread_clone = stop_thread.clone();
262                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
263                         let event_handler = DecoratingEventHandler {
264                                 event_handler,
265                                 gossip_sync: &gossip_sync,
266                         };
267
268                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
269                         channel_manager.timer_tick_occurred();
270
271                         let mut last_freshness_call = Instant::now();
272                         let mut last_ping_call = Instant::now();
273                         let mut last_prune_call = Instant::now();
274                         let mut last_scorer_persist_call = Instant::now();
275                         let mut have_pruned = false;
276
277                         loop {
278                                 channel_manager.process_pending_events(&event_handler);
279                                 chain_monitor.process_pending_events(&event_handler);
280
281                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
282                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
283                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
284                                 // without running the normal event processing above and handing events to users.
285                                 //
286                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
287                                 // processing a message effectively at any point during this loop. In order to
288                                 // minimize the time between such processing completing and persisting the updated
289                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
290                                 // generally, and as a fallback place such blocking only immediately before
291                                 // persistence.
292                                 peer_manager.process_events();
293
294                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
295                                 // see `await_start`'s use below.
296                                 let await_start = Instant::now();
297                                 let updates_available =
298                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
299                                 let await_time = await_start.elapsed();
300
301                                 if updates_available {
302                                         log_trace!(logger, "Persisting ChannelManager...");
303                                         persister.persist_manager(&*channel_manager)?;
304                                         log_trace!(logger, "Done persisting ChannelManager.");
305                                 }
306                                 // Exit the loop if the background processor was requested to stop.
307                                 if stop_thread.load(Ordering::Acquire) == true {
308                                         log_trace!(logger, "Terminating background processor.");
309                                         break;
310                                 }
311                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
312                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
313                                         channel_manager.timer_tick_occurred();
314                                         last_freshness_call = Instant::now();
315                                 }
316                                 if await_time > Duration::from_secs(1) {
317                                         // On various platforms, we may be starved of CPU cycles for several reasons.
318                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
319                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
320                                         // may not get any cycles.
321                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
322                                         // full second, at which point we assume sockets may have been killed (they
323                                         // appear to be at least on some platforms, even if it has only been a second).
324                                         // Note that we have to take care to not get here just because user event
325                                         // processing was slow at the top of the loop. For example, the sample client
326                                         // may call Bitcoin Core RPCs during event handling, which very often takes
327                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
328                                         // peers.
329                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
330                                         peer_manager.disconnect_all_peers();
331                                         last_ping_call = Instant::now();
332                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
333                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
334                                         peer_manager.timer_tick_occurred();
335                                         last_ping_call = Instant::now();
336                                 }
337
338                                 // Note that we want to run a graph prune once not long after startup before
339                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
340                                 // pruning their network graph. We run once 60 seconds after startup before
341                                 // continuing our normal cadence.
342                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
343                                         // The network graph must not be pruned while rapid sync completion is pending
344                                         log_trace!(logger, "Assessing prunability of network graph");
345                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
346                                                 network_graph.remove_stale_channels();
347
348                                                 if let Err(e) = persister.persist_graph(network_graph) {
349                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
350                                                 }
351
352                                                 last_prune_call = Instant::now();
353                                                 have_pruned = true;
354                                         } else {
355                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
356                                         }
357                                 }
358
359                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
360                                         if let Some(ref scorer) = scorer {
361                                                 log_trace!(logger, "Persisting scorer");
362                                                 if let Err(e) = persister.persist_scorer(&scorer) {
363                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
364                                                 }
365                                         }
366                                         last_scorer_persist_call = Instant::now();
367                                 }
368                         }
369
370                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
371                         // some races where users quit while channel updates were in-flight, with
372                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
373                         persister.persist_manager(&*channel_manager)?;
374
375                         // Persist Scorer on exit
376                         if let Some(ref scorer) = scorer {
377                                 persister.persist_scorer(&scorer)?;
378                         }
379
380                         // Persist NetworkGraph on exit
381                         if let Some(network_graph) = gossip_sync.network_graph() {
382                                 persister.persist_graph(network_graph)?;
383                         }
384
385                         Ok(())
386                 });
387                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
388         }
389
390         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
391         /// [`ChannelManager`].
392         ///
393         /// # Panics
394         ///
395         /// This function panics if the background thread has panicked such as while persisting or
396         /// handling events.
397         ///
398         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
399         pub fn join(mut self) -> Result<(), std::io::Error> {
400                 assert!(self.thread_handle.is_some());
401                 self.join_thread()
402         }
403
404         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
405         /// [`ChannelManager`].
406         ///
407         /// # Panics
408         ///
409         /// This function panics if the background thread has panicked such as while persisting or
410         /// handling events.
411         ///
412         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
413         pub fn stop(mut self) -> Result<(), std::io::Error> {
414                 assert!(self.thread_handle.is_some());
415                 self.stop_and_join_thread()
416         }
417
418         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
419                 self.stop_thread.store(true, Ordering::Release);
420                 self.join_thread()
421         }
422
423         fn join_thread(&mut self) -> Result<(), std::io::Error> {
424                 match self.thread_handle.take() {
425                         Some(handle) => handle.join().unwrap(),
426                         None => Ok(()),
427                 }
428         }
429 }
430
431 impl Drop for BackgroundProcessor {
432         fn drop(&mut self) {
433                 self.stop_and_join_thread().unwrap();
434         }
435 }
436
437 #[cfg(test)]
438 mod tests {
439         use bitcoin::blockdata::block::BlockHeader;
440         use bitcoin::blockdata::constants::genesis_block;
441         use bitcoin::blockdata::transaction::{Transaction, TxOut};
442         use bitcoin::network::constants::Network;
443         use lightning::chain::{BestBlock, Confirm, chainmonitor};
444         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
445         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
446         use lightning::chain::transaction::OutPoint;
447         use lightning::get_event_msg;
448         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
449         use lightning::ln::features::{ChannelFeatures, InitFeatures};
450         use lightning::ln::msgs::{ChannelMessageHandler, Init};
451         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
452         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
453         use lightning::util::config::UserConfig;
454         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
455         use lightning::util::ser::Writeable;
456         use lightning::util::test_utils;
457         use lightning::util::persist::KVStorePersister;
458         use lightning_invoice::payment::{InvoicePayer, Retry};
459         use lightning_invoice::utils::DefaultRouter;
460         use lightning_persister::FilesystemPersister;
461         use std::fs;
462         use std::path::PathBuf;
463         use std::sync::{Arc, Mutex};
464         use std::sync::mpsc::SyncSender;
465         use std::time::Duration;
466         use lightning::routing::scoring::{FixedPenaltyScorer};
467         use lightning_rapid_gossip_sync::RapidGossipSync;
468         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
469
470         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
471
472         #[derive(Clone, Eq, Hash, PartialEq)]
473         struct TestDescriptor{}
474         impl SocketDescriptor for TestDescriptor {
475                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
476                         0
477                 }
478
479                 fn disconnect_socket(&mut self) {}
480         }
481
482         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
483
484         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
485         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
486
487         struct Node {
488                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
489                 p2p_gossip_sync: PGS,
490                 rapid_gossip_sync: RGS,
491                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
492                 chain_monitor: Arc<ChainMonitor>,
493                 persister: Arc<FilesystemPersister>,
494                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
495                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
496                 logger: Arc<test_utils::TestLogger>,
497                 best_block: BestBlock,
498                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
499         }
500
501         impl Node {
502                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
503                         GossipSync::P2P(self.p2p_gossip_sync.clone())
504                 }
505
506                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
507                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
508                 }
509
510                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
511                         GossipSync::None
512                 }
513         }
514
515         impl Drop for Node {
516                 fn drop(&mut self) {
517                         let data_dir = self.persister.get_data_dir();
518                         match fs::remove_dir_all(data_dir.clone()) {
519                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
520                                 _ => {}
521                         }
522                 }
523         }
524
525         struct Persister {
526                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
527                 graph_persistence_notifier: Option<SyncSender<()>>,
528                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
529                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
530                 filesystem_persister: FilesystemPersister,
531         }
532
533         impl Persister {
534                 fn new(data_dir: String) -> Self {
535                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
536                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
537                 }
538
539                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
540                         Self { graph_error: Some((error, message)), ..self }
541                 }
542
543                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
544                         Self { graph_persistence_notifier: Some(sender), ..self }
545                 }
546
547                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
548                         Self { manager_error: Some((error, message)), ..self }
549                 }
550
551                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
552                         Self { scorer_error: Some((error, message)), ..self }
553                 }
554         }
555
556         impl KVStorePersister for Persister {
557                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
558                         if key == "manager" {
559                                 if let Some((error, message)) = self.manager_error {
560                                         return Err(std::io::Error::new(error, message))
561                                 }
562                         }
563
564                         if key == "network_graph" {
565                                 if let Some(sender) = &self.graph_persistence_notifier {
566                                         sender.send(()).unwrap();
567                                 };
568
569                                 if let Some((error, message)) = self.graph_error {
570                                         return Err(std::io::Error::new(error, message))
571                                 }
572                         }
573
574                         if key == "scorer" {
575                                 if let Some((error, message)) = self.scorer_error {
576                                         return Err(std::io::Error::new(error, message))
577                                 }
578                         }
579
580                         self.filesystem_persister.persist(key, object)
581                 }
582         }
583
584         fn get_full_filepath(filepath: String, filename: String) -> String {
585                 let mut path = PathBuf::from(filepath);
586                 path.push(filename);
587                 path.to_str().unwrap().to_string()
588         }
589
590         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
591                 let mut nodes = Vec::new();
592                 for i in 0..num_nodes {
593                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
594                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
595                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
596                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
597                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
598                         let seed = [i as u8; 32];
599                         let network = Network::Testnet;
600                         let genesis_block = genesis_block(network);
601                         let now = Duration::from_secs(genesis_block.header.time as u64);
602                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
603                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
604                         let best_block = BestBlock::from_genesis(network);
605                         let params = ChainParameters { network, best_block };
606                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
607                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
608                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
609                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
610                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
611                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
612                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
613                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
614                         nodes.push(node);
615                 }
616
617                 for i in 0..num_nodes {
618                         for j in (i+1)..num_nodes {
619                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
620                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
621                         }
622                 }
623
624                 nodes
625         }
626
627         macro_rules! open_channel {
628                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
629                         begin_open_channel!($node_a, $node_b, $channel_value);
630                         let events = $node_a.node.get_and_clear_pending_events();
631                         assert_eq!(events.len(), 1);
632                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
633                         end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
634                         tx
635                 }}
636         }
637
638         macro_rules! begin_open_channel {
639                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
640                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
641                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
642                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
643                 }}
644         }
645
646         macro_rules! handle_funding_generation_ready {
647                 ($event: expr, $channel_value: expr) => {{
648                         match $event {
649                                 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
650                                         assert_eq!(channel_value_satoshis, $channel_value);
651                                         assert_eq!(user_channel_id, 42);
652
653                                         let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
654                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
655                                         }]};
656                                         (temporary_channel_id, tx)
657                                 },
658                                 _ => panic!("Unexpected event"),
659                         }
660                 }}
661         }
662
663         macro_rules! end_open_channel {
664                 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
665                         $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
666                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
667                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
668                 }}
669         }
670
671         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
672                 for i in 1..=depth {
673                         let prev_blockhash = node.best_block.block_hash();
674                         let height = node.best_block.height() + 1;
675                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
676                         let txdata = vec![(0, tx)];
677                         node.best_block = BestBlock::new(header.block_hash(), height);
678                         match i {
679                                 1 => {
680                                         node.node.transactions_confirmed(&header, &txdata, height);
681                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
682                                 },
683                                 x if x == depth => {
684                                         node.node.best_block_updated(&header, height);
685                                         node.chain_monitor.best_block_updated(&header, height);
686                                 },
687                                 _ => {},
688                         }
689                 }
690         }
691         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
692                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
693         }
694
695         #[test]
696         fn test_background_processor() {
697                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
698                 // updates. Also test that when new updates are available, the manager signals that it needs
699                 // re-persistence and is successfully re-persisted.
700                 let nodes = create_nodes(2, "test_background_processor".to_string());
701
702                 // Go through the channel creation process so that each node has something to persist. Since
703                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
704                 // avoid a race with processing events.
705                 let tx = open_channel!(nodes[0], nodes[1], 100000);
706
707                 // Initiate the background processors to watch each node.
708                 let data_dir = nodes[0].persister.get_data_dir();
709                 let persister = Arc::new(Persister::new(data_dir));
710                 let event_handler = |_: &_| {};
711                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
712
713                 macro_rules! check_persisted_data {
714                         ($node: expr, $filepath: expr) => {
715                                 let mut expected_bytes = Vec::new();
716                                 loop {
717                                         expected_bytes.clear();
718                                         match $node.write(&mut expected_bytes) {
719                                                 Ok(()) => {
720                                                         match std::fs::read($filepath) {
721                                                                 Ok(bytes) => {
722                                                                         if bytes == expected_bytes {
723                                                                                 break
724                                                                         } else {
725                                                                                 continue
726                                                                         }
727                                                                 },
728                                                                 Err(_) => continue
729                                                         }
730                                                 },
731                                                 Err(e) => panic!("Unexpected error: {}", e)
732                                         }
733                                 }
734                         }
735                 }
736
737                 // Check that the initial channel manager data is persisted as expected.
738                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
739                 check_persisted_data!(nodes[0].node, filepath.clone());
740
741                 loop {
742                         if !nodes[0].node.get_persistence_condvar_value() { break }
743                 }
744
745                 // Force-close the channel.
746                 nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
747
748                 // Check that the force-close updates are persisted.
749                 check_persisted_data!(nodes[0].node, filepath.clone());
750                 loop {
751                         if !nodes[0].node.get_persistence_condvar_value() { break }
752                 }
753
754                 // Check network graph is persisted
755                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
756                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
757
758                 // Check scorer is persisted
759                 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
760                 check_persisted_data!(nodes[0].scorer, filepath.clone());
761
762                 assert!(bg_processor.stop().is_ok());
763         }
764
765         #[test]
766         fn test_timer_tick_called() {
767                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
768                 // `FRESHNESS_TIMER`.
769                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
770                 let data_dir = nodes[0].persister.get_data_dir();
771                 let persister = Arc::new(Persister::new(data_dir));
772                 let event_handler = |_: &_| {};
773                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
774                 loop {
775                         let log_entries = nodes[0].logger.lines.lock().unwrap();
776                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
777                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
778                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
779                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
780                                 break
781                         }
782                 }
783
784                 assert!(bg_processor.stop().is_ok());
785         }
786
787         #[test]
788         fn test_channel_manager_persist_error() {
789                 // Test that if we encounter an error during manager persistence, the thread panics.
790                 let nodes = create_nodes(2, "test_persist_error".to_string());
791                 open_channel!(nodes[0], nodes[1], 100000);
792
793                 let data_dir = nodes[0].persister.get_data_dir();
794                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
795                 let event_handler = |_: &_| {};
796                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
797                 match bg_processor.join() {
798                         Ok(_) => panic!("Expected error persisting manager"),
799                         Err(e) => {
800                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
801                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
802                         },
803                 }
804         }
805
806         #[test]
807         fn test_network_graph_persist_error() {
808                 // Test that if we encounter an error during network graph persistence, an error gets returned.
809                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
810                 let data_dir = nodes[0].persister.get_data_dir();
811                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
812                 let event_handler = |_: &_| {};
813                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
814
815                 match bg_processor.stop() {
816                         Ok(_) => panic!("Expected error persisting network graph"),
817                         Err(e) => {
818                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
819                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
820                         },
821                 }
822         }
823
824         #[test]
825         fn test_scorer_persist_error() {
826                 // Test that if we encounter an error during scorer persistence, an error gets returned.
827                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
828                 let data_dir = nodes[0].persister.get_data_dir();
829                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
830                 let event_handler = |_: &_| {};
831                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
832
833                 match bg_processor.stop() {
834                         Ok(_) => panic!("Expected error persisting scorer"),
835                         Err(e) => {
836                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
837                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
838                         },
839                 }
840         }
841
842         #[test]
843         fn test_background_event_handling() {
844                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
845                 let channel_value = 100000;
846                 let data_dir = nodes[0].persister.get_data_dir();
847                 let persister = Arc::new(Persister::new(data_dir.clone()));
848
849                 // Set up a background event handler for FundingGenerationReady events.
850                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
851                 let event_handler = move |event: &Event| {
852                         sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
853                 };
854                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
855
856                 // Open a channel and check that the FundingGenerationReady event was handled.
857                 begin_open_channel!(nodes[0], nodes[1], channel_value);
858                 let (temporary_channel_id, funding_tx) = receiver
859                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
860                         .expect("FundingGenerationReady not handled within deadline");
861                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
862
863                 // Confirm the funding transaction.
864                 confirm_transaction(&mut nodes[0], &funding_tx);
865                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
866                 confirm_transaction(&mut nodes[1], &funding_tx);
867                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
868                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
869                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
870                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
871                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
872
873                 assert!(bg_processor.stop().is_ok());
874
875                 // Set up a background event handler for SpendableOutputs events.
876                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
877                 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
878                 let persister = Arc::new(Persister::new(data_dir));
879                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
880
881                 // Force close the channel and check that the SpendableOutputs event was handled.
882                 nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
883                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
884                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
885                 let event = receiver
886                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
887                         .expect("SpendableOutputs not handled within deadline");
888                 match event {
889                         Event::SpendableOutputs { .. } => {},
890                         Event::ChannelClosed { .. } => {},
891                         _ => panic!("Unexpected event: {:?}", event),
892                 }
893
894                 assert!(bg_processor.stop().is_ok());
895         }
896
897         #[test]
898         fn test_scorer_persistence() {
899                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
900                 let data_dir = nodes[0].persister.get_data_dir();
901                 let persister = Arc::new(Persister::new(data_dir));
902                 let event_handler = |_: &_| {};
903                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
904
905                 loop {
906                         let log_entries = nodes[0].logger.lines.lock().unwrap();
907                         let expected_log = "Persisting scorer".to_string();
908                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
909                                 break
910                         }
911                 }
912
913                 assert!(bg_processor.stop().is_ok());
914         }
915
916         #[test]
917         fn test_not_pruning_network_graph_until_graph_sync_completion() {
918                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
919                 let data_dir = nodes[0].persister.get_data_dir();
920                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
921                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
922                 let network_graph = nodes[0].network_graph.clone();
923                 let features = ChannelFeatures::empty();
924                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
925                         .expect("Failed to update channel from partial announcement");
926                 let original_graph_description = network_graph.to_string();
927                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
928                 assert_eq!(network_graph.read_only().channels().len(), 1);
929
930                 let event_handler = |_: &_| {};
931                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
932
933                 loop {
934                         let log_entries = nodes[0].logger.lines.lock().unwrap();
935                         let expected_log_a = "Assessing prunability of network graph".to_string();
936                         let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
937                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
938                                 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
939                                 break
940                         }
941                 }
942
943                 let initialization_input = vec![
944                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
945                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
946                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
947                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
948                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
949                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
950                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
951                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
952                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
953                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
954                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
955                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
956                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
957                 ];
958                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
959
960                 // this should have added two channels
961                 assert_eq!(network_graph.read_only().channels().len(), 3);
962
963                 let _ = receiver
964                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
965                         .expect("Network graph not pruned within deadline");
966
967                 background_processor.stop().unwrap();
968
969                 // all channels should now be pruned
970                 assert_eq!(network_graph.read_only().channels().len(), 0);
971         }
972
973         #[test]
974         fn test_invoice_payer() {
975                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
976                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
977                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
978
979                 // Initiate the background processors to watch each node.
980                 let data_dir = nodes[0].persister.get_data_dir();
981                 let persister = Arc::new(Persister::new(data_dir));
982                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
983                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
984                 let event_handler = Arc::clone(&invoice_payer);
985                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
986                 assert!(bg_processor.stop().is_ok());
987         }
988 }