// Merge pull request #1826 from TheBlueMatt/2022-10-idempotency-err
// [rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::KeysInterface;
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared flag polled by the background thread on each loop iteration; set to `true` by
	// `stop`/`Drop` to request a clean shutdown.
	stop_thread: Arc<AtomicBool>,
	// Handle to the spawned thread, `take()`n once joined so the thread is joined at most once
	// (e.g. an explicit `stop` followed by `Drop` must not double-join).
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// All intervals below are in seconds. Each is scaled down to 1 second under `cfg(test)` so the
// integration tests in this file can observe several timer firings without real-world waits.

// Interval between `ChannelManager::timer_tick_occurred` calls in the main loop.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval between persisting the scorer (if one was provided) to disk.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Delay before the first network-graph prune after startup; shorter than NETWORK_PRUNE_TIMER so
// short-lived clients still prune (see the comment at the prune site in `define_run_body`).
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Wraps whichever gossip-sync mechanism (if any) the user hands to
/// [`BackgroundProcessor::start`] or [`process_events_async`], so the background loop can apply
/// network updates from events and decide when graph pruning is safe.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
146 /// (C-not exported) as the bindings concretize everything and have constructors for us
147 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
148         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
149 where
150         A::Target: chain::Access,
151         L::Target: Logger,
152 {
153         /// Initializes a new [`GossipSync::P2P`] variant.
154         pub fn p2p(gossip_sync: P) -> Self {
155                 GossipSync::P2P(gossip_sync)
156         }
157 }
158
159 /// (C-not exported) as the bindings concretize everything and have constructors for us
160 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
161         GossipSync<
162                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
163                 R,
164                 G,
165                 &'a (dyn chain::Access + Send + Sync),
166                 L,
167         >
168 where
169         L::Target: Logger,
170 {
171         /// Initializes a new [`GossipSync::Rapid`] variant.
172         pub fn rapid(gossip_sync: R) -> Self {
173                 GossipSync::Rapid(gossip_sync)
174         }
175 }
176
177 /// (C-not exported) as the bindings concretize everything and have constructors for us
178 impl<'a, L: Deref>
179         GossipSync<
180                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
181                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
182                 &'a NetworkGraph<L>,
183                 &'a (dyn chain::Access + Send + Sync),
184                 L,
185         >
186 where
187         L::Target: Logger,
188 {
189         /// Initializes a new [`GossipSync::None`] variant.
190         pub fn none() -> Self {
191                 GossipSync::None
192         }
193 }
194
195 fn handle_network_graph_update<L: Deref>(
196         network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198         if let Event::PaymentPathFailed { ref network_update, .. } = event {
199                 if let Some(network_update) = network_update {
200                         network_graph.handle_network_update(&network_update);
201                 }
202         }
203 }
204
// Expands to the shared main-loop body used by both the blocking `BackgroundProcessor::start`
// thread and the async `process_events_async` future. The caller supplies expressions for event
// processing, the loop-exit check, and the (possibly async) wait-for-update step, so one loop
// implementation serves both environments. Any `persist_manager` failure aborts the loop via `?`
// and is surfaced to the caller.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Timestamps of the most recent run of each periodic task; compared against the
		// corresponding *_TIMER constants below to decide when each fires again.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				log_trace!($logger, "Assessing prunability of network graph");
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					network_graph.remove_stale_channels_and_tracking();

					// Graph-persistence failures are logged but non-fatal, unlike
					// manager-persistence failures above.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				} else {
					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Like graph persistence, a scorer-persistence failure is logged but
					// does not abort the loop.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
330
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
	// `should_break` must start out `false`: `select_biased!` below polls the
	// persistable-update future first, so whenever it completes before the sleeper the
	// sleeper branch (the only place `should_break` is assigned) never runs. If this
	// started as `true`, a pending ChannelManager update on the first iteration would
	// make the loop-exit check see `true` and terminate background processing spuriously.
	let mut should_break = false;
	// Wrap the user-provided handler so gossip-derived updates carried by events are
	// applied to the network graph before the user sees the event.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			select_biased! {
				// Biased: prefer reporting a persistable update over the sleep timeout.
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
411
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
	{
		// The flag is shared between this struct (which sets it in `stop`/`Drop`) and the
		// spawned thread (which checks it each loop iteration as its exit condition).
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user-provided handler so gossip-derived updates carried by events
			// are applied to the network graph before the user sees the event.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				event_handler.handle_event(event);
			};
			// Run the shared main loop, waking at most every 100ms to check for a
			// persistable ChannelManager update or a stop request.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Requests shutdown via the shared flag, then joins the thread. The `Release` store
	// pairs with the `Acquire` load in the loop-exit check inside the background thread.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the background thread at most once: `take()` leaves `None` behind so a second
	// call (e.g. `Drop` after an explicit `stop`) is a no-op returning `Ok(())`.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
