Move Scorer requirement away from Router trait
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
///   is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread each loop iteration; setting it to `true`
	// (see `stop_and_join_thread`) requests the thread to exit.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned background thread; taken (set to `None`) when the thread is
	// joined via `join`/`stop`/`drop`, so it is only ever joined once.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
66
// Timer intervals for the background loop, in seconds. Test builds use much shorter
// intervals so the corresponding code paths are exercised quickly.

// Interval between `ChannelManager::timer_tick_occurred` calls.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Interval between `PeerManager::timer_tick_occurred` (ping) calls.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the *first* network-graph prune after startup; see the prune logic in
// `BackgroundProcessor::start` for why the first prune runs sooner than the hourly cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
94
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
// The enum is generic over both sync mechanisms (and their shared `NetworkGraph`) so a
// single `GossipSync` parameter on `BackgroundProcessor::start` can cover all three cases.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
111
112 impl<
113         P: Deref<Target = P2PGossipSync<G, A, L>>,
114         R: Deref<Target = RapidGossipSync<G, L>>,
115         G: Deref<Target = NetworkGraph<L>>,
116         A: Deref,
117         L: Deref,
118 > GossipSync<P, R, G, A, L>
119 where A::Target: chain::Access, L::Target: Logger {
120         fn network_graph(&self) -> Option<&G> {
121                 match self {
122                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
123                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
124                         GossipSync::None => None,
125                 }
126         }
127
128         fn prunable_network_graph(&self) -> Option<&G> {
129                 match self {
130                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
131                         GossipSync::Rapid(gossip_sync) => {
132                                 if gossip_sync.is_initial_sync_complete() {
133                                         Some(gossip_sync.network_graph())
134                                 } else {
135                                         None
136                                 }
137                         },
138                         GossipSync::None => None,
139                 }
140         }
141 }
142
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Pins the unused `R` parameter to a concrete reference type so callers of `p2p` need not
// spell out a rapid-sync type they are not using.
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
155
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Pins the unused `P`/`A` parameters to concrete trait-object reference types so callers of
// `rapid` need not spell out P2P-sync types they are not using.
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
173
/// (C-not exported) as the bindings concretize everything and have constructors for us
// Pins every unused type parameter to a concrete type so callers of `none` need not name
// any gossip-sync types at all.
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	pub fn none() -> Self {
		GossipSync::None
	}
}
191
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	// The user-provided handler; invoked after any network-graph updates.
	event_handler: E,
	// Used to route events to the network graph (if any) before the user handler runs.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
