b2edc2301efda2b39224cf557b82b313ef46890f
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(broken_intra_doc_links)]
6 #![deny(missing_docs)]
7 #![deny(unsafe_code)]
8
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
10
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
13
14 use lightning::chain;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
27 use std::sync::Arc;
28 use std::sync::atomic::{AtomicBool, Ordering};
29 use std::thread;
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
32 use std::ops::Deref;
33
/// Runs, on a dedicated background thread, the periodic housekeeping that Rust-Lightning needs in
/// order to keep operating properly.
///
/// Once started through [`BackgroundProcessor::start`], the thread is responsible for:
/// * Handing pending [`Event`]s to the user-provided [`EventHandler`].
/// * Detecting when the [`ChannelManager`] needs to be re-persisted and, when it does, writing
///   it to disk/backups through the persister given at startup. [`ChannelManager`] persistence
///   should be done in the background.
/// * Invoking [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Invoking [`NetworkGraph::remove_stale_channels`] (only when a [`GossipSync`] carrying a
///   [`NetworkGraph`] was passed to [`BackgroundProcessor::start`]).
///
/// [`PeerManager::process_events`] is called periodically as well, but this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// Should [`ChannelManager`] persistence fail and the persisted manager become out-of-date,
/// channels risk being force-closed on startup once the manager realizes it is outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
        /// Flag shared with the background thread; storing `true` asks the thread to leave its loop.
        stop_thread: Arc<AtomicBool>,
        /// Handle of the spawned thread. It is taken (leaving `None`) when the thread is joined.
        thread_handle: Option<JoinHandle<std::io::Result<()>>>,
}
63
/// Seconds that must elapse between calls to `ChannelManager::timer_tick_occurred`.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
/// Test builds use a one-second interval.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Seconds that must elapse between calls to `PeerManager::timer_tick_occurred`.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
/// Test builds use a one-second interval.
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Seconds that must elapse between periodic scorer persistence attempts.
///
/// This has to be defined for every non-test build. Gating it on `debug_assertions` as well
/// would leave release builds without any definition and break compilation there.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
/// Test builds use a one-second interval.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Seconds after startup before the first network graph prune is attempted; subsequent prunes
/// fall back to [`NETWORK_PRUNE_TIMER`].
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
/// Test builds use a one-second delay.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
91
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
93 pub enum GossipSync<
94         P: Deref<Target = P2PGossipSync<G, A, L>>,
95         R: Deref<Target = RapidGossipSync<G, L>>,
96         G: Deref<Target = NetworkGraph<L>>,
97         A: Deref,
98         L: Deref,
99 >
100 where A::Target: chain::Access, L::Target: Logger {
101         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
102         P2P(P),
103         /// Rapid gossip sync from a trusted server.
104         Rapid(R),
105         /// No gossip sync.
106         None,
107 }
108
109 impl<
110         P: Deref<Target = P2PGossipSync<G, A, L>>,
111         R: Deref<Target = RapidGossipSync<G, L>>,
112         G: Deref<Target = NetworkGraph<L>>,
113         A: Deref,
114         L: Deref,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117         fn network_graph(&self) -> Option<&G> {
118                 match self {
119                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121                         GossipSync::None => None,
122                 }
123         }
124
125         fn prunable_network_graph(&self) -> Option<&G> {
126                 match self {
127                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::Rapid(gossip_sync) => {
129                                 if gossip_sync.is_initial_sync_complete() {
130                                         Some(gossip_sync.network_graph())
131                                 } else {
132                                         None
133                                 }
134                         },
135                         GossipSync::None => None,
136                 }
137         }
138 }
139
140 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
141 struct DecoratingEventHandler<
142         'a,
143         E: EventHandler,
144         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
145         RGS: Deref<Target = RapidGossipSync<G, L>>,
146         G: Deref<Target = NetworkGraph<L>>,
147         A: Deref,
148         L: Deref,
149 >
150 where A::Target: chain::Access, L::Target: Logger {
151         event_handler: E,
152         gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
153 }
154
155 impl<
156         'a,
157         E: EventHandler,
158         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
159         RGS: Deref<Target = RapidGossipSync<G, L>>,
160         G: Deref<Target = NetworkGraph<L>>,
161         A: Deref,
162         L: Deref,
163 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
164 where A::Target: chain::Access, L::Target: Logger {
165         fn handle_event(&self, event: &Event) {
166                 if let Some(network_graph) = self.gossip_sync.network_graph() {
167                         network_graph.handle_event(event);
168                 }
169                 self.event_handler.handle_event(event);
170         }
171 }
172
173 impl BackgroundProcessor {
174         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
175         /// documentation].
176         ///
177         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
178         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
179         /// either [`join`] or [`stop`].
180         ///
181         /// # Data Persistence
182         ///
183         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
184         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
185         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
186         /// provided implementation.
187         ///
188         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
189         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
190         /// See the `lightning-persister` crate for LDK's provided implementation.
191         ///
192         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
193         /// error or call [`join`] and handle any error that may arise. For the latter case,
194         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
195         ///
196         /// # Event Handling
197         ///
198         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
199         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
200         /// functionality implemented by other handlers.
201         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
202         ///
203         /// # Rapid Gossip Sync
204         ///
205         /// If rapid gossip sync is meant to run at startup, pass a [`RapidGossipSync`] to `gossip_sync`
206         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
207         /// until the [`RapidGossipSync`] instance completes its first sync.
208         ///
209         /// [top-level documentation]: BackgroundProcessor
210         /// [`join`]: Self::join
211         /// [`stop`]: Self::stop
212         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
213         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
214         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
215         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
216         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
217         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
218         pub fn start<
219                 'a,
220                 Signer: 'static + Sign,
221                 CA: 'static + Deref + Send + Sync,
222                 CF: 'static + Deref + Send + Sync,
223                 CW: 'static + Deref + Send + Sync,
224                 T: 'static + Deref + Send + Sync,
225                 K: 'static + Deref + Send + Sync,
226                 F: 'static + Deref + Send + Sync,
227                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
228                 L: 'static + Deref + Send + Sync,
229                 P: 'static + Deref + Send + Sync,
230                 Descriptor: 'static + SocketDescriptor + Send + Sync,
231                 CMH: 'static + Deref + Send + Sync,
232                 RMH: 'static + Deref + Send + Sync,
233                 EH: 'static + EventHandler + Send,
234                 PS: 'static + Deref + Send,
235                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
236                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
237                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
238                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
239                 UMH: 'static + Deref + Send + Sync,
240                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
241                 S: 'static + Deref<Target = SC> + Send + Sync,
242                 SC: WriteableScore<'a>,
243         >(
244                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
245                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
246         ) -> Self
247         where
248                 CA::Target: 'static + chain::Access,
249                 CF::Target: 'static + chain::Filter,
250                 CW::Target: 'static + chain::Watch<Signer>,
251                 T::Target: 'static + BroadcasterInterface,
252                 K::Target: 'static + KeysInterface<Signer = Signer>,
253                 F::Target: 'static + FeeEstimator,
254                 L::Target: 'static + Logger,
255                 P::Target: 'static + Persist<Signer>,
256                 CMH::Target: 'static + ChannelMessageHandler,
257                 RMH::Target: 'static + RoutingMessageHandler,
258                 UMH::Target: 'static + CustomMessageHandler,
259                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
260         {
261                 let stop_thread = Arc::new(AtomicBool::new(false));
262                 let stop_thread_clone = stop_thread.clone();
263                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
264                         let event_handler = DecoratingEventHandler {
265                                 event_handler,
266                                 gossip_sync: &gossip_sync,
267                         };
268
269                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
270                         channel_manager.timer_tick_occurred();
271
272                         let mut last_freshness_call = Instant::now();
273                         let mut last_ping_call = Instant::now();
274                         let mut last_prune_call = Instant::now();
275                         let mut last_scorer_persist_call = Instant::now();
276                         let mut have_pruned = false;
277
278                         loop {
279                                 channel_manager.process_pending_events(&event_handler);
280                                 chain_monitor.process_pending_events(&event_handler);
281
282                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
283                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
284                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
285                                 // without running the normal event processing above and handing events to users.
286                                 //
287                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
288                                 // processing a message effectively at any point during this loop. In order to
289                                 // minimize the time between such processing completing and persisting the updated
290                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
291                                 // generally, and as a fallback place such blocking only immediately before
292                                 // persistence.
293                                 peer_manager.process_events();
294
295                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
296                                 // see `await_start`'s use below.
297                                 let await_start = Instant::now();
298                                 let updates_available =
299                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
300                                 let await_time = await_start.elapsed();
301
302                                 if updates_available {
303                                         log_trace!(logger, "Persisting ChannelManager...");
304                                         persister.persist_manager(&*channel_manager)?;
305                                         log_trace!(logger, "Done persisting ChannelManager.");
306                                 }
307                                 // Exit the loop if the background processor was requested to stop.
308                                 if stop_thread.load(Ordering::Acquire) == true {
309                                         log_trace!(logger, "Terminating background processor.");
310                                         break;
311                                 }
312                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
313                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
314                                         channel_manager.timer_tick_occurred();
315                                         last_freshness_call = Instant::now();
316                                 }
317                                 if await_time > Duration::from_secs(1) {
318                                         // On various platforms, we may be starved of CPU cycles for several reasons.
319                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
320                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
321                                         // may not get any cycles.
322                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
323                                         // full second, at which point we assume sockets may have been killed (they
324                                         // appear to be at least on some platforms, even if it has only been a second).
325                                         // Note that we have to take care to not get here just because user event
326                                         // processing was slow at the top of the loop. For example, the sample client
327                                         // may call Bitcoin Core RPCs during event handling, which very often takes
328                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
329                                         // peers.
330                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
331                                         peer_manager.disconnect_all_peers();
332                                         last_ping_call = Instant::now();
333                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
334                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
335                                         peer_manager.timer_tick_occurred();
336                                         last_ping_call = Instant::now();
337                                 }
338
339                                 // Note that we want to run a graph prune once not long after startup before
340                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
341                                 // pruning their network graph. We run once 60 seconds after startup before
342                                 // continuing our normal cadence.
343                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
344                                         // The network graph must not be pruned while rapid sync completion is pending
345                                         log_trace!(logger, "Assessing prunability of network graph");
346                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
347                                                 network_graph.remove_stale_channels();
348
349                                                 if let Err(e) = persister.persist_graph(network_graph) {
350                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
351                                                 }
352
353                                                 last_prune_call = Instant::now();
354                                                 have_pruned = true;
355                                         } else {
356                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
357                                         }
358                                 }
359
360                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
361                                         if let Some(ref scorer) = scorer {
362                                                 log_trace!(logger, "Persisting scorer");
363                                                 if let Err(e) = persister.persist_scorer(&scorer) {
364                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
365                                                 }
366                                         }
367                                         last_scorer_persist_call = Instant::now();
368                                 }
369                         }
370
371                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
372                         // some races where users quit while channel updates were in-flight, with
373                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
374                         persister.persist_manager(&*channel_manager)?;
375
376                         // Persist Scorer on exit
377                         if let Some(ref scorer) = scorer {
378                                 persister.persist_scorer(&scorer)?;
379                         }
380
381                         // Persist NetworkGraph on exit
382                         if let Some(network_graph) = gossip_sync.network_graph() {
383                                 persister.persist_graph(network_graph)?;
384                         }
385
386                         Ok(())
387                 });
388                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
389         }
390
391         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
392         /// [`ChannelManager`].
393         ///
394         /// # Panics
395         ///
396         /// This function panics if the background thread has panicked such as while persisting or
397         /// handling events.
398         ///
399         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
400         pub fn join(mut self) -> Result<(), std::io::Error> {
401                 assert!(self.thread_handle.is_some());
402                 self.join_thread()
403         }
404
405         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
406         /// [`ChannelManager`].
407         ///
408         /// # Panics
409         ///
410         /// This function panics if the background thread has panicked such as while persisting or
411         /// handling events.
412         ///
413         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
414         pub fn stop(mut self) -> Result<(), std::io::Error> {
415                 assert!(self.thread_handle.is_some());
416                 self.stop_and_join_thread()
417         }
418
419         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
420                 self.stop_thread.store(true, Ordering::Release);
421                 self.join_thread()
422         }
423
424         fn join_thread(&mut self) -> Result<(), std::io::Error> {
425                 match self.thread_handle.take() {
426                         Some(handle) => handle.join().unwrap(),
427                         None => Ok(()),
428                 }
429         }
430 }
431
432 impl Drop for BackgroundProcessor {
433         fn drop(&mut self) {
434                 self.stop_and_join_thread().unwrap();
435         }
436 }
437
438 #[cfg(test)]
439 mod tests {
440         use bitcoin::blockdata::block::BlockHeader;
441         use bitcoin::blockdata::constants::genesis_block;
442         use bitcoin::blockdata::transaction::{Transaction, TxOut};
443         use bitcoin::network::constants::Network;
444         use lightning::chain::{BestBlock, Confirm, chainmonitor};
445         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
446         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
447         use lightning::chain::transaction::OutPoint;
448         use lightning::get_event_msg;
449         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
450         use lightning::ln::features::{ChannelFeatures, InitFeatures};
451         use lightning::ln::msgs::{ChannelMessageHandler, Init};
452         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
453         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
454         use lightning::util::config::UserConfig;
455         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
456         use lightning::util::ser::Writeable;
457         use lightning::util::test_utils;
458         use lightning::util::persist::KVStorePersister;
459         use lightning_invoice::payment::{InvoicePayer, Retry};
460         use lightning_invoice::utils::DefaultRouter;
461         use lightning_persister::FilesystemPersister;
462         use std::fs;
463         use std::path::PathBuf;
464         use std::sync::{Arc, Mutex};
465         use std::sync::mpsc::SyncSender;
466         use std::time::Duration;
467         use lightning::routing::scoring::{FixedPenaltyScorer};
468         use lightning_rapid_gossip_sync::RapidGossipSync;
469         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
470
471         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
472
473         #[derive(Clone, Eq, Hash, PartialEq)]
474         struct TestDescriptor{}
475         impl SocketDescriptor for TestDescriptor {
476                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
477                         0
478                 }
479
480                 fn disconnect_socket(&mut self) {}
481         }
482
483         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
484
485         type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
486         type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
487
488         struct Node {
489                 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
490                 p2p_gossip_sync: PGS,
491                 rapid_gossip_sync: RGS,
492                 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
493                 chain_monitor: Arc<ChainMonitor>,
494                 persister: Arc<FilesystemPersister>,
495                 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
496                 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
497                 logger: Arc<test_utils::TestLogger>,
498                 best_block: BestBlock,
499                 scorer: Arc<Mutex<FixedPenaltyScorer>>,
500         }
501
502         impl Node {
503                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
504                         GossipSync::P2P(self.p2p_gossip_sync.clone())
505                 }
506
507                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
508                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
509                 }
510
511                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
512                         GossipSync::None
513                 }
514         }
515
516         impl Drop for Node {
517                 fn drop(&mut self) {
518                         let data_dir = self.persister.get_data_dir();
519                         match fs::remove_dir_all(data_dir.clone()) {
520                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
521                                 _ => {}
522                         }
523                 }
524         }
525
526         struct Persister {
527                 graph_error: Option<(std::io::ErrorKind, &'static str)>,
528                 graph_persistence_notifier: Option<SyncSender<()>>,
529                 manager_error: Option<(std::io::ErrorKind, &'static str)>,
530                 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
531                 filesystem_persister: FilesystemPersister,
532         }
533
534         impl Persister {
535                 fn new(data_dir: String) -> Self {
536                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
537                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
538                 }
539
540                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
541                         Self { graph_error: Some((error, message)), ..self }
542                 }
543
544                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
545                         Self { graph_persistence_notifier: Some(sender), ..self }
546                 }
547
548                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
549                         Self { manager_error: Some((error, message)), ..self }
550                 }
551
552                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
553                         Self { scorer_error: Some((error, message)), ..self }
554                 }
555         }
556
557         impl KVStorePersister for Persister {
558                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
559                         if key == "manager" {
560                                 if let Some((error, message)) = self.manager_error {
561                                         return Err(std::io::Error::new(error, message))
562                                 }
563                         }
564
565                         if key == "network_graph" {
566                                 if let Some(sender) = &self.graph_persistence_notifier {
567                                         sender.send(()).unwrap();
568                                 };
569
570                                 if let Some((error, message)) = self.graph_error {
571                                         return Err(std::io::Error::new(error, message))
572                                 }
573                         }
574
575                         if key == "scorer" {
576                                 if let Some((error, message)) = self.scorer_error {
577                                         return Err(std::io::Error::new(error, message))
578                                 }
579                         }
580
581                         self.filesystem_persister.persist(key, object)
582                 }
583         }
584
/// Joins `filename` onto the directory `filepath` using platform path rules and returns the
/// result as a `String`.
///
/// Panics if the joined path is not valid UTF-8.
fn get_full_filepath(filepath: String, filename: String) -> String {
    PathBuf::from(filepath)
        .join(filename)
        .to_str()
        .unwrap()
        .to_string()
}
590
591         fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
592                 let mut nodes = Vec::new();
593                 for i in 0..num_nodes {
594                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
595                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
596                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
597                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
598                         let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
599                         let seed = [i as u8; 32];
600                         let network = Network::Testnet;
601                         let genesis_block = genesis_block(network);
602                         let now = Duration::from_secs(genesis_block.header.time as u64);
603                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
604                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
605                         let best_block = BestBlock::from_genesis(network);
606                         let params = ChainParameters { network, best_block };
607                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
608                         let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
609                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
610                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
611                         let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
612                         let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
613                         let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
614                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
615                         nodes.push(node);
616                 }
617
618                 for i in 0..num_nodes {
619                         for j in (i+1)..num_nodes {
620                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
621                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
622                         }
623                 }
624
625                 nodes
626         }
627
// Runs the whole channel-opening handshake between `$node_a` and `$node_b` for a channel of
// `$channel_value`, consuming the single pending event on `$node_a`, and evaluates to the
// funding transaction that was built.
macro_rules! open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        begin_open_channel!($node_a, $node_b, $channel_value);
        let pending_events = $node_a.node.get_and_clear_pending_events();
        assert_eq!(pending_events.len(), 1);
        let (temp_channel_id, funding_tx) = handle_funding_generation_ready!(&pending_events[0], $channel_value);
        end_open_channel!($node_a, $node_b, temp_channel_id, funding_tx);
        funding_tx
    }}
}
638
// Starts opening a channel of `$channel_value` from `$node_a` to `$node_b`: `$node_a` creates
// the channel (with user channel id 42), then the open_channel and accept_channel messages
// are delivered by hand.
macro_rules! begin_open_channel {
    ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
        let node_a_id = $node_a.node.get_our_node_id();
        let node_b_id = $node_b.node.get_our_node_id();

        $node_a.node.create_channel(node_b_id, $channel_value, 100, 42, None).unwrap();

        let open_channel_msg = get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, node_b_id);
        $node_b.node.handle_open_channel(&node_a_id, InitFeatures::known(), &open_channel_msg);

        let accept_channel_msg = get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, node_a_id);
        $node_a.node.handle_accept_channel(&node_b_id, InitFeatures::known(), &accept_channel_msg);
    }}
}
646
// Handles an `Event::FundingGenerationReady`: checks that it carries `$channel_value` and user
// channel id 42, builds a funding transaction with a single output paying the requested
// script, and evaluates to `(temporary_channel_id, tx)`. Panics on any other event.
macro_rules! handle_funding_generation_ready {
    ($event: expr, $channel_value: expr) => {{
        match $event {
            &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
                assert_eq!(channel_value_satoshis, $channel_value);
                assert_eq!(user_channel_id, 42);

                let funding_output = TxOut {
                    value: channel_value_satoshis,
                    script_pubkey: output_script.clone(),
                };
                let funding_tx = Transaction {
                    version: 1i32,
                    lock_time: 0,
                    input: Vec::new(),
                    output: vec![funding_output],
                };
                (temporary_channel_id, funding_tx)
            },
            _ => panic!("Unexpected event"),
        }
    }}
}
663
// Finishes the channel-opening handshake: hands the funding transaction `$tx` to `$node_a`,
// then delivers the funding_created and funding_signed messages by hand.
macro_rules! end_open_channel {
    ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
        let node_a_id = $node_a.node.get_our_node_id();
        let node_b_id = $node_b.node.get_our_node_id();

        $node_a.node.funding_transaction_generated(&$temporary_channel_id, &node_b_id, $tx.clone()).unwrap();

        let funding_created_msg = get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, node_b_id);
        $node_b.node.handle_funding_created(&node_a_id, &funding_created_msg);

        let funding_signed_msg = get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, node_a_id);
        $node_a.node.handle_funding_signed(&node_b_id, &funding_signed_msg);
    }}
}
671
672         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
673                 for i in 1..=depth {
674                         let prev_blockhash = node.best_block.block_hash();
675                         let height = node.best_block.height() + 1;
676                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
677                         let txdata = vec![(0, tx)];
678                         node.best_block = BestBlock::new(header.block_hash(), height);
679                         match i {
680                                 1 => {
681                                         node.node.transactions_confirmed(&header, &txdata, height);
682                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
683                                 },
684                                 x if x == depth => {
685                                         node.node.best_block_updated(&header, height);
686                                         node.chain_monitor.best_block_updated(&header, height);
687                                 },
688                                 _ => {},
689                         }
690                 }
691         }
692         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
693                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
694         }
695
#[test]
fn test_background_processor() {
    // Checks that the ChannelManager is re-persisted after a channel is opened and again after
    // it is force-closed, and that the network graph and scorer get written to disk as well.
    let nodes = create_nodes(2, "test_background_processor".to_string());

    // open_channel! consumes events, so it has to finish before the BackgroundProcessor starts
    // or the two would race over event processing.
    let funding_tx = open_channel!(nodes[0], nodes[1], 100000);

    // Start the background processor for the first node.
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Spins until the file at `$filepath` holds exactly the current serialization of `$node`.
    macro_rules! check_persisted_data {
        ($node: expr, $filepath: expr) => {
            let mut expected_bytes = Vec::new();
            loop {
                expected_bytes.clear();
                if let Err(e) = $node.write(&mut expected_bytes) {
                    panic!("Unexpected error: {}", e)
                }
                // A missing/unreadable file or stale contents both mean "try again".
                if let Ok(bytes) = std::fs::read($filepath) {
                    if bytes == expected_bytes {
                        break
                    }
                }
            }
        }
    }

    // The initial channel manager state must reach disk.
    let manager_path = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
    check_persisted_data!(nodes[0].node, manager_path.clone());

    // Wait for the manager to stop signalling that it needs persistence.
    while nodes[0].node.get_persistence_condvar_value() {}

    // Force-close the channel.
    let channel_id = OutPoint { txid: funding_tx.txid(), index: 0 }.to_channel_id();
    nodes[0].node.force_close_channel(&channel_id, &nodes[1].node.get_our_node_id()).unwrap();

    // The force-close updates must reach disk too.
    check_persisted_data!(nodes[0].node, manager_path.clone());
    while nodes[0].node.get_persistence_condvar_value() {}

    // The network graph must be persisted.
    let graph_path = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
    check_persisted_data!(nodes[0].network_graph, graph_path.clone());

    // The scorer must be persisted.
    let scorer_path = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
    check_persisted_data!(nodes[0].scorer, scorer_path.clone());

    assert!(bg_processor.stop().is_ok());
}
765
#[test]
fn test_timer_tick_called() {
    // Checks that the background processor logs a `timer_tick_occurred` call for both the
    // ChannelManager and the PeerManager (expected every `FRESHNESS_TIMER`).
    let nodes = create_nodes(1, "test_timer_tick_called".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Log entries are keyed by (module, message).
    let channel_manager_tick = (
        "lightning_background_processor".to_string(),
        "Calling ChannelManager's timer_tick_occurred".to_string(),
    );
    let peer_manager_tick = (
        "lightning_background_processor".to_string(),
        "Calling PeerManager's timer_tick_occurred".to_string(),
    );

    // Spin until both log lines have shown up.
    loop {
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        let saw_channel_manager_tick = log_entries.get(&channel_manager_tick).is_some();
        let saw_peer_manager_tick = log_entries.get(&peer_manager_tick).is_some();
        if saw_channel_manager_tick && saw_peer_manager_tick {
            break
        }
    }

    assert!(bg_processor.stop().is_ok());
}
787
#[test]
fn test_channel_manager_persist_error() {
    // Checks that an error hit while persisting the manager is returned by `join()`.
    let nodes = create_nodes(2, "test_persist_error".to_string());
    open_channel!(nodes[0], nodes[1], 100000);

    let failing_persister = Persister::new(nodes[0].persister.get_data_dir())
        .with_manager_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let e = match bg_processor.join() {
        Ok(_) => panic!("Expected error persisting manager"),
        Err(e) => e,
    };
    assert_eq!(e.kind(), std::io::ErrorKind::Other);
    assert_eq!(e.get_ref().unwrap().to_string(), "test");
}
806
#[test]
fn test_network_graph_persist_error() {
    // Checks that an error hit while persisting the network graph is returned by `stop()`.
    let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());

    let failing_persister = Persister::new(nodes[0].persister.get_data_dir())
        .with_graph_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].p2p_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let e = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting network graph"),
        Err(e) => e,
    };
    assert_eq!(e.kind(), std::io::ErrorKind::Other);
    assert_eq!(e.get_ref().unwrap().to_string(), "test");
}
824
#[test]
fn test_scorer_persist_error() {
    // Checks that an error hit while persisting the scorer is returned by `stop()`.
    let nodes = create_nodes(2, "test_persist_scorer_error".to_string());

    let failing_persister = Persister::new(nodes[0].persister.get_data_dir())
        .with_scorer_error(std::io::ErrorKind::Other, "test");
    let persister = Arc::new(failing_persister);
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    let e = match bg_processor.stop() {
        Ok(_) => panic!("Expected error persisting scorer"),
        Err(e) => e,
    };
    assert_eq!(e.kind(), std::io::ErrorKind::Other);
    assert_eq!(e.get_ref().unwrap().to_string(), "test");
}
842
#[test]
fn test_background_event_handling() {
    // Checks that events generated while opening and force-closing a channel are delivered to
    // the event handler given to the BackgroundProcessor.
    let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
    let channel_value = 100000;
    let data_dir = nodes[0].persister.get_data_dir();
    let funding_persister = Arc::new(Persister::new(data_dir.clone()));

    // First processor: its handler forwards the result of handling FundingGenerationReady.
    let (funding_sender, funding_receiver) = std::sync::mpsc::sync_channel(1);
    let funding_handler = move |event: &Event| {
        funding_sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
    };
    let bg_processor = BackgroundProcessor::start(
        funding_persister,
        funding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Open a channel and wait for the handler to report the FundingGenerationReady event.
    begin_open_channel!(nodes[0], nodes[1], channel_value);
    let (temporary_channel_id, funding_tx) = funding_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("FundingGenerationReady not handled within deadline");
    end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

    // Confirm the funding transaction on both nodes and exchange channel_ready messages.
    confirm_transaction(&mut nodes[0], &funding_tx);
    let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
    confirm_transaction(&mut nodes[1], &funding_tx);
    let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
    nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
    let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
    nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
    let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

    assert!(bg_processor.stop().is_ok());

    // Second processor: its handler forwards a clone of every event it sees.
    let (event_sender, event_receiver) = std::sync::mpsc::sync_channel(1);
    let forwarding_handler = move |event: &Event| event_sender.send(event.clone()).unwrap();
    let closing_persister = Arc::new(Persister::new(data_dir));
    let bg_processor = BackgroundProcessor::start(
        closing_persister,
        forwarding_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Force close the channel, confirm the broadcast commitment transaction and wait for the
    // first forwarded event.
    let channel_id = nodes[0].node.list_channels()[0].channel_id;
    nodes[0].node.force_close_channel(&channel_id, &nodes[1].node.get_our_node_id()).unwrap();
    let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
    confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
    let event = event_receiver
        .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
        .expect("SpendableOutputs not handled within deadline");
    // Either a SpendableOutputs or a ChannelClosed event is accepted here.
    match event {
        Event::SpendableOutputs { .. } | Event::ChannelClosed { .. } => {},
        _ => panic!("Unexpected event: {:?}", event),
    }

    assert!(bg_processor.stop().is_ok());
}
897
#[test]
fn test_scorer_persistence() {
    // Checks that the background processor logs that it is persisting the scorer.
    let nodes = create_nodes(2, "test_scorer_persistence".to_string());
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let event_handler = |_: &_| {};
    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Log entries are keyed by (module, message); spin until the expected one appears.
    let expected_entry = ("lightning_background_processor".to_string(), "Persisting scorer".to_string());
    loop {
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        if log_entries.get(&expected_entry).is_some() {
            break
        }
    }

    assert!(bg_processor.stop().is_ok());
}
916
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
    // Checks that, when rapid gossip sync is in use, the network graph is left unpruned until
    // the sync has been applied, and that all channels are gone once the processor has persisted
    // the graph and been stopped.
    let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
    let data_dir = nodes[0].persister.get_data_dir();
    let (graph_persisted_sender, graph_persisted_receiver) = std::sync::mpsc::sync_channel(1);
    let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(graph_persisted_sender));

    // Seed the graph with one channel from a partial announcement.
    let network_graph = nodes[0].network_graph.clone();
    let features = ChannelFeatures::empty();
    network_graph
        .add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
        .expect("Failed to update channel from partial announcement");
    let original_graph_description = network_graph.to_string();
    assert!(original_graph_description.contains("42: features: 0000, node_one:"));
    assert_eq!(network_graph.read_only().channels().len(), 1);

    let event_handler = |_: &_| {};
    let background_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].rapid_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );

    // Spin until the processor has logged both that it assessed prunability and that it
    // decided not to prune.
    let assessing_entry = (
        "lightning_background_processor".to_string(),
        "Assessing prunability of network graph".to_string(),
    );
    let not_pruning_entry = (
        "lightning_background_processor".to_string(),
        "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string(),
    );
    loop {
        let log_entries = nodes[0].logger.lines.lock().unwrap();
        let assessed = log_entries.get(&assessing_entry).is_some();
        let skipped_pruning = log_entries.get(&not_pruning_entry).is_some();
        if assessed && skipped_pruning {
            break
        }
    }

    // Serialized rapid gossip sync data fed to the graph below.
    let initialization_input = vec![
        76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
        79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
        0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
        187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
        157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
        88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
        204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
        181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
        110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
        76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
        226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
        0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
        0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
    ];
    nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

    // The sync data adds two channels on top of the one seeded above.
    assert_eq!(network_graph.read_only().channels().len(), 3);

    // Wait for the persister to report that the network graph was written.
    let _ = graph_persisted_receiver
        .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
        .expect("Network graph not pruned within deadline");

    background_processor.stop().unwrap();

    // All channels should now be pruned.
    assert_eq!(network_graph.read_only().channels().len(), 0);
}
973
#[test]
fn test_invoice_payer() {
    // Checks that an `Arc<InvoicePayer>` can be used as the BackgroundProcessor's event handler
    // and that the processor starts and stops cleanly with it.
    let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
    let random_seed_bytes = keys_manager.get_secure_random_bytes();
    let nodes = create_nodes(2, "test_invoice_payer".to_string());

    // Build the invoice payer on top of the first node's manager, graph, scorer and logger.
    let persister = Arc::new(Persister::new(nodes[0].persister.get_data_dir()));
    let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
    let invoice_payer = Arc::new(InvoicePayer::new(
        Arc::clone(&nodes[0].node),
        router,
        Arc::clone(&nodes[0].scorer),
        Arc::clone(&nodes[0].logger),
        |_: &_| {},
        Retry::Attempts(2),
    ));
    let event_handler = Arc::clone(&invoice_payer);

    let bg_processor = BackgroundProcessor::start(
        persister,
        event_handler,
        nodes[0].chain_monitor.clone(),
        nodes[0].node.clone(),
        nodes[0].no_gossip_sync(),
        nodes[0].peer_manager.clone(),
        nodes[0].logger.clone(),
        Some(nodes[0].scorer.clone()),
    );
    assert!(bg_processor.stop().is_ok());
}
989 }