// Source file: rust-lightning / lightning-background-processor / src / lib.rs
// Snapshot at commit d13c1311e299135a5bea29265e4486f13c5fe8cf
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 ///   writing it to disk/backups by invoking the callback given to it at startup.
40 ///   [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 ///   at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 ///   is provided to [`BackgroundProcessor::start`]).
45 ///
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
48 ///
49 /// # Note
50 ///
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
55 ///
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread each loop iteration; set to `true`
	// by `stop`/`Drop` to request shutdown.
	stop_thread: Arc<AtomicBool>,
	// Handle for the spawned background thread, carrying any persistence error it hit.
	// `None` once the thread has been joined (via `join` or `stop`).
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
63
// Seconds between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between calls to `PeerManager::timer_tick_occurred` (release builds).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay (seconds) before the first network-graph prune after startup; later prunes
// fall back to the hourly `NETWORK_PRUNE_TIMER` cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Type parameters: `P`/`R` dereference to the two sync implementations, `G` to the
/// shared [`NetworkGraph`], `A` to a `chain::Access` impl and `L` to a [`Logger`].
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
108
109 impl<
110         P: Deref<Target = P2PGossipSync<G, A, L>>,
111         R: Deref<Target = RapidGossipSync<G, L>>,
112         G: Deref<Target = NetworkGraph<L>>,
113         A: Deref,
114         L: Deref,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117         fn network_graph(&self) -> Option<&G> {
118                 match self {
119                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121                         GossipSync::None => None,
122                 }
123         }
124
125         fn prunable_network_graph(&self) -> Option<&G> {
126                 match self {
127                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::Rapid(gossip_sync) => {
129                                 if gossip_sync.is_initial_sync_complete() {
130                                         Some(gossip_sync.network_graph())
131                                 } else {
132                                         None
133                                 }
134                         },
135                         GossipSync::None => None,
136                 }
137         }
138 }
139
140 /// (C-not exported) as the bindings concretize everything and have constructors for us
141 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
142         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
143 where
144         A::Target: chain::Access,
145         L::Target: Logger,
146 {
147         /// Initializes a new [`GossipSync::P2P`] variant.
148         pub fn p2p(gossip_sync: P) -> Self {
149                 GossipSync::P2P(gossip_sync)
150         }
151 }
152
153 /// (C-not exported) as the bindings concretize everything and have constructors for us
154 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
155         GossipSync<
156                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
157                 R,
158                 G,
159                 &'a (dyn chain::Access + Send + Sync),
160                 L,
161         >
162 where
163         L::Target: Logger,
164 {
165         /// Initializes a new [`GossipSync::Rapid`] variant.
166         pub fn rapid(gossip_sync: R) -> Self {
167                 GossipSync::Rapid(gossip_sync)
168         }
169 }
170
171 /// (C-not exported) as the bindings concretize everything and have constructors for us
172 impl<'a, L: Deref>
173         GossipSync<
174                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
175                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
176                 &'a NetworkGraph<L>,
177                 &'a (dyn chain::Access + Send + Sync),
178                 L,
179         >
180 where
181         L::Target: Logger,
182 {
183         /// Initializes a new [`GossipSync::None`] variant.
184         pub fn none() -> Self {
185                 GossipSync::None
186         }
187 }
188
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	// The user-provided handler, invoked after any built-in event processing.
	event_handler: E,
	// Lets gossip-relevant events update the network graph before the user sees them.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
