//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.

// Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
#![deny(broken_intra_doc_links)]
#![deny(private_intra_doc_links)]

#![deny(missing_docs)]
#![deny(unsafe_code)]

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
///   is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon, as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
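/// # Example
///
/// A minimal lifecycle sketch; construction of the dependencies passed to [`start`] is elided
/// here, so the snippet is illustrative only and is not compiled as a doctest:
///
/// ```ignore
/// // Spawn the background thread (see `start` below for the full argument list).
/// let bg_processor = BackgroundProcessor::start(
/// 	persister, event_handler, chain_monitor, channel_manager, gossip_sync, peer_manager,
/// 	logger, Some(scorer),
/// );
/// // ... run the node ...
/// // On shutdown, stop the thread and surface any persistence error.
/// bg_processor.stop().expect("persisting on shutdown failed");
/// ```
///
/// [`start`]: Self::start
///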
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations. Increasing the ping
/// timer allows for this, but slower devices will still be disconnected if the timeout is
/// reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
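///
/// A sketch of selecting a variant via the convenience constructors defined below; `p2p_sync`
/// and `rapid_sync` stand in for whatever reference-like handles your node holds (illustrative
/// only, not compiled as a doctest):
///
/// ```ignore
/// // Sync gossip over the P2P network per BOLT 7:
/// let gossip = GossipSync::p2p(p2p_sync);
/// // ...or from a trusted rapid gossip sync server:
/// let gossip = GossipSync::rapid(rapid_sync);
/// // ...or disable gossip sync entirely:
/// let gossip = GossipSync::none();
/// ```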
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}

impl<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> GossipSync<P, R, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::None => None,
		}
	}

	fn prunable_network_graph(&self) -> Option<&G> {
		match self {
			GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
			GossipSync::Rapid(gossip_sync) => {
				if gossip_sync.is_initial_sync_complete() {
					Some(gossip_sync.network_graph())
				} else {
					None
				}
			},
			GossipSync::None => None,
		}
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}

/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}

/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	event_handler: E,
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}

impl<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: &Event) {
		if let Some(network_graph) = self.gossip_sync.network_graph() {
			network_graph.handle_event(event);
		}
		self.event_handler.handle_event(event);
	}
}

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error can be
	/// retrieved by calling either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
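	/// # Example
	///
	/// A sketch of wiring the processor up with a no-op event handler; the construction of the
	/// other arguments is elided, so this is illustrative only and not compiled as a doctest:
	///
	/// ```ignore
	/// let event_handler = |_: &_| {};
	/// let bg_processor = BackgroundProcessor::start(
	/// 	persister, event_handler, chain_monitor, channel_manager,
	/// 	GossipSync::p2p(p2p_gossip_sync), peer_manager, logger, Some(scorer),
	/// );
	/// ```
	///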
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = DecoratingEventHandler {
				event_handler,
				gossip_sync: &gossip_sync,
			};

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.timer_tick_occurred();

			let mut last_freshness_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut have_pruned = false;

			loop {
				channel_manager.process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);

				// Note that the PeerManager::process_events may block on ChannelManager's locks,
				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
				// we want to ensure we get into `persist_manager` as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
				//
				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
				// processing a message effectively at any point during this loop. In order to
				// minimize the time between such processing completing and persisting the updated
				// ChannelManager, we want to minimize methods blocking on a ChannelManager
				// generally, and as a fallback place such blocking only immediately before
				// persistence.
				peer_manager.process_events();

				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
				// see `await_start`'s use below.
				let await_start = Instant::now();
				let updates_available =
					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
				let await_time = await_start.elapsed();

				if updates_available {
					log_trace!(logger, "Persisting ChannelManager...");
					persister.persist_manager(&*channel_manager)?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}
				// Exit the loop if the background processor was requested to stop.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if await_time > Duration::from_secs(1) {
					// On various platforms, we may be starved of CPU cycles for several reasons.
					// E.g. on iOS, if we've been in the background, we will be entirely paused.
					// Similarly, if we're on a desktop platform and the device has been asleep, we
					// may not get any cycles.
					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
					// full second, at which point we assume sockets may have been killed (they
					// appear to be at least on some platforms, even if it has only been a second).
					// Note that we have to take care to not get here just because user event
					// processing was slow at the top of the loop. For example, the sample client
					// may call Bitcoin Core RPCs during event handling, which very often takes
					// more than a handful of seconds to complete, and shouldn't disconnect all our
					// peers.
					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
					peer_manager.disconnect_all_peers();
					last_ping_call = Instant::now();
				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.timer_tick_occurred();
					last_ping_call = Instant::now();
				}

				// Note that we want to run a graph prune once not long after startup before
				// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once 60 seconds after startup before
				// continuing our normal cadence.
				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
					// The network graph must not be pruned while rapid sync completion is pending
					log_trace!(logger, "Assessing prunability of network graph");
					if let Some(network_graph) = gossip_sync.prunable_network_graph() {
						network_graph.remove_stale_channels();

						if let Err(e) = persister.persist_graph(network_graph) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions: {}", e)
						}

						last_prune_call = Instant::now();
						have_pruned = true;
					} else {
						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					}
				}

				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						log_trace!(logger, "Persisting scorer");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions: {}", e)
						}
					}
					last_scorer_persist_call = Instant::now();
				}
			}

			// After we exit, ensure we persist the ChannelManager one final time - this avoids
			// some races where users quit while channel updates were in-flight, with
			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
			persister.persist_manager(&*channel_manager)?;

			// Persist Scorer on exit
			if let Some(ref scorer) = scorer {
				persister.persist_scorer(&scorer)?;
			}

			// Persist NetworkGraph on exit
			if let Some(network_graph) = gossip_sync.network_graph() {
				persister.persist_graph(network_graph)?;
			}

			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked, such as while persisting or
	/// handling events.
	///
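	/// A sketch of surfacing a persistence error on shutdown (illustrative only, not compiled as
	/// a doctest):
	///
	/// ```ignore
	/// if let Err(e) = bg_processor.stop() {
	/// 	eprintln!("failed to persist on shutdown: {}", e);
	/// }
	/// ```
	///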
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::block::BlockHeader;
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::locktime::PackedLockTime;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain::{BestBlock, Confirm, chainmonitor};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::{ChannelFeatures, InitFeatures};
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning::util::persist::KVStorePersister;
	use lightning_invoice::payment::{InvoicePayer, Retry};
	use lightning_invoice::utils::DefaultRouter;
	use lightning_persister::FilesystemPersister;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::sync::mpsc::SyncSender;
	use std::time::Duration;
	use bitcoin::hashes::Hash;
	use bitcoin::TxMerkleNode;
	use lightning::routing::scoring::{FixedPenaltyScorer};
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;

	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}

	impl Node {
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}

	impl Drop for Node {
		fn drop(&mut self) {
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
				_ => {}
			}
		}
	}

	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}

	impl Persister {
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		}

		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		}

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		}

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		}

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
		}
	}

	impl KVStorePersister for Persister {
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "scorer" {
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.filesystem_persister.persist(key, object)
		}
	}

	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.push(filename);
		path.to_str().unwrap().to_string()
	}

	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()) };
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}

	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}

	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}

	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}

	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}

	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}

	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called on
		// schedule (every `FRESHNESS_TIMER` and `PING_TIMER` seconds, respectively).
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the error is surfaced
		// to the caller when joining the background processor thread.
846                 let nodes = create_nodes(2, "test_persist_error".to_string());
847                 open_channel!(nodes[0], nodes[1], 100000);
848
849                 let data_dir = nodes[0].persister.get_data_dir();
850                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
851                 let event_handler = |_: &_| {};
852                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
853                 match bg_processor.join() {
854                         Ok(_) => panic!("Expected error persisting manager"),
855                         Err(e) => {
856                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
857                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
858                         },
859                 }
860         }
861
862         #[test]
863         fn test_network_graph_persist_error() {
864                 // Test that if we encounter an error during network graph persistence, an error gets returned.
865                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
866                 let data_dir = nodes[0].persister.get_data_dir();
867                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
868                 let event_handler = |_: &_| {};
869                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
870
871                 match bg_processor.stop() {
872                         Ok(_) => panic!("Expected error persisting network graph"),
873                         Err(e) => {
874                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
875                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
876                         },
877                 }
878         }
879
880         #[test]
881         fn test_scorer_persist_error() {
882                 // Test that if we encounter an error during scorer persistence, an error gets returned.
883                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
884                 let data_dir = nodes[0].persister.get_data_dir();
885                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
886                 let event_handler = |_: &_| {};
887                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
888
889                 match bg_processor.stop() {
890                         Ok(_) => panic!("Expected error persisting scorer"),
891                         Err(e) => {
892                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
893                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
894                         },
895                 }
896         }
897
898         #[test]
899         fn test_background_event_handling() {
900                 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
901                 let channel_value = 100000;
902                 let data_dir = nodes[0].persister.get_data_dir();
903                 let persister = Arc::new(Persister::new(data_dir.clone()));
904
905                 // Set up a background event handler for FundingGenerationReady events.
906                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
907                 let event_handler = move |event: &Event| {
908                         sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
909                 };
910                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
911
912                 // Open a channel and check that the FundingGenerationReady event was handled.
913                 begin_open_channel!(nodes[0], nodes[1], channel_value);
914                 let (temporary_channel_id, funding_tx) = receiver
915                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
916                         .expect("FundingGenerationReady not handled within deadline");
917                 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
918
919                 // Confirm the funding transaction.
920                 confirm_transaction(&mut nodes[0], &funding_tx);
921                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
922                 confirm_transaction(&mut nodes[1], &funding_tx);
923                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
924                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
925                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
926                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
927                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
928
929                 assert!(bg_processor.stop().is_ok());
930
931                 // Set up a background event handler for SpendableOutputs events.
932                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
933                 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
934                 let persister = Arc::new(Persister::new(data_dir));
935                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
936
937                 // Force close the channel and check that the SpendableOutputs event was handled.
938                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
939                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
940                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
941                 let event = receiver
942                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
943                         .expect("SpendableOutputs not handled within deadline");
                match event {
                        Event::SpendableOutputs { .. } => {},
                        Event::ChannelClosed { .. } => {},
                        _ => panic!("Unexpected event: {:?}", event),
                }

                assert!(bg_processor.stop().is_ok());
        }

        #[test]
        fn test_scorer_persistence() {
                let nodes = create_nodes(2, "test_scorer_persistence".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let event_handler = |_: &_| {};
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

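                // Busy-wait on the test logger until the background processor reports that it
                // has persisted the scorer.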
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let expected_log = "Persisting scorer".to_string();
                        if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
                                break
                        }
                }

                assert!(bg_processor.stop().is_ok());
        }

        #[test]
        fn test_not_pruning_network_graph_until_graph_sync_completion() {
                let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
                let data_dir = nodes[0].persister.get_data_dir();
                let (sender, receiver) = std::sync::mpsc::sync_channel(1);
                let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
                let network_graph = nodes[0].network_graph.clone();
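                // Seed the graph with a single channel via a partial announcement so there is
                // something for the pruning pass to remove later.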
                let features = ChannelFeatures::empty();
                network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
                        .expect("Failed to update channel from partial announcement");
                let original_graph_description = network_graph.to_string();
                assert!(original_graph_description.contains("42: features: 0000, node_one:"));
                assert_eq!(network_graph.read_only().channels().len(), 1);

                let event_handler = |_: &_| {};
                let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

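                // Wait until the processor has considered pruning and explicitly declined to do
                // so while the rapid gossip sync is still pending.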
                loop {
                        let log_entries = nodes[0].logger.lines.lock().unwrap();
                        let expected_log_a = "Assessing prunability of network graph".to_string();
                        let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
                        if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
                                log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
                                break
                        }
                }

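                // A raw rapid gossip sync snapshot announcing two channels, in LDK's binary RGS
                // format (it begins with the b"LDK" magic bytes followed by a version byte).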
                let initialization_input = vec![
                        76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
                        79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
                        0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
                        187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
                        157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
                        88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
                        204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
                        181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
                        110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
                        76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
                        226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
                        0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
                        0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
                ];
                nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

                // The snapshot adds two channels on top of the one announced above, for three in
                // total.
                assert_eq!(network_graph.read_only().channels().len(), 3);

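                // Now that the rapid sync has completed, the next prune pass may actually run;
                // wait for the persister to signal that the pruned graph has been persisted.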
                let _ = receiver
                        .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
                        .expect("Network graph not pruned within deadline");

                background_processor.stop().unwrap();

                // All channels should now have been pruned from the graph.
                assert_eq!(network_graph.read_only().channels().len(), 0);
        }

        #[test]
        fn test_invoice_payer() {
                let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
                let random_seed_bytes = keys_manager.get_secure_random_bytes();
                let nodes = create_nodes(2, "test_invoice_payer".to_string());

                // Initiate the background processors to watch each node.
                let data_dir = nodes[0].persister.get_data_dir();
                let persister = Arc::new(Persister::new(data_dir));
                let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
                let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
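                // InvoicePayer implements EventHandler, so an Arc to it can be handed to the
                // BackgroundProcessor as the event handler directly.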
                let event_handler = Arc::clone(&invoice_payer);
                let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
                assert!(bg_processor.stop().is_ok());
        }
}