206
207 impl<
208         'a,
209         E: EventHandler,
210         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
211         RGS: Deref<Target = RapidGossipSync<G, L>>,
212         G: Deref<Target = NetworkGraph<L>>,
213         A: Deref,
214         L: Deref,
215 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
216 where A::Target: chain::Access, L::Target: Logger {
217         fn handle_event(&self, event: &Event) {
218                 if let Some(network_graph) = self.gossip_sync.network_graph() {
219                         network_graph.handle_event(event);
220                 }
221                 self.event_handler.handle_event(event);
222         }
223 }
224
225 impl BackgroundProcessor {
226         /// Start a background thread that takes care of responsibilities enumerated in the [top-level
227         /// documentation].
228         ///
229         /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
230         /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
231         /// either [`join`] or [`stop`].
232         ///
233         /// # Data Persistence
234         ///
235         /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
236         /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
237         /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
238         /// provided implementation.
239         ///
240         /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
241         /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
242         /// See the `lightning-persister` crate for LDK's provided implementation.
243         ///
244         /// Typically, users should either implement [`Persister::persist_manager`] to never return an
245         /// error or call [`join`] and handle any error that may arise. For the latter case,
246         /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
247         ///
248         /// # Event Handling
249         ///
250         /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
251         /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
252         /// functionality implemented by other handlers.
253         /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
254         ///
255         /// # Rapid Gossip Sync
256         ///
257         /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
258         /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
259         /// until the [`RapidGossipSync`] instance completes its first sync.
260         ///
261         /// [top-level documentation]: BackgroundProcessor
262         /// [`join`]: Self::join
263         /// [`stop`]: Self::stop
264         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
265         /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
266         /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
267         /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
268         /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
269         /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
270         pub fn start<
271                 'a,
272                 Signer: 'static + Sign,
273                 CA: 'static + Deref + Send + Sync,
274                 CF: 'static + Deref + Send + Sync,
275                 CW: 'static + Deref + Send + Sync,
276                 T: 'static + Deref + Send + Sync,
277                 K: 'static + Deref + Send + Sync,
278                 F: 'static + Deref + Send + Sync,
279                 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
280                 L: 'static + Deref + Send + Sync,
281                 P: 'static + Deref + Send + Sync,
282                 Descriptor: 'static + SocketDescriptor + Send + Sync,
283                 CMH: 'static + Deref + Send + Sync,
284                 OMH: 'static + Deref + Send + Sync,
285                 RMH: 'static + Deref + Send + Sync,
286                 EH: 'static + EventHandler + Send,
287                 PS: 'static + Deref + Send,
288                 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
289                 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
290                 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
291                 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
292                 UMH: 'static + Deref + Send + Sync,
293                 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
294                 S: 'static + Deref<Target = SC> + Send + Sync,
295                 SC: WriteableScore<'a>,
296         >(
297                 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
298                 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
299         ) -> Self
300         where
301                 CA::Target: 'static + chain::Access,
302                 CF::Target: 'static + chain::Filter,
303                 CW::Target: 'static + chain::Watch<Signer>,
304                 T::Target: 'static + BroadcasterInterface,
305                 K::Target: 'static + KeysInterface<Signer = Signer>,
306                 F::Target: 'static + FeeEstimator,
307                 L::Target: 'static + Logger,
308                 P::Target: 'static + Persist<Signer>,
309                 CMH::Target: 'static + ChannelMessageHandler,
310                 OMH::Target: 'static + OnionMessageHandler,
311                 RMH::Target: 'static + RoutingMessageHandler,
312                 UMH::Target: 'static + CustomMessageHandler,
313                 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
314         {
315                 let stop_thread = Arc::new(AtomicBool::new(false));
316                 let stop_thread_clone = stop_thread.clone();
317                 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
318                         let event_handler = DecoratingEventHandler {
319                                 event_handler,
320                                 gossip_sync: &gossip_sync,
321                         };
322
323                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
324                         channel_manager.timer_tick_occurred();
325
326                         let mut last_freshness_call = Instant::now();
327                         let mut last_ping_call = Instant::now();
328                         let mut last_prune_call = Instant::now();
329                         let mut last_scorer_persist_call = Instant::now();
330                         let mut have_pruned = false;
331
332                         loop {
333                                 channel_manager.process_pending_events(&event_handler);
334                                 chain_monitor.process_pending_events(&event_handler);
335
336                                 // Note that the PeerManager::process_events may block on ChannelManager's locks,
337                                 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
338                                 // we want to ensure we get into `persist_manager` as quickly as we can, especially
339                                 // without running the normal event processing above and handing events to users.
340                                 //
341                                 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
342                                 // processing a message effectively at any point during this loop. In order to
343                                 // minimize the time between such processing completing and persisting the updated
344                                 // ChannelManager, we want to minimize methods blocking on a ChannelManager
345                                 // generally, and as a fallback place such blocking only immediately before
346                                 // persistence.
347                                 peer_manager.process_events();
348
349                                 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
350                                 // see `await_start`'s use below.
351                                 let await_start = Instant::now();
352                                 let updates_available =
353                                         channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
354                                 let await_time = await_start.elapsed();
355
356                                 if updates_available {
357                                         log_trace!(logger, "Persisting ChannelManager...");
358                                         persister.persist_manager(&*channel_manager)?;
359                                         log_trace!(logger, "Done persisting ChannelManager.");
360                                 }
361                                 // Exit the loop if the background processor was requested to stop.
362                                 if stop_thread.load(Ordering::Acquire) == true {
363                                         log_trace!(logger, "Terminating background processor.");
364                                         break;
365                                 }
366                                 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
367                                         log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
368                                         channel_manager.timer_tick_occurred();
369                                         last_freshness_call = Instant::now();
370                                 }
371                                 if await_time > Duration::from_secs(1) {
372                                         // On various platforms, we may be starved of CPU cycles for several reasons.
373                                         // E.g. on iOS, if we've been in the background, we will be entirely paused.
374                                         // Similarly, if we're on a desktop platform and the device has been asleep, we
375                                         // may not get any cycles.
376                                         // We detect this by checking if our max-100ms-sleep, above, ran longer than a
377                                         // full second, at which point we assume sockets may have been killed (they
378                                         // appear to be at least on some platforms, even if it has only been a second).
379                                         // Note that we have to take care to not get here just because user event
380                                         // processing was slow at the top of the loop. For example, the sample client
381                                         // may call Bitcoin Core RPCs during event handling, which very often takes
382                                         // more than a handful of seconds to complete, and shouldn't disconnect all our
383                                         // peers.
384                                         log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
385                                         peer_manager.disconnect_all_peers();
386                                         last_ping_call = Instant::now();
387                                 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
388                                         log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
389                                         peer_manager.timer_tick_occurred();
390                                         last_ping_call = Instant::now();
391                                 }
392
393                                 // Note that we want to run a graph prune once not long after startup before
394                                 // falling back to our usual hourly prunes. This avoids short-lived clients never
395                                 // pruning their network graph. We run once 60 seconds after startup before
396                                 // continuing our normal cadence.
397                                 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
398                                         // The network graph must not be pruned while rapid sync completion is pending
399                                         log_trace!(logger, "Assessing prunability of network graph");
400                                         if let Some(network_graph) = gossip_sync.prunable_network_graph() {
401                                                 network_graph.remove_stale_channels();
402
403                                                 if let Err(e) = persister.persist_graph(network_graph) {
404                                                         log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
405                                                 }
406
407                                                 last_prune_call = Instant::now();
408                                                 have_pruned = true;
409                                         } else {
410                                                 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
411                                         }
412                                 }
413
414                                 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
415                                         if let Some(ref scorer) = scorer {
416                                                 log_trace!(logger, "Persisting scorer");
417                                                 if let Err(e) = persister.persist_scorer(&scorer) {
418                                                         log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
419                                                 }
420                                         }
421                                         last_scorer_persist_call = Instant::now();
422                                 }
423                         }
424
425                         // After we exit, ensure we persist the ChannelManager one final time - this avoids
426                         // some races where users quit while channel updates were in-flight, with
427                         // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
428                         persister.persist_manager(&*channel_manager)?;
429
430                         // Persist Scorer on exit
431                         if let Some(ref scorer) = scorer {
432                                 persister.persist_scorer(&scorer)?;
433                         }
434
435                         // Persist NetworkGraph on exit
436                         if let Some(network_graph) = gossip_sync.network_graph() {
437                                 persister.persist_graph(network_graph)?;
438                         }
439
440                         Ok(())
441                 });
442                 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
443         }
444
445         /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
446         /// [`ChannelManager`].
447         ///
448         /// # Panics
449         ///
450         /// This function panics if the background thread has panicked such as while persisting or
451         /// handling events.
452         ///
453         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
454         pub fn join(mut self) -> Result<(), std::io::Error> {
455                 assert!(self.thread_handle.is_some());
456                 self.join_thread()
457         }
458
459         /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
460         /// [`ChannelManager`].
461         ///
462         /// # Panics
463         ///
464         /// This function panics if the background thread has panicked such as while persisting or
465         /// handling events.
466         ///
467         /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
468         pub fn stop(mut self) -> Result<(), std::io::Error> {
469                 assert!(self.thread_handle.is_some());
470                 self.stop_and_join_thread()
471         }
472
473         fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
474                 self.stop_thread.store(true, Ordering::Release);
475                 self.join_thread()
476         }
477
478         fn join_thread(&mut self) -> Result<(), std::io::Error> {
479                 match self.thread_handle.take() {
480                         Some(handle) => handle.join().unwrap(),
481                         None => Ok(()),
482                 }
483         }
484 }
485
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// There is no way to surface an error from `drop`, so a persistence failure in the
		// background thread is escalated to a panic here. Call `stop`/`join` explicitly to
		// handle errors gracefully instead.
		self.stop_and_join_thread().unwrap();
	}
}
491
492 #[cfg(test)]
493 mod tests {
494         use bitcoin::blockdata::block::BlockHeader;
495         use bitcoin::blockdata::constants::genesis_block;
496         use bitcoin::blockdata::locktime::PackedLockTime;
497         use bitcoin::blockdata::transaction::{Transaction, TxOut};
498         use bitcoin::network::constants::Network;
499         use lightning::chain::{BestBlock, Confirm, chainmonitor};
500         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
501         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
502         use lightning::chain::transaction::OutPoint;
503         use lightning::get_event_msg;
504         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
505         use lightning::ln::features::{ChannelFeatures, InitFeatures};
506         use lightning::ln::msgs::{ChannelMessageHandler, Init};
507         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
508         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
509         use lightning::util::config::UserConfig;
510         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
511         use lightning::util::ser::Writeable;
512         use lightning::util::test_utils;
513         use lightning::util::persist::KVStorePersister;
514         use lightning_invoice::payment::{InvoicePayer, Retry};
515         use lightning_invoice::utils::DefaultRouter;
516         use lightning_persister::FilesystemPersister;
517         use std::fs;
518         use std::path::PathBuf;
519         use std::sync::{Arc, Mutex};
520         use std::sync::mpsc::SyncSender;
521         use std::time::Duration;
522         use bitcoin::hashes::Hash;
523         use bitcoin::TxMerkleNode;
524         use lightning::routing::scoring::{FixedPenaltyScorer};
525         use lightning_rapid_gossip_sync::RapidGossipSync;
526         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
527
	// Maximum time (in seconds) the tests wait for an expected event before failing.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
529
	// Dummy socket descriptor for tests: never writes data and ignores disconnects.
	#[derive(Clone, Eq, Hash, PartialEq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Reports zero bytes written so the peer manager never believes data was sent.
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}
539
	// Concrete ChainMonitor instantiation used by the test nodes.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Concrete gossip-sync instantiations used by the test nodes.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