203
204 impl<
205         'a,
206         E: EventHandler,
207         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208         RGS: Deref<Target = RapidGossipSync<G, L>>,
209         G: Deref<Target = NetworkGraph<L>>,
210         A: Deref,
211         L: Deref,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
214         fn handle_event(&self, event: &Event) {
215                 if let Some(network_graph) = self.gossip_sync.network_graph() {
216                         network_graph.handle_event(event);
217                 }
218                 self.event_handler.handle_event(event);
219         }
220 }
221
222 impl BackgroundProcessor {
223         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
224         /// documentation].
225         ///
226         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228         /// either [`join`] or [`stop`].
229         ///
230         /// # Data Persistence
231         ///
232         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235         /// provided implementation.
236         ///
237         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239         /// See the `lightning-persister` crate for LDK's provided implementation.
240         ///
241         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242         /// error or call [`join`] and handle any error that may arise. For the latter case,
243         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
244         ///
245         /// # Event Handling
246         ///
247         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249         /// functionality implemented by other handlers.
250         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
251         ///
252         /// # Rapid Gossip Sync
253         ///
254         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256         /// until the [`RapidGossipSync`] instance completes its first sync.
257         ///
258         /// [top-level documentation]: BackgroundProcessor
259         /// [`join`]: Self::join
260         /// [`stop`]: Self::stop
261         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
267         pub fn start<
268                 'a,
269                 Signer: 'static + Sign,
270                 CA: 'static + Deref + Send + Sync,
271                 CF: 'static + Deref + Send + Sync,
272                 CW: 'static + Deref + Send + Sync,
273                 T: 'static + Deref + Send + Sync,
274                 K: 'static + Deref + Send + Sync,
275                 F: 'static + Deref + Send + Sync,
276                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277                 L: 'static + Deref + Send + Sync,
278                 P: 'static + Deref + Send + Sync,
279                 Descriptor: 'static + SocketDescriptor + Send + Sync,
280                 CMH: 'static + Deref + Send + Sync,
281                 RMH: 'static + Deref + Send + Sync,
282                 EH: 'static + EventHandler + Send,
283                 PS: 'static + Deref + Send,
284                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288                 UMH: 'static + Deref + Send + Sync,
289                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290                 S: 'static + Deref<Target = SC> + Send + Sync,
291                 SC: WriteableScore<'a>,
292         >(
293                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
294                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
295         ) -> Self
296         where
297                 CA::Target: 'static + chain::Access,
298                 CF::Target: 'static + chain::Filter,
299                 CW::Target: 'static + chain::Watch<Signer>,
300                 T::Target: 'static + BroadcasterInterface,
301                 K::Target: 'static + KeysInterface<Signer = Signer>,
302                 F::Target: 'static + FeeEstimator,
303                 L::Target: 'static + Logger,
304                 P::Target: 'static + Persist<Signer>,
305                 CMH::Target: 'static + ChannelMessageHandler,
306                 RMH::Target: 'static + RoutingMessageHandler,
307                 UMH::Target: 'static + CustomMessageHandler,
308                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
309         {
310                 let stop_thread = Arc::new(AtomicBool::new(false));
311                 let stop_thread_clone = stop_thread.clone();
312                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
313                         let event_handler = DecoratingEventHandler {
314                                 event_handler,
315                                 gossip_sync: &gossip_sync,
316                         };
317
318                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
319                         channel_manager.timer_tick_occurred();
320
321                         let mut last_freshness_call = Instant::now();
322                         let mut last_ping_call = Instant::now();
323                         let mut last_prune_call = Instant::now();
324                         let mut last_scorer_persist_call = Instant::now();
325                         let mut have_pruned = false;
326
327                         loop {
328                                 channel_manager.process_pending_events(&event_handler);
329                                 chain_monitor.process_pending_events(&event_handler);
330
331                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
332                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
333                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
334                                 // without running the normal event processing above and handing events to users.
335                                 //
336                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
337                                 // processing a message effectively at any point during this loop. In order to
338                                 // minimize the time between such processing completing and persisting the updated
339                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
340                                 // generally, and as a fallback place such blocking only immediately before
341                                 // persistence.
342                                 peer_manager.process_events();
343
344                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
345                                 // see `await_start`'s use below.
346                                 let await_start = Instant::now();
347                                 let updates_available =
348                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
349                                 let await_time = await_start.elapsed();
350
351                                 if updates_available {
352                                         log_trace!(logger, "Persisting ChannelManager...");
353                                         persister.persist_manager(&*channel_manager)?;
354                                         log_trace!(logger, "Done persisting ChannelManager.");
355                                 }
356                                 // Exit the loop if the background processor was requested to stop.
357                                 if stop_thread.load(Ordering::Acquire) == true {
358                                         log_trace!(logger, "Terminating background processor.");
359                                         break;
360                                 }
361                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
362                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
363                                         channel_manager.timer_tick_occurred();
364                                         last_freshness_call = Instant::now();
365                                 }
366                                 if await_time > Duration::from_secs(1) {
367                                         // On various platforms, we may be starved of CPU cycles for several reasons.
368                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
369                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
370                                         // may not get any cycles.
371                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
372                                         // full second, at which point we assume sockets may have been killed (they
373                                         // appear to be at least on some platforms, even if it has only been a second).
374                                         // Note that we have to take care to not get here just because user event
375                                         // processing was slow at the top of the loop. For example, the sample client
376                                         // may call Bitcoin Core RPCs during event handling, which very often takes
377                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
378                                         // peers.
379                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
380                                         peer_manager.disconnect_all_peers();
381                                         last_ping_call = Instant::now();
382                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
383                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
384                                         peer_manager.timer_tick_occurred();
385                                         last_ping_call = Instant::now();
386                                 }
387
388                                 // Note that we want to run a graph prune once not long after startup before
389                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
390                                 // pruning their network graph. We run once 60 seconds after startup before
391                                 // continuing our normal cadence.
392                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
393                                         // The network graph must not be pruned while rapid sync completion is pending
394                                         log_trace!(logger, "Assessing prunability of network graph");
395                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
396                                                 network_graph.remove_stale_channels();
397
398                                                 if let Err(e) = persister.persist_graph(network_graph) {
399                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
400                                                 }
401
402                                                 last_prune_call = Instant::now();
403                                                 have_pruned = true;
404                                         } else {
405                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
406                                         }
407                                 }
408
409                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
410                                         if let Some(ref scorer) = scorer {
411                                                 log_trace!(logger, "Persisting scorer");
412                                                 if let Err(e) = persister.persist_scorer(&scorer) {
413                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
414                                                 }
415                                         }
416                                         last_scorer_persist_call = Instant::now();
417                                 }
418                         }
419
420                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
421                         // some races where users quit while channel updates were in-flight, with
422                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
423                         persister.persist_manager(&*channel_manager)?;
424
425                         // Persist Scorer on exit
426                         if let Some(ref scorer) = scorer {
427                                 persister.persist_scorer(&scorer)?;
428                         }
429
430                         // Persist NetworkGraph on exit
431                         if let Some(network_graph) = gossip_sync.network_graph() {
432                                 persister.persist_graph(network_graph)?;
433                         }
434
435                         Ok(())
436                 });
437                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
438         }
439
440         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
441         /// [`ChannelManager`].
442         ///
443         /// # Panics
444         ///
445         /// This function panics if the background thread has panicked such as while persisting or
446         /// handling events.
447         ///
448         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
449         pub fn join(mut self) -> Result<(), std::io::Error> {
450                 assert!(self.thread_handle.is_some());
451                 self.join_thread()
452         }
453
454         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
455         /// [`ChannelManager`].
456         ///
457         /// # Panics
458         ///
459         /// This function panics if the background thread has panicked such as while persisting or
460         /// handling events.
461         ///
462         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
463         pub fn stop(mut self) -> Result<(), std::io::Error> {
464                 assert!(self.thread_handle.is_some());
465                 self.stop_and_join_thread()
466         }
467
468         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
469                 self.stop_thread.store(true, Ordering::Release);
470                 self.join_thread()
471         }
472
473         fn join_thread(&mut self) -> Result<(), std::io::Error> {
474                 match self.thread_handle.take() {
475                         Some(handle) => handle.join().unwrap(),
476                         None => Ok(()),
477                 }
478         }
479 }
480
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Request shutdown and wait for the background thread. Note the `unwrap`:
		// if the thread exited with a persistence error (or panicked), dropping the
		// processor without first calling `stop`/`join` will itself panic.
		self.stop_and_join_thread().unwrap();
	}
}
486
487 #[cfg(test)]
488 mod tests {
489         use bitcoin::blockdata::block::BlockHeader;
490         use bitcoin::blockdata::constants::genesis_block;
491         use bitcoin::blockdata::locktime::PackedLockTime;
492         use bitcoin::blockdata::transaction::{Transaction, TxOut};
493         use bitcoin::network::constants::Network;
494         use lightning::chain::{BestBlock, Confirm, chainmonitor};
495         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
496         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
497         use lightning::chain::transaction::OutPoint;
498         use lightning::get_event_msg;
499         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
500         use lightning::ln::features::{ChannelFeatures, InitFeatures};
501         use lightning::ln::msgs::{ChannelMessageHandler, Init};
502         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
503         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
504         use lightning::util::config::UserConfig;
505         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
506         use lightning::util::ser::Writeable;
507         use lightning::util::test_utils;
508         use lightning::util::persist::KVStorePersister;
509         use lightning_invoice::payment::{InvoicePayer, Retry};
510         use lightning_invoice::utils::DefaultRouter;
511         use lightning_persister::FilesystemPersister;
512         use std::fs;
513         use std::path::PathBuf;
514         use std::sync::{Arc, Mutex};
515         use std::sync::mpsc::SyncSender;
516         use std::time::Duration;
517         use bitcoin::hashes::Hash;
518         use bitcoin::TxMerkleNode;
519         use lightning::routing::scoring::{FixedPenaltyScorer};
520         use lightning_rapid_gossip_sync::RapidGossipSync;
521         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
522
523         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
524
525         #[derive(Clone, Eq, Hash, PartialEq)]
526         struct TestDescriptor{}
527         impl SocketDescriptor for TestDescriptor {
528                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
529                         0
530                 }
531
532                 fn disconnect_socket(&mut self) {}
533         }
534
535         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
536
537         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
538         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
539
540         struct Node {
541                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
542                 p2p_gossip_sync: PGS,
543                 rapid_gossip_sync: RGS,
544                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
545                 chain_monitor: Arc<ChainMonitor>,
546                 persister: Arc<FilesystemPersister>,
547                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
548                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
549                 logger: Arc<test_utils::TestLogger>,
550                 best_block: BestBlock,
551                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
552         }
553
554         impl Node {
555                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
556                         GossipSync::P2P(self.p2p_gossip_sync.clone())
557                 }
558
559                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
560                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
561                 }
562
563                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
564                         GossipSync::None
565                 }
566         }
567
568         impl Drop for Node {
569                 fn drop(&mut self) {
570                         let data_dir = self.persister.get_data_dir();
571                         match fs::remove_dir_all(data_dir.clone()) {
572                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
573                                 _ => {}
574                         }
575                 }
576         }
577
	/// Test [`KVStorePersister`] wrapping a `FilesystemPersister`, with hooks to inject
	/// failures for specific keys and to observe network-graph persistence.
	struct Persister {
		// When set, persisting under the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, a unit is sent on this channel each time the network graph is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting under the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting under the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		// The real on-disk persister all successful writes fall through to.
		filesystem_persister: FilesystemPersister,
	}
