Concretize `WriteableScore` into `MultiThreadedLockableScore`
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::{Score, MultiThreadedLockableScore};
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 ///   writing it to disk/backups by invoking the callback given to it at startup.
40 ///   [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 ///   at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 ///   is provided to [`BackgroundProcessor::start`]).
45 ///
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
48 ///
49 /// # Note
50 ///
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
55 ///
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shared flag polled by the background thread once per loop iteration; setting it (see
	/// `stop_and_join_thread`) requests the thread to exit.
	stop_thread: Arc<AtomicBool>,
	/// Handle to the background thread, carrying any persistence error it exited with.
	/// `None` once the thread has been joined via `join`/`stop`/`Drop`.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
63
// Seconds between calls to ChannelManager::timer_tick_occurred (shortened in tests so the
// test suite doesn't have to wait a full minute per tick).
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between calls to PeerManager::timer_tick_occurred.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts (when a scorer was provided to `start`).
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the first network-graph prune after startup, so short-lived clients still
// prune at least once; subsequent prunes follow NETWORK_PRUNE_TIMER.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Tells the [`BackgroundProcessor`] which gossip sync mechanism (if any) is in use, so it can
/// forward events to the backing [`NetworkGraph`] and decide when pruning that graph is safe
/// (a rapid sync's graph is only pruned once its initial sync has completed).
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
108
109 impl<
110         P: Deref<Target = P2PGossipSync<G, A, L>>,
111         R: Deref<Target = RapidGossipSync<G, L>>,
112         G: Deref<Target = NetworkGraph<L>>,
113         A: Deref,
114         L: Deref,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117         fn network_graph(&self) -> Option<&G> {
118                 match self {
119                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121                         GossipSync::None => None,
122                 }
123         }
124
125         fn prunable_network_graph(&self) -> Option<&G> {
126                 match self {
127                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::Rapid(gossip_sync) => {
129                                 if gossip_sync.is_initial_sync_complete() {
130                                         Some(gossip_sync.network_graph())
131                                 } else {
132                                         None
133                                 }
134                         },
135                         GossipSync::None => None,
136                 }
137         }
138 }
139
140 /// (C-not exported) as the bindings concretize everything and have constructors for us
141 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
142         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
143 where
144         A::Target: chain::Access,
145         L::Target: Logger,
146 {
147         /// Initializes a new [`GossipSync::P2P`] variant.
148         pub fn p2p(gossip_sync: P) -> Self {
149                 GossipSync::P2P(gossip_sync)
150         }
151 }
152
153 /// (C-not exported) as the bindings concretize everything and have constructors for us
154 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
155         GossipSync<
156                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
157                 R,
158                 G,
159                 &'a (dyn chain::Access + Send + Sync),
160                 L,
161         >
162 where
163         L::Target: Logger,
164 {
165         /// Initializes a new [`GossipSync::Rapid`] variant.
166         pub fn rapid(gossip_sync: R) -> Self {
167                 GossipSync::Rapid(gossip_sync)
168         }
169 }
170
171 /// (C-not exported) as the bindings concretize everything and have constructors for us
172 impl<'a, L: Deref>
173         GossipSync<
174                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
175                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
176                 &'a NetworkGraph<L>,
177                 &'a (dyn chain::Access + Send + Sync),
178                 L,
179         >
180 where
181         L::Target: Logger,
182 {
183         /// Initializes a new [`GossipSync::None`] variant.
184         pub fn none() -> Self {
185                 GossipSync::None
186         }
187 }
188
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// The user-provided handler, invoked after the network graph has seen the event.
	event_handler: E,
	/// Gossip sync whose [`NetworkGraph`] (if any) is also handed each event.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