544
	// Bundle of everything a simulated node needs in these tests: a ChannelManager plus the
	// gossip-sync, peer-manager, monitoring, persistence, and scoring pieces that
	// `BackgroundProcessor::start` takes.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
558
	impl Node {
		// Convenience constructors wrapping this node's sync objects in each `GossipSync` variant.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
572
573         impl Drop for Node {
574                 fn drop(&mut self) {
575                         let data_dir = self.persister.get_data_dir();
576                         match fs::remove_dir_all(data_dir.clone()) {
577                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
578                                 _ => {}
579                         }
580                 }
581         }
582
	/// A test persister that can be configured to fail (or to signal a channel)
	/// when specific keys are persisted, delegating everything else to a real
	/// `FilesystemPersister`. See the `KVStorePersister` impl below for how each
	/// field is used.
	struct Persister {
		// If set, persisting the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, a `()` is sent on this channel whenever the "network_graph"
		// key is persisted (even if the persist is then failed via `graph_error`).
		graph_persistence_notifier: Option<SyncSender<()>>,
		// If set, persisting the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, persisting the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		// Performs the actual on-disk writes for keys not failed above.
		filesystem_persister: FilesystemPersister,
	}
590
591         impl Persister {
592                 fn new(data_dir: String) -> Self {
593                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
594                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
595                 }
596
597                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
598                         Self { graph_error: Some((error, message)), ..self }
599                 }
600
601                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
602                         Self { graph_persistence_notifier: Some(sender), ..self }
603                 }
604
605                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
606                         Self { manager_error: Some((error, message)), ..self }
607                 }
608
609                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
610                         Self { scorer_error: Some((error, message)), ..self }
611                 }
612         }
613
614         impl KVStorePersister for Persister {
615                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
616                         if key == "manager" {
617                                 if let Some((error, message)) = self.manager_error {
618                                         return Err(std::io::Error::new(error, message))
619                                 }
620                         }
621
622                         if key == "network_graph" {
623                                 if let Some(sender) = &self.graph_persistence_notifier {
624                                         sender.send(()).unwrap();
625                                 };
626
627                                 if let Some((error, message)) = self.graph_error {
628                                         return Err(std::io::Error::new(error, message))
629                                 }
630                         }
631
632                         if key == "scorer" {
633                                 if let Some((error, message)) = self.scorer_error {
634                                         return Err(std::io::Error::new(error, message))
635                                 }
636                         }
637
638                         self.filesystem_persister.persist(key, object)
639                 }
640         }
641
642         fn get_full_filepath(filepath: String, filename: String) -> String {
643                 let mut path = PathBuf::from(filepath);
644                 path.push(filename);
645                 path.to_str().unwrap().to_string()
646         }
647
	/// Builds `num_nodes` test nodes — each with its own broadcaster, chain
	/// monitor, channel manager, gossip syncs, peer manager and scorer — then
	/// connects every pair of nodes as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			// Each node persists into its own `{persist_dir}_persister_{i}` directory.
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node key seed so runs are reproducible.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			// Both gossip sync flavors share the same network graph; individual
			// tests pick which one (if any) to hand to the background processor.
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers in both directions.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
			}
		}

		nodes
	}