585
586         impl Persister {
587                 fn new(data_dir: String) -> Self {
588                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
589                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
590                 }
591
592                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
593                         Self { graph_error: Some((error, message)), ..self }
594                 }
595
596                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
597                         Self { graph_persistence_notifier: Some(sender), ..self }
598                 }
599
600                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
601                         Self { manager_error: Some((error, message)), ..self }
602                 }
603
604                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
605                         Self { scorer_error: Some((error, message)), ..self }
606                 }
607         }
608
609         impl KVStorePersister for Persister {
610                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
611                         if key == "manager" {
612                                 if let Some((error, message)) = self.manager_error {
613                                         return Err(std::io::Error::new(error, message))
614                                 }
615                         }
616
617                         if key == "network_graph" {
618                                 if let Some(sender) = &self.graph_persistence_notifier {
619                                         sender.send(()).unwrap();
620                                 };
621
622                                 if let Some((error, message)) = self.graph_error {
623                                         return Err(std::io::Error::new(error, message))
624                                 }
625                         }
626
627                         if key == "scorer" {
628                                 if let Some((error, message)) = self.scorer_error {
629                                         return Err(std::io::Error::new(error, message))
630                                 }
631                         }
632
633                         self.filesystem_persister.persist(key, object)
634                 }
635         }
636
637         fn get_full_filepath(filepath: String, filename: String) -> String {
638                 let mut path = PathBuf::from(filepath);
639                 path.push(filename);
640                 path.to_str().unwrap().to_string()
641         }
642
	/// Builds `num_nodes` fully-wired test [`Node`]s, each persisting to disk under
	/// `"{persist_dir}_persister_{i}"`, then marks every pair of nodes as connected peers
	/// so channels can be opened between them.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node key seed so test runs are reproducible.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Tell each pair of ChannelManagers about each other so channels can be opened.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}
