Implement EventHandler for NetworkGraph
rust-lightning/lightning-background-processor/src/lib.rs
//! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
//! running properly, and (2) either can or should be run in the background. See docs for
//! [`BackgroundProcessor`] for more details on the nitty-gritty.

#![deny(broken_intra_doc_links)]
#![deny(missing_docs)]
#![deny(unsafe_code)]

#![cfg_attr(docsrs, feature(doc_auto_cfg))]

#[macro_use] extern crate lightning;
extern crate lightning_rapid_gossip_sync;

use lightning::chain;
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::keysinterface::{Sign, KeysInterface};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
use lightning_rapid_gossip_sync::RapidGossipSync;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::thread::JoinHandle;
use std::time::{Duration, Instant};
use std::ops::Deref;

/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`P2PGossipSync`] is provided to
///   [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically, though this shouldn't be
/// relied upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	stop_thread: Arc<AtomicBool>,
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}

#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;


/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	E: EventHandler,
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	event_handler: E,
	p2p_gossip_sync: Option<P>,
}

impl<
	E: EventHandler,
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
> EventHandler for DecoratingEventHandler<E, P, G, A, L>
where A::Target: chain::Access, L::Target: Logger {
	fn handle_event(&self, event: &Event) {
		if let Some(gossip_sync) = &self.p2p_gossip_sync {
			gossip_sync.network_graph().handle_event(event);
		}
		self.event_handler.handle_event(event);
	}
}
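
// A minimal sketch (test-only; the helper below is illustrative and not part
// of the original file) of why a plain closure works as the inner
// `event_handler` above: `lightning` implements `EventHandler` for any
// `F: Fn(&Event)`, so `DecoratingEventHandler` can wrap a user-provided
// closure directly, as the tests in this file do.
#[cfg(test)]
fn _closure_is_event_handler_sketch() {
	// A no-op handler; real handlers would match on the event variants they
	// care about (payments, channel closes, spendable outputs, ...).
	let handler = |_event: &Event| {};
	// Compile-time check that the closure satisfies the trait bound.
	fn assert_is_event_handler<E: EventHandler>(_: &E) {}
	assert_is_event_handler(&handler);
}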

impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk. See
	/// [`NetworkGraph::write`] for writing out a [`NetworkGraph`]. See the `lightning-persister` crate
	/// for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass an optional [`RapidGossipSync`]
	/// to `rapid_gossip_sync` to indicate to [`BackgroundProcessor`] not to prune the
	/// [`NetworkGraph`] instance until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
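	///
	/// # Example
	///
	/// A minimal sketch of wiring up `start` (component construction is elided and
	/// the variable names below are assumptions, not items defined in this crate):
	///
	/// ```ignore
	/// let event_handler = |event: &Event| {
	///     // Application-specific handling, e.g. reacting to payment events.
	/// };
	/// let bg_processor = BackgroundProcessor::start(
	///     persister, event_handler, chain_monitor, channel_manager,
	///     Some(p2p_gossip_sync), peer_manager, logger, Some(scorer),
	///     None, // no RapidGossipSync in this sketch
	/// );
	/// // ... run the node, then surface any persistence error at shutdown:
	/// bg_processor.stop().expect("failed to persist ChannelManager on shutdown");
	/// ```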
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		p2p_gossip_sync: Option<PGS>, peer_manager: PM, logger: L, scorer: Option<S>,
		rapid_gossip_sync: Option<RGS>
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			let event_handler = DecoratingEventHandler {
				event_handler,
				p2p_gossip_sync: p2p_gossip_sync.as_ref().map(|t| t.deref()),
			};

			log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
			channel_manager.timer_tick_occurred();

			let mut last_freshness_call = Instant::now();
			let mut last_ping_call = Instant::now();
			let mut last_prune_call = Instant::now();
			let mut last_scorer_persist_call = Instant::now();
			let mut have_pruned = false;

			loop {
				channel_manager.process_pending_events(&event_handler);
				chain_monitor.process_pending_events(&event_handler);

				// Note that the PeerManager::process_events may block on ChannelManager's locks,
				// hence it comes last here. When the ChannelManager finishes whatever it's doing,
				// we want to ensure we get into `persist_manager` as quickly as we can, especially
				// without running the normal event processing above and handing events to users.
				//
				// Specifically, on an *extremely* slow machine, we may see ChannelManager start
				// processing a message effectively at any point during this loop. In order to
				// minimize the time between such processing completing and persisting the updated
				// ChannelManager, we want to minimize methods blocking on a ChannelManager
				// generally, and as a fallback place such blocking only immediately before
				// persistence.
				peer_manager.process_events();

				// We wait up to 100ms, but track how long it takes to detect being put to sleep,
				// see `await_start`'s use below.
				let await_start = Instant::now();
				let updates_available =
					channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
				let await_time = await_start.elapsed();

				if updates_available {
					log_trace!(logger, "Persisting ChannelManager...");
					persister.persist_manager(&*channel_manager)?;
					log_trace!(logger, "Done persisting ChannelManager.");
				}
				// Exit the loop if the background processor was requested to stop.
				if stop_thread.load(Ordering::Acquire) {
					log_trace!(logger, "Terminating background processor.");
					break;
				}
				if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
					log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
					channel_manager.timer_tick_occurred();
					last_freshness_call = Instant::now();
				}
				if await_time > Duration::from_secs(1) {
					// On various platforms, we may be starved of CPU cycles for several reasons.
					// E.g. on iOS, if we've been in the background, we will be entirely paused.
					// Similarly, if we're on a desktop platform and the device has been asleep, we
					// may not get any cycles.
					// We detect this by checking if our max-100ms-sleep, above, ran longer than a
					// full second, at which point we assume sockets may have been killed (they
					// appear to be at least on some platforms, even if it has only been a second).
					// Note that we have to take care to not get here just because user event
					// processing was slow at the top of the loop. For example, the sample client
					// may call Bitcoin Core RPCs during event handling, which very often takes
					// more than a handful of seconds to complete, and shouldn't disconnect all our
					// peers.
					log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
					peer_manager.disconnect_all_peers();
					last_ping_call = Instant::now();
				} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
					log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
					peer_manager.timer_tick_occurred();
					last_ping_call = Instant::now();
				}

				// Note that we want to run a graph prune once not long after startup before
				// falling back to our usual hourly prunes. This avoids short-lived clients never
				// pruning their network graph. We run once 60 seconds after startup before
				// continuing our normal cadence.
				if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
					// The network graph must not be pruned while rapid sync completion is pending
					log_trace!(logger, "Assessing prunability of network graph");
					let graph_to_prune = match rapid_gossip_sync.as_ref() {
						Some(rapid_sync) => {
							if rapid_sync.is_initial_sync_complete() {
								Some(rapid_sync.network_graph())
							} else {
								None
							}
						},
						None => p2p_gossip_sync.as_ref().map(|sync| sync.network_graph())
					};

					if let Some(network_graph_reference) = graph_to_prune {
						network_graph_reference.remove_stale_channels();

						if let Err(e) = persister.persist_graph(network_graph_reference) {
							log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions: {}", e)
						}

						last_prune_call = Instant::now();
						have_pruned = true;
					} else {
						log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
					}
				}

				if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
					if let Some(ref scorer) = scorer {
						log_trace!(logger, "Persisting scorer");
						if let Err(e) = persister.persist_scorer(&scorer) {
							log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions: {}", e)
						}
					}
					last_scorer_persist_call = Instant::now();
				}
			}

			// After we exit, ensure we persist the ChannelManager one final time - this avoids
			// some races where users quit while channel updates were in-flight, with
			// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
			persister.persist_manager(&*channel_manager)?;

			// Persist Scorer on exit
			if let Some(ref scorer) = scorer {
				persister.persist_scorer(&scorer)?;
			}

			// Persist NetworkGraph on exit
			if let Some(ref gossip_sync) = p2p_gossip_sync {
				persister.persist_graph(gossip_sync.network_graph())?;
			}

			Ok(())
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}

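// A standalone sketch (test-only; this helper is illustrative, not part of the
// original file) of the suspension-detection technique used in the main loop
// above: wait with a short timeout, then treat a much longer measured elapsed
// time as evidence the process was suspended and peer sockets may be dead.
// The 100ms/1s values mirror the loop above; the sleep stands in for
// `await_persistable_update_timeout`.
#[cfg(test)]
fn _suspension_detection_sketch() {
	let await_start = Instant::now();
	std::thread::sleep(Duration::from_millis(100));
	let await_time = await_start.elapsed();
	// In the loop above, exceeding one second triggers disconnect_all_peers().
	let likely_suspended = await_time > Duration::from_secs(1);
	assert!(!likely_suspended);
}
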
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		self.stop_and_join_thread().unwrap();
	}
}

#[cfg(test)]
mod tests {
	use bitcoin::blockdata::block::BlockHeader;
	use bitcoin::blockdata::constants::genesis_block;
	use bitcoin::blockdata::transaction::{Transaction, TxOut};
	use bitcoin::network::constants::Network;
	use lightning::chain::{BestBlock, Confirm, chainmonitor};
	use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
	use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
	use lightning::chain::transaction::OutPoint;
	use lightning::get_event_msg;
	use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
	use lightning::ln::features::{ChannelFeatures, InitFeatures};
	use lightning::ln::msgs::{ChannelMessageHandler, Init};
	use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
	use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
	use lightning::util::config::UserConfig;
	use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
	use lightning::util::ser::Writeable;
	use lightning::util::test_utils;
	use lightning::util::persist::KVStorePersister;
	use lightning_invoice::payment::{InvoicePayer, Retry};
	use lightning_invoice::utils::DefaultRouter;
	use lightning_persister::FilesystemPersister;
	use std::fs;
	use std::path::PathBuf;
	use std::sync::{Arc, Mutex};
	use std::sync::mpsc::SyncSender;
	use std::time::Duration;
	use lightning::routing::scoring::{FixedPenaltyScorer};
	use lightning_rapid_gossip_sync::RapidGossipSync;
	use super::{BackgroundProcessor, FRESHNESS_TIMER};

	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;

	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}

	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: Option<Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>>,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
		rapid_gossip_sync: Option<Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
	}

	impl Drop for Node {
		fn drop(&mut self) {
			let data_dir = self.persister.get_data_dir();
			match fs::remove_dir_all(data_dir.clone()) {
				Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
				_ => {}
			}
		}
	}

	struct Persister {
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		graph_persistence_notifier: Option<SyncSender<()>>,
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}

	impl Persister {
		fn new(data_dir: String) -> Self {
			let filesystem_persister = FilesystemPersister::new(data_dir.clone());
			Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
		}

		fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { graph_error: Some((error, message)), ..self }
		}

		fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
			Self { graph_persistence_notifier: Some(sender), ..self }
		}

		fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { manager_error: Some((error, message)), ..self }
		}

		fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
			Self { scorer_error: Some((error, message)), ..self }
		}
	}

	impl KVStorePersister for Persister {
		fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
			if key == "manager" {
				if let Some((error, message)) = self.manager_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "network_graph" {
				if let Some(sender) = &self.graph_persistence_notifier {
					sender.send(()).unwrap();
				};

				if let Some((error, message)) = self.graph_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			if key == "scorer" {
				if let Some((error, message)) = self.scorer_error {
					return Err(std::io::Error::new(error, message))
				}
			}

			self.filesystem_persister.persist(key, object)
		}
	}

	fn get_full_filepath(filepath: String, filename: String) -> String {
		let mut path = PathBuf::from(filepath);
		path.push(filename);
		path.to_str().unwrap().to_string()
	}

	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Some(Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()) };
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let rapid_gossip_sync = None;
			let node = Node { node: manager, p2p_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer, rapid_gossip_sync };
			nodes.push(node);
		}

		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}

	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}

	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}

	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}

	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}

	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}

	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_channel(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		if let Some(ref handler) = nodes[0].p2p_gossip_sync {
			let network_graph = handler.network_graph();
			check_persisted_data!(network_graph, filepath.clone());
		}

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}

	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_scorer_persistence() {
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}

	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let rapid_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
		let features = ChannelFeatures::empty();
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), Some(rapid_sync.clone()));

		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		rapid_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}

	#[test]
	fn test_invoice_payer() {
		let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
		let random_seed_bytes = keys_manager.get_secure_random_bytes();
		let nodes = create_nodes(2, "test_invoice_payer".to_string());

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
		let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
		let event_handler = Arc::clone(&invoice_payer);
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()), nodes[0].rapid_gossip_sync.clone());
		assert!(bg_processor.stop().is_ok());
	}
}