684
	// Drives a full channel open from $node_a to $node_b for $channel_value sats
	// (open/accept, funding generation, funding created/signed) and evaluates to
	// the funding transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
695
	// Starts a channel open: $node_a creates the channel (user_channel_id 42),
	// then the open_channel/accept_channel messages are exchanged between the
	// two nodes.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
703
	// Destructures a FundingGenerationReady event, asserting the channel value
	// and the user_channel_id of 42 set by begin_open_channel!, and evaluates to
	// the (temporary_channel_id, funding_transaction) pair.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				&Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					// Build a minimal funding transaction paying the requested
					// value to the requested output script.
					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
720
	// Completes a channel open: hands $node_a the funding transaction, then
	// exchanges the funding_created/funding_signed messages between the nodes.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
728
	/// Connects `depth` new blocks on top of `node`'s current best block,
	/// including `tx` in the first of them, and notifies both the channel
	/// manager and the chain monitor of the confirmation and of the final
	/// best-block update.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Minimal header; `time` is set to the height so successive headers differ.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				// First connected block contains the transaction.
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				// Last connected block triggers the best-block update. NOTE(review):
				// when `depth == 1` this arm is shadowed by the `1 =>` arm above, so
				// no best_block_updated call is made — callers here always pass
				// depth > 1, but confirm before relying on depth == 1.
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` and buries it under `ANTI_REORG_DELAY` blocks.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
752
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: &_| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Polls until the bytes on disk at $filepath match the current
		// serialization of $node; panics only if $node itself fails to write.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					// Re-serialize each iteration in case $node changed in the meantime.
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										// On-disk copy is stale; keep polling.
										continue
									}
								},
								// File not written yet; keep polling.
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Wait until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
822
823         #[test]
824         fn test_timer_tick_called() {
825                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
826                 // `FRESHNESS_TIMER`.
827                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
828                 let data_dir = nodes[0].persister.get_data_dir();
829                 let persister = Arc::new(Persister::new(data_dir));
830                 let event_handler = |_: &_| {};
831                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
832                 loop {
833                         let log_entries = nodes[0].logger.lines.lock().unwrap();
834                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
835                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
836                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
837                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
838                                 break
839                         }
840                 }
841
842                 assert!(bg_processor.stop().is_ok());
843         }
844
845         #[test]
846         fn test_channel_manager_persist_error() {
847                 // Test that if we encounter an error during manager persistence, the thread panics.
848                 let nodes = create_nodes(2, "test_persist_error".to_string());
849                 open_channel!(nodes[0], nodes[1], 100000);
850
851                 let data_dir = nodes[0].persister.get_data_dir();
852                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
853                 let event_handler = |_: &_| {};
854                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
855                 match bg_processor.join() {
856                         Ok(_) => panic!("Expected error persisting manager"),
857                         Err(e) => {
858                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
859                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
860                         },
861                 }
862         }
863
864         #[test]
865         fn test_network_graph_persist_error() {
866                 // Test that if we encounter an error during network graph persistence, an error gets returned.
867                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
868                 let data_dir = nodes[0].persister.get_data_dir();
869                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
870                 let event_handler = |_: &_| {};
871                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
872
873                 match bg_processor.stop() {
874                         Ok(_) => panic!("Expected error persisting network graph"),
875                         Err(e) => {
876                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
877                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
878                         },
879                 }
880         }
881
882         #[test]
883         fn test_scorer_persist_error() {
884                 // Test that if we encounter an error during scorer persistence, an error gets returned.
885                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
886                 let data_dir = nodes[0].persister.get_data_dir();
887                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
888                 let event_handler = |_: &_| {};
889                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
890
891                 match bg_processor.stop() {
892                         Ok(_) => panic!("Expected error persisting scorer"),
893                         Err(e) => {
894                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
895                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
896                         },
897                 }
898         }
899
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| {
			sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
		};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready in both directions so the channel becomes usable.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Bury the commitment transaction deep enough for the output to become spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("SpendableOutputs not handled within deadline");
		// Either event may arrive first; both are acceptable here.
		match event {
			Event::SpendableOutputs { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
954
955         #[test]
956         fn test_scorer_persistence() {
957                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
958                 let data_dir = nodes[0].persister.get_data_dir();
959                 let persister = Arc::new(Persister::new(data_dir));
960                 let event_handler = |_: &_| {};
961                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
962
963                 loop {
964                         let log_entries = nodes[0].logger.lines.lock().unwrap();
965                         let expected_log = "Persisting scorer".to_string();
966                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
967                                 break
968                         }
969                 }
970
971                 assert!(bg_processor.stop().is_ok());
972         }
973
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// The notifier fires whenever the network graph is persisted, which lets
		// us detect the post-sync prune below.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		// Seed the graph with one partially-announced channel so there is
		// something prunable before rapid gossip sync completes.
		let features = ChannelFeatures::empty();
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: &_| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Wait until the processor has considered pruning and decided against it
		// (rapid gossip sync has not completed yet).
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// A canned rapid-gossip-sync snapshot; feeding it in marks the sync complete.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// Wait for the graph to be persisted, which happens after the prune.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1030
1031         #[test]
1032         fn test_invoice_payer() {
1033                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1034                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1035                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1036
1037                 // Initiate the background processors to watch each node.
1038                 let data_dir = nodes[0].persister.get_data_dir();
1039                 let persister = Arc::new(Persister::new(data_dir));
1040                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1041                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
1042                 let event_handler = Arc::clone(&invoice_payer);
1043                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1044                 assert!(bg_processor.stop().is_ok());
1045         }
1046 }