679
	/// Runs the full channel-open dance between `$node_a` (funder) and `$node_b`, returning
	/// the funding transaction. Consumes `$node_a`'s pending `FundingGenerationReady` event,
	/// so it must run before a `BackgroundProcessor` starts draining events.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
690
	/// Starts a channel open: `$node_a` creates the channel (user_channel_id 42) and the
	/// open_channel/accept_channel messages are exchanged, leaving a FundingGenerationReady
	/// event pending on `$node_a`.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
698
	/// Destructures an [`Event::FundingGenerationReady`], asserting the channel value and the
	/// user_channel_id (42, as set by `begin_open_channel!`), and returns the temporary
	/// channel id together with a matching one-output funding transaction.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					// Build a minimal funding transaction paying the requested script.
					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
715
	/// Completes a channel open: hands the funding transaction to `$node_a` and exchanges
	/// the funding_created/funding_signed messages between the two nodes.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
723
724         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
725                 for i in 1..=depth {
726                         let prev_blockhash = node.best_block.block_hash();
727                         let height = node.best_block.height() + 1;
728                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
729                         let txdata = vec![(0, tx)];
730                         node.best_block = BestBlock::new(header.block_hash(), height);
731                         match i {
732                                 1 => {
733                                         node.node.transactions_confirmed(&header, &txdata, height);
734                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
735                                 },
736                                 x if x == depth => {
737                                         node.node.best_block_updated(&header, height);
738                                         node.chain_monitor.best_block_updated(&header, height);
739                                 },
740                                 _ => {},
741                         }
742                 }
743         }
	/// Confirms `tx` and connects enough additional blocks to pass `ANTI_REORG_DELAY`.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