559
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Signal the background thread to stop and join it; any persistence error hit
		// during the final shutdown persists surfaces here as a panic.
		self.stop_and_join_thread().unwrap();
	}
}
565
566 #[cfg(test)]
567 mod tests {
568         use bitcoin::blockdata::block::BlockHeader;
569         use bitcoin::blockdata::constants::genesis_block;
570         use bitcoin::blockdata::locktime::PackedLockTime;
571         use bitcoin::blockdata::transaction::{Transaction, TxOut};
572         use bitcoin::network::constants::Network;
573         use lightning::chain::{BestBlock, Confirm, chainmonitor};
574         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
575         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
576         use lightning::chain::transaction::OutPoint;
577         use lightning::get_event_msg;
578         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
579         use lightning::ln::features::ChannelFeatures;
580         use lightning::ln::msgs::{ChannelMessageHandler, Init};
581         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
582         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
583         use lightning::util::config::UserConfig;
584         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
585         use lightning::util::ser::Writeable;
586         use lightning::util::test_utils;
587         use lightning::util::persist::KVStorePersister;
588         use lightning_invoice::payment::{InvoicePayer, Retry};
589         use lightning_invoice::utils::DefaultRouter;
590         use lightning_persister::FilesystemPersister;
591         use std::fs;
592         use std::path::PathBuf;
593         use std::sync::{Arc, Mutex};
594         use std::sync::mpsc::SyncSender;
595         use std::time::Duration;
596         use bitcoin::hashes::Hash;
597         use bitcoin::TxMerkleNode;
598         use lightning::routing::scoring::{FixedPenaltyScorer};
599         use lightning_rapid_gossip_sync::RapidGossipSync;
600         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
601
602         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
603
	/// A no-op [`SocketDescriptor`]: it reports accepting zero bytes on every send and
	/// disconnecting does nothing. Exists only to satisfy the `PeerManager` type parameter
	/// in these tests.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			// Zero bytes accepted — no data ever leaves a test node.
			0
		}

		fn disconnect_socket(&mut self) {}
	}
613
	// Concrete `ChainMonitor` instantiation wired to the test utilities and a filesystem persister.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthands for the P2P (full) and rapid gossip sync objects shared by the test nodes.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
618
	/// A self-contained test Lightning node: a `ChannelManager` plus every supporting object
	/// a `BackgroundProcessor` needs to be handed at start.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked manually so tests can build fake block headers on top of the current tip.
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
632
	impl Node {
		/// Wraps this node's P2P gossip sync in the [`GossipSync`] enum expected by
		/// `BackgroundProcessor::start`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		/// Wraps this node's rapid gossip sync in the [`GossipSync`] enum.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		/// Returns [`GossipSync::None`], for tests that don't exercise gossip handling.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
646
647         impl Drop for Node {
648                 fn drop(&mut self) {
649                         let data_dir = self.persister.get_data_dir();
650                         match fs::remove_dir_all(data_dir.clone()) {
651                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
652                                 _ => {}
653                         }
654                 }
655         }
656
	/// A [`KVStorePersister`] wrapper around [`FilesystemPersister`] that can inject failures
	/// for specific keys and/or signal a channel whenever the network graph is persisted.
	struct Persister {
		// When set, persisting the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, every "network_graph" persistence attempt sends `()` on this channel first.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
664
665         impl Persister {
666                 fn new(data_dir: String) -> Self {
667                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
668                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
669                 }
670
671                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
672                         Self { graph_error: Some((error, message)), ..self }
673                 }
674
675                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
676                         Self { graph_persistence_notifier: Some(sender), ..self }
677                 }
678
679                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
680                         Self { manager_error: Some((error, message)), ..self }
681                 }
682
683                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
684                         Self { scorer_error: Some((error, message)), ..self }
685                 }
686         }
687
688         impl KVStorePersister for Persister {
689                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
690                         if key == "manager" {
691                                 if let Some((error, message)) = self.manager_error {
692                                         return Err(std::io::Error::new(error, message))
693                                 }
694                         }
695
696                         if key == "network_graph" {
697                                 if let Some(sender) = &self.graph_persistence_notifier {
698                                         sender.send(()).unwrap();
699                                 };
700
701                                 if let Some((error, message)) = self.graph_error {
702                                         return Err(std::io::Error::new(error, message))
703                                 }
704                         }
705
706                         if key == "scorer" {
707                                 if let Some((error, message)) = self.scorer_error {
708                                         return Err(std::io::Error::new(error, message))
709                                 }
710                         }
711
712                         self.filesystem_persister.persist(key, object)
713                 }
714         }
715
716         fn get_full_filepath(filepath: String, filename: String) -> String {
717                 let mut path = PathBuf::from(filepath);
718                 path.push(filename);
719                 path.to_str().unwrap().to_string()
720         }
721
	/// Builds `num_nodes` fully-wired test [`Node`]s, each persisting under
	/// `{persist_dir}_persister_{i}`, and connects every pair of them as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// A per-node deterministic seed keeps node identities stable across runs.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			// Both gossip sync flavors share the same network graph so either can be handed
			// to the BackgroundProcessor.
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers (in both directions).
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
758
	// Drives a full channel open between $node_a and $node_b for $channel_value sats
	// (create/accept, funding generation, funding signed) and evaluates to the funding
	// transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