203
204 impl<
205         'a,
206         E: EventHandler,
207         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208         RGS: Deref<Target = RapidGossipSync<G, L>>,
209         G: Deref<Target = NetworkGraph<L>>,
210         A: Deref,
211         L: Deref,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
214         fn handle_event(&self, event: &Event) {
215                 if let Some(network_graph) = self.gossip_sync.network_graph() {
216                         network_graph.handle_event(event);
217                 }
218                 self.event_handler.handle_event(event);
219         }
220 }
221
222 impl BackgroundProcessor {
223         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
224         /// documentation].
225         ///
226         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228         /// either [`join`] or [`stop`].
229         ///
230         /// # Data Persistence
231         ///
232         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235         /// provided implementation.
236         ///
237         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239         /// See the `lightning-persister` crate for LDK's provided implementation.
240         ///
241         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242         /// error or call [`join`] and handle any error that may arise. For the latter case,
243         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
244         ///
245         /// # Event Handling
246         ///
247         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249         /// functionality implemented by other handlers.
250         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
251         ///
252         /// # Rapid Gossip Sync
253         ///
254         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256         /// until the [`RapidGossipSync`] instance completes its first sync.
257         ///
258         /// [top-level documentation]: BackgroundProcessor
259         /// [`join`]: Self::join
260         /// [`stop`]: Self::stop
261         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
267         pub fn start<
268                 'a,
269                 Signer: 'static + Sign,
270                 CA: 'static + Deref + Send + Sync,
271                 CF: 'static + Deref + Send + Sync,
272                 CW: 'static + Deref + Send + Sync,
273                 T: 'static + Deref + Send + Sync,
274                 K: 'static + Deref + Send + Sync,
275                 F: 'static + Deref + Send + Sync,
276                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277                 L: 'static + Deref + Send + Sync,
278                 P: 'static + Deref + Send + Sync,
279                 Descriptor: 'static + SocketDescriptor + Send + Sync,
280                 CMH: 'static + Deref + Send + Sync,
281                 RMH: 'static + Deref + Send + Sync,
282                 EH: 'static + EventHandler + Send,
283                 PS: 'static + Deref + Send,
284                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288                 UMH: 'static + Deref + Send + Sync,
289                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290                 SC: Score + Send,
291         >(
292                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
293                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<&'static MultiThreadedLockableScore<SC>>,
294         ) -> Self
295         where
296                 CA::Target: 'static + chain::Access,
297                 CF::Target: 'static + chain::Filter,
298                 CW::Target: 'static + chain::Watch<Signer>,
299                 T::Target: 'static + BroadcasterInterface,
300                 K::Target: 'static + KeysInterface<Signer = Signer>,
301                 F::Target: 'static + FeeEstimator,
302                 L::Target: 'static + Logger,
303                 P::Target: 'static + Persist<Signer>,
304                 CMH::Target: 'static + ChannelMessageHandler,
305                 RMH::Target: 'static + RoutingMessageHandler,
306                 UMH::Target: 'static + CustomMessageHandler,
307                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
308         {
309                 let stop_thread = Arc::new(AtomicBool::new(false));
310                 let stop_thread_clone = stop_thread.clone();
311                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
312                         let event_handler = DecoratingEventHandler {
313                                 event_handler,
314                                 gossip_sync: &gossip_sync,
315                         };
316
317                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
318                         channel_manager.timer_tick_occurred();
319
320                         let mut last_freshness_call = Instant::now();
321                         let mut last_ping_call = Instant::now();
322                         let mut last_prune_call = Instant::now();
323                         let mut last_scorer_persist_call = Instant::now();
324                         let mut have_pruned = false;
325
326                         loop {
327                                 channel_manager.process_pending_events(&event_handler);
328                                 chain_monitor.process_pending_events(&event_handler);
329
330                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
331                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
332                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
333                                 // without running the normal event processing above and handing events to users.
334                                 //
335                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
336                                 // processing a message effectively at any point during this loop. In order to
337                                 // minimize the time between such processing completing and persisting the updated
338                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
339                                 // generally, and as a fallback place such blocking only immediately before
340                                 // persistence.
341                                 peer_manager.process_events();
342
343                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
344                                 // see `await_start`'s use below.
345                                 let await_start = Instant::now();
346                                 let updates_available =
347                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
348                                 let await_time = await_start.elapsed();
349
350                                 if updates_available {
351                                         log_trace!(logger, "Persisting ChannelManager...");
352                                         persister.persist_manager(&*channel_manager)?;
353                                         log_trace!(logger, "Done persisting ChannelManager.");
354                                 }
355                                 // Exit the loop if the background processor was requested to stop.
356                                 if stop_thread.load(Ordering::Acquire) == true {
357                                         log_trace!(logger, "Terminating background processor.");
358                                         break;
359                                 }
360                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
361                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
362                                         channel_manager.timer_tick_occurred();
363                                         last_freshness_call = Instant::now();
364                                 }
365                                 if await_time > Duration::from_secs(1) {
366                                         // On various platforms, we may be starved of CPU cycles for several reasons.
367                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
368                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
369                                         // may not get any cycles.
370                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
371                                         // full second, at which point we assume sockets may have been killed (they
372                                         // appear to be at least on some platforms, even if it has only been a second).
373                                         // Note that we have to take care to not get here just because user event
374                                         // processing was slow at the top of the loop. For example, the sample client
375                                         // may call Bitcoin Core RPCs during event handling, which very often takes
376                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
377                                         // peers.
378                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
379                                         peer_manager.disconnect_all_peers();
380                                         last_ping_call = Instant::now();
381                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
382                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
383                                         peer_manager.timer_tick_occurred();
384                                         last_ping_call = Instant::now();
385                                 }
386
387                                 // Note that we want to run a graph prune once not long after startup before
388                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
389                                 // pruning their network graph. We run once 60 seconds after startup before
390                                 // continuing our normal cadence.
391                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
392                                         // The network graph must not be pruned while rapid sync completion is pending
393                                         log_trace!(logger, "Assessing prunability of network graph");
394                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
395                                                 network_graph.remove_stale_channels();
396
397                                                 if let Err(e) = persister.persist_graph(network_graph) {
398                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
399                                                 }
400
401                                                 last_prune_call = Instant::now();
402                                                 have_pruned = true;
403                                         } else {
404                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
405                                         }
406                                 }
407
408                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
409                                         if let Some(ref scorer) = scorer {
410                                                 log_trace!(logger, "Persisting scorer");
411                                                 if let Err(e) = persister.persist_scorer(&scorer) {
412                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
413                                                 }
414                                         }
415                                         last_scorer_persist_call = Instant::now();
416                                 }
417                         }
418
419                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
420                         // some races where users quit while channel updates were in-flight, with
421                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
422                         persister.persist_manager(&*channel_manager)?;
423
424                         // Persist Scorer on exit
425                         if let Some(ref scorer) = scorer {
426                                 persister.persist_scorer(&scorer)?;
427                         }
428
429                         // Persist NetworkGraph on exit
430                         if let Some(network_graph) = gossip_sync.network_graph() {
431                                 persister.persist_graph(network_graph)?;
432                         }
433
434                         Ok(())
435                 });
436                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
437         }
438
439         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
440         /// [`ChannelManager`].
441         ///
442         /// # Panics
443         ///
444         /// This function panics if the background thread has panicked such as while persisting or
445         /// handling events.
446         ///
447         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
448         pub fn join(mut self) -> Result<(), std::io::Error> {
449                 assert!(self.thread_handle.is_some());
450                 self.join_thread()
451         }
452
453         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
454         /// [`ChannelManager`].
455         ///
456         /// # Panics
457         ///
458         /// This function panics if the background thread has panicked such as while persisting or
459         /// handling events.
460         ///
461         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
462         pub fn stop(mut self) -> Result<(), std::io::Error> {
463                 assert!(self.thread_handle.is_some());
464                 self.stop_and_join_thread()
465         }
466
467         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
468                 self.stop_thread.store(true, Ordering::Release);
469                 self.join_thread()
470         }
471
472         fn join_thread(&mut self) -> Result<(), std::io::Error> {
473                 match self.thread_handle.take() {
474                         Some(handle) => handle.join().unwrap(),
475                         None => Ok(()),
476                 }
477         }
478 }
479
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// NOTE(review): this `unwrap` means dropping the processor panics if the background
		// thread panicked or if the final persistence pass returned an error. Callers who
		// need the error should use `stop`/`join` instead of relying on drop.
		self.stop_and_join_thread().unwrap();
	}
}
485
486 #[cfg(test)]
487 mod tests {
488         use bitcoin::blockdata::block::BlockHeader;
489         use bitcoin::blockdata::constants::genesis_block;
490         use bitcoin::blockdata::transaction::{Transaction, TxOut};
491         use bitcoin::network::constants::Network;
492         use lightning::chain::{BestBlock, Confirm, chainmonitor};
493         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
494         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
495         use lightning::chain::transaction::OutPoint;
496         use lightning::get_event_msg;
497         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
498         use lightning::ln::features::{ChannelFeatures, InitFeatures};
499         use lightning::ln::msgs::{ChannelMessageHandler, Init};
500         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
501         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
502         use lightning::util::config::UserConfig;
503         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
504         use lightning::util::ser::Writeable;
505         use lightning::util::test_utils;
506         use lightning::util::persist::KVStorePersister;
507         use lightning_invoice::payment::{InvoicePayer, Retry};
508         use lightning_invoice::utils::DefaultRouter;
509         use lightning_persister::FilesystemPersister;
510         use std::fs;
511         use std::path::PathBuf;
512         use std::sync::{Arc, Mutex};
513         use std::sync::mpsc::SyncSender;
514         use std::time::Duration;
515         use lightning::routing::scoring::{FixedPenaltyScorer};
516         use lightning_rapid_gossip_sync::RapidGossipSync;
517         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
518
519         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
520
	// A stub SocketDescriptor for tests that never touch a real network socket.
	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Accepts no bytes (always reports 0 written).
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}
530
	// Concrete ChainMonitor over the test utilities, persisted via FilesystemPersister.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthand for the two gossip-sync flavors used throughout the tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