747
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until `$node`'s current serialization matches the bytes on disk at
		// `$filepath`, i.e. until the background processor has persisted the latest state.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager's persistence condvar value reads false — presumably meaning
		// no re-persist is pending; TODO(review) confirm the exact semantics of this accessor.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
817
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Busy-wait until both timer-tick log lines have been emitted by the background thread.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
839
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// `join` blocks until the background thread exits; the injected manager-persistence
		// failure should surface here as the thread's error.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
858
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should report the injected network-graph persistence failure.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
876
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should report the injected scorer persistence failure.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
894
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready messages so the channel is fully open on both ends.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// A ChannelClosed event may be delivered first; either variant is acceptable here.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
949
	#[test]
	fn test_scorer_persistence() {
		// Test that the background processor periodically persists the scorer.
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait until the background processor logs that it persisted the scorer.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
968
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		// Seed the graph with one partially-announced channel so there is something prunable.
		let features = ChannelFeatures::empty();
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Wait until the processor has considered — and declined — pruning, since the rapid
		// gossip sync has not completed yet.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// Raw rapid-gossip-sync snapshot (note the "LDK" + version-1 magic in the first four
		// bytes) used to complete the sync.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// Once sync completes, the graph should be persisted within the prune-timer window.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1025
	#[test]
	fn test_invoice_payer() {
		// Checks that an `Arc<InvoicePayer>` satisfies the event-handler bounds of
		// `BackgroundProcessor::start`; the processor is started and stopped without
		// exercising any payments.
		let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
		let random_seed_bytes = keys_manager.get_secure_random_bytes();
		let nodes = create_nodes(2, "test_invoice_payer".to_string());

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
		let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
		let event_handler = Arc::clone(&invoice_payer);
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		assert!(bg_processor.stop().is_ok());
	}
1041 }