769
	// Starts the channel-open handshake: $node_a creates the channel (user_channel_id 42),
	// then open_channel/accept_channel messages are exchanged between the two nodes.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
777
778         macro_rules! handle_funding_generation_ready {
779                 ($event: expr, $channel_value: expr) => {{
780                         match $event {
781                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
782                                         assert_eq!(channel_value_satoshis, $channel_value);
783                                         assert_eq!(user_channel_id, 42);
784
785                                         let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
786                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
787                                         }]};
788                                         (temporary_channel_id, tx)
789                                 },
790                                 _ => panic!("Unexpected event"),
791                         }
792                 }}
793         }
794
	// Completes the funding flow: hands the funding transaction to $node_a, then exchanges
	// the funding_created/funding_signed messages between the two nodes.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
802
	/// Mines `depth` fake blocks on top of `node`'s tracked best block, confirming `tx` in
	/// the first one and notifying both the `ChannelManager` and `ChainMonitor` of the final
	/// tip.
	///
	/// NOTE(review): when `depth == 1`, only the first match arm runs (the `x if x == depth`
	/// arm is unreachable for `i == 1`), so `best_block_updated` is never called in that
	/// case. All current callers pass `depth >= ANTI_REORG_DELAY`, so this is latent.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Header fields beyond prev_blockhash are arbitrary; `time` doubles as the
			// height to keep consecutive headers distinct.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					// First block: confirm the transaction itself.
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					// Final block: advance the best-block tip.
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` and buries it under `ANTI_REORG_DELAY` blocks so it is considered final.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
826
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the file at $filepath matches $node's current serialization.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										// On-disk copy is stale; re-serialize and retry.
										continue
									}
								},
								// The file may not exist yet; retry.
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager's persistence-needed flag clears, i.e. the background
		// processor has consumed the pending persist notification.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
896
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Spin until both log lines emitted by the background thread's timer path appear.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
918
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		// Inject a failure for the "manager" key so the first manager persist errors out.
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// `join` should surface the injected error rather than succeed.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
937
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// Inject a failure for the "network_graph" key.
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should return the injected error rather than Ok.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
955
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// Inject a failure for the "scorer" key.
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should return the injected error rather than Ok.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
973
	#[test]
	fn test_background_event_handling() {
		// Verifies that events generated in the background (FundingGenerationReady, then
		// SpendableOutputs after a force-close) reach the user-supplied event handler.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1036
	#[test]
	fn test_scorer_persistence() {
		// Verifies the background processor periodically persists the scorer (observed via
		// the "Persisting scorer" log line).
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spin until the background thread logs a scorer persistence.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1055
1056         #[test]
1057         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1058                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1059                 let data_dir = nodes[0].persister.get_data_dir();
1060                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1061                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1062                 let network_graph = nodes[0].network_graph.clone();
1063                 let features = ChannelFeatures::empty();
1064                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1065                         .expect("Failed to update channel from partial announcement");
1066                 let original_graph_description = network_graph.to_string();
1067                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1068                 assert_eq!(network_graph.read_only().channels().len(), 1);
1069
1070                 let event_handler = |_: _| {};
1071                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1072
1073                 loop {
1074                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1075                         let expected_log_a = "Assessing prunability of network graph".to_string();
1076                         let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
1077                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
1078                                 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
1079                                 break
1080                         }
1081                 }
1082
1083                 let initialization_input = vec![
1084                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1085                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1086                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1087                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1088                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1089                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1090                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1091                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1092                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1093                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1094                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1095                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1096                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1097                 ];
1098                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1099
1100                 // this should have added two channels
1101                 assert_eq!(network_graph.read_only().channels().len(), 3);
1102
1103                 let _ = receiver
1104                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1105                         .expect("Network graph not pruned within deadline");
1106
1107                 background_processor.stop().unwrap();
1108
1109                 // all channels should now be pruned
1110                 assert_eq!(network_graph.read_only().channels().len(), 0);
1111         }
1112
1113         #[test]
1114         fn test_invoice_payer() {
1115                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1116                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1117                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1118
1119                 // Initiate the background processors to watch each node.
1120                 let data_dir = nodes[0].persister.get_data_dir();
1121                 let persister = Arc::new(Persister::new(data_dir));
1122                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1123                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1124                 let event_handler = Arc::clone(&invoice_payer);
1125                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1126                 assert!(bg_processor.stop().is_ok());
1127         }
1128 }