535
	// Bundles everything needed to drive one simulated node in BackgroundProcessor tests.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		// Both gossip sync flavors are constructed; each test picks one via the
		// `p2p_gossip_sync`/`rapid_gossip_sync`/`no_gossip_sync` helpers.
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		// On-disk persister; its directory is removed when the Node is dropped.
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
549
	impl Node {
		// Wraps this node's P2P gossip sync in the GossipSync::P2P variant.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		// Wraps this node's rapid gossip sync in the GossipSync::Rapid variant.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		// Returns GossipSync::None, i.e. no gossip sync at all.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
563
564         impl Drop for Node {
565                 fn drop(&mut self) {
566                         let data_dir = self.persister.get_data_dir();
567                         match fs::remove_dir_all(data_dir.clone()) {
568                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
569                                 _ => {}
570                         }
571                 }
572         }
573
	// Test persister wrapping a FilesystemPersister, with injectable failures for the
	// manager/graph/scorer persistence paths and an optional notifier fired on graph writes.
	struct Persister {
		// When set, network-graph persistence fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, receives `()` each time the network graph is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, ChannelManager persistence fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, scorer persistence fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
581
582         impl Persister {
583                 fn new(data_dir: String) -> Self {
584                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
585                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
586                 }
587
588                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
589                         Self { graph_error: Some((error, message)), ..self }
590                 }
591
592                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
593                         Self { graph_persistence_notifier: Some(sender), ..self }
594                 }
595
596                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
597                         Self { manager_error: Some((error, message)), ..self }
598                 }
599
600                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
601                         Self { scorer_error: Some((error, message)), ..self }
602                 }
603         }
604
605         impl KVStorePersister for Persister {
606                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
607                         if key == "manager" {
608                                 if let Some((error, message)) = self.manager_error {
609                                         return Err(std::io::Error::new(error, message))
610                                 }
611                         }
612
613                         if key == "network_graph" {
614                                 if let Some(sender) = &self.graph_persistence_notifier {
615                                         sender.send(()).unwrap();
616                                 };
617
618                                 if let Some((error, message)) = self.graph_error {
619                                         return Err(std::io::Error::new(error, message))
620                                 }
621                         }
622
623                         if key == "scorer" {
624                                 if let Some((error, message)) = self.scorer_error {
625                                         return Err(std::io::Error::new(error, message))
626                                 }
627                         }
628
629                         self.filesystem_persister.persist(key, object)
630                 }
631         }
632
633         fn get_full_filepath(filepath: String, filename: String) -> String {
634                 let mut path = PathBuf::from(filepath);
635                 path.push(filename);
636                 path.to_str().unwrap().to_string()
637         }
638
	/// Builds `num_nodes` test nodes, each with a full stack (chain source, chain monitor,
	/// channel manager, both gossip syncs, peer manager, and scorer) persisting under a
	/// per-node directory derived from `persist_dir`, then connects every pair of nodes as
	/// peers in both directions.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			// Each node persists to its own directory so tests can run in parallel and
			// clean up independently (see `Drop for Node`).
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node seed so key derivation is reproducible across runs.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers, in both directions.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}
675
	/// Runs the full channel-open flow from `$node_a` to `$node_b` for `$channel_value`
	/// satoshis, returning the funding transaction. This consumes `$node_a`'s pending
	/// `FundingGenerationReady` event, so it must run before a `BackgroundProcessor` starts
	/// handling that node's events.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
686
	/// Performs the open_channel/accept_channel message exchange between `$node_a` (the
	/// initiator) and `$node_b`, leaving `$node_a` with a pending `FundingGenerationReady`
	/// event. Uses a push_msat of 100, user_channel_id 42, and default config.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
694
	/// Validates a `FundingGenerationReady` event against `$channel_value` (and the
	/// user_channel_id of 42 set by `begin_open_channel!`), builds a single-output funding
	/// transaction paying the requested script, and returns `(temporary_channel_id, tx)`.
	/// Panics on any other event variant.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
711
	/// Completes the channel-open flow: hands the funding transaction `$tx` to `$node_a`
	/// and exchanges the funding_created/funding_signed messages with `$node_b`.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
719
	/// Connects `depth` new blocks on top of `node`'s current best block, confirming `tx`
	/// in the first connected block and delivering a best-block update for the last one,
	/// to both the ChannelManager and the ChainMonitor.
	// NOTE(review): the match arms below are mutually exclusive, so with `depth == 1` only
	// the `transactions_confirmed` arm runs and `best_block_updated` is never called. All
	// current callers pass depth > 1 (ANTI_REORG_DELAY / BREAKDOWN_TIMEOUT) — confirm
	// before relying on depth == 1 behavior.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Header fields other than `prev_blockhash` are arbitrary test values.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				// First connected block: the transaction is included here.
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				// Last connected block: advance the best-block pointers.
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` and connects ANTI_REORG_DELAY blocks so it is treated as
	/// irrevocably confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
743
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes on disk at $filepath match the current serialization of
		// $node, since the background processor persists asynchronously.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
813
814         #[test]
815         fn test_timer_tick_called() {
816                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
817                 // `FRESHNESS_TIMER`.
818                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
819                 let data_dir = nodes[0].persister.get_data_dir();
820                 let persister = Arc::new(Persister::new(data_dir));
821                 let event_handler = |_: &_| {};
822                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
823                 loop {
824                         let log_entries = nodes[0].logger.lines.lock().unwrap();
825                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
826                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
827                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
828                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
829                                 break
830                         }
831                 }
832
833                 assert!(bg_processor.stop().is_ok());
834         }
835
836         #[test]
837         fn test_channel_manager_persist_error() {
838                 // Test that if we encounter an error during manager persistence, the thread panics.
839                 let nodes = create_nodes(2, "test_persist_error".to_string());
840                 open_channel!(nodes[0], nodes[1], 100000);
841
842                 let data_dir = nodes[0].persister.get_data_dir();
843                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
844                 let event_handler = |_: &_| {};
845                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
846                 match bg_processor.join() {
847                         Ok(_) => panic!("Expected error persisting manager"),
848                         Err(e) => {
849                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
850                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
851                         },
852                 }
853         }
854
855         #[test]
856         fn test_network_graph_persist_error() {
857                 // Test that if we encounter an error during network graph persistence, an error gets returned.
858                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
859                 let data_dir = nodes[0].persister.get_data_dir();
860                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
861                 let event_handler = |_: &_| {};
862                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
863
864                 match bg_processor.stop() {
865                         Ok(_) => panic!("Expected error persisting network graph"),
866                         Err(e) => {
867                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
868                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
869                         },
870                 }
871         }
872
873         #[test]
874         fn test_scorer_persist_error() {
875                 // Test that if we encounter an error during scorer persistence, an error gets returned.
876                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
877                 let data_dir = nodes[0].persister.get_data_dir();
878                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
879                 let event_handler = |_: &_| {};
880                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
881
882                 match bg_processor.stop() {
883                         Ok(_) => panic!("Expected error persisting scorer"),
884                         Err(e) => {
885                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
886                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
887                         },
888                 }
889         }
890
	#[test]
	fn test_background_event_handling() {
		// Exercises the background processor's event handling end-to-end: first a
		// FundingGenerationReady event during channel open, then a SpendableOutputs (or
		// ChannelClosed) event after a force-close matures.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		// Stop the first processor so a new handler (below) can be installed for node 0.
		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// Either event is acceptable: delivery order of ChannelClosed vs SpendableOutputs
		// is not pinned here.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
945
946         #[test]
947         fn test_scorer_persistence() {
948                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
949                 let data_dir = nodes[0].persister.get_data_dir();
950                 let persister = Arc::new(Persister::new(data_dir));
951                 let event_handler = |_: &_| {};
952                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
953
954                 loop {
955                         let log_entries = nodes[0].logger.lines.lock().unwrap();
956                         let expected_log = "Persisting scorer".to_string();
957                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
958                                 break
959                         }
960                 }
961
962                 assert!(bg_processor.stop().is_ok());
963         }
964
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// While a rapid gossip sync is pending, the background processor must not prune the
		// network graph; once the sync completes, pruning should proceed and remove the
		// stale channels.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with a single (prunable) channel between the two test nodes.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Wait until the processor has assessed prunability and declined to prune.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// Pre-built rapid gossip sync payload (binary RGS snapshot) announcing two channels.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// The graph persistence notification doubles as the "prune happened" signal.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1021
1022         #[test]
1023         fn test_invoice_payer() {
1024                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1025                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1026                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1027
1028                 // Initiate the background processors to watch each node.
1029                 let data_dir = nodes[0].persister.get_data_dir();
1030                 let persister = Arc::new(Persister::new(data_dir));
1031                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
1032                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1033                 let event_handler = Arc::clone(&invoice_payer);
1034                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1035                 assert!(bg_processor.stop().is_ok());
1036         }
1037 }