]> git.bitcoin.ninja Git - rust-lightning/blob - lightning-background-processor/src/lib.rs
b394a2311c3b7943d5d2c81a5308773875bbb468
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
40 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
41 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
42 /// responsibilities are:
43 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
44 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
45 ///   writing it to disk/backups by invoking the callback given to it at startup.
46 ///   [`ChannelManager`] persistence should be done in the background.
47 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
48 ///   at the appropriate intervals.
49 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
50 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
51 ///
52 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
53 /// upon as doing so may result in high latency.
54 ///
55 /// # Note
56 ///
57 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
58 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
59 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
60 /// unilateral chain closure fees are at risk.
61 ///
62 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
63 /// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shared shutdown flag: set (Release) by `stop`/`drop` and polled (Acquire) by the
	// background thread once per loop iteration.
	stop_thread: Arc<AtomicBool>,
	// Handle for joining the background thread; `None` once it has been joined, so the
	// thread is joined at most once.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// Seconds between calls to ChannelManager::timer_tick_occurred.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between calls to PeerManager::timer_tick_occurred.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts (when a scorer is provided).
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Seconds after startup before the first network-graph prune; later prunes follow
// NETWORK_PRUNE_TIMER. Keeps short-lived clients from never pruning their graph.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// Determines which [`NetworkGraph`] (if any) the background processing updates on payment
/// failures and periodically prunes and persists.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	///
	/// The rapid-sync type parameter is pinned to a reference type so callers need not name it.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
158
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	///
	/// The P2P-sync and chain-access type parameters are pinned to reference/trait-object
	/// types so callers need not name them.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
176
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	///
	/// All type parameters are pinned to concrete reference/trait-object types so callers
	/// need not name any of them when opting out of gossip sync.
	pub fn none() -> Self {
		GossipSync::None
	}
}
194
195 fn handle_network_graph_update<L: Deref>(
196         network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198         if let Event::PaymentPathFailed { ref network_update, .. } = event {
199                 if let Some(network_update) = network_update {
200                         network_graph.handle_network_update(&network_update);
201                 }
202         }
203 }
204
/// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
struct DecoratingEventHandler<
	'a,
	E: EventHandler,
	PGS: Deref<Target = P2PGossipSync<G, A, L>>,
	RGS: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	// The user-provided handler each event is ultimately forwarded to.
	event_handler: E,
	// Used to look up the network graph (if any) so payment failures can update it before
	// the event reaches `event_handler`.
	gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
}
219
220 impl<
221         'a,
222         E: EventHandler,
223         PGS: Deref<Target = P2PGossipSync<G, A, L>>,
224         RGS: Deref<Target = RapidGossipSync<G, L>>,
225         G: Deref<Target = NetworkGraph<L>>,
226         A: Deref,
227         L: Deref,
228 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
229 where A::Target: chain::Access, L::Target: Logger {
230         fn handle_event(&self, event: Event) {
231                 if let Some(network_graph) = self.gossip_sync.network_graph() {
232                         handle_network_graph_update(network_graph, &event)
233                 }
234                 self.event_handler.handle_event(event);
235         }
236 }
237
// Expands to the main background-processing loop, shared by the threaded driver
// ([`BackgroundProcessor::start`]) and the async driver ([`process_events_async`]).
//
// The callers differ only in the last two arguments:
// * `$loop_exit_check` - expression evaluated once per iteration (after `$await`); the loop
//   terminates when it evaluates to `true`.
// * `$await` - expression that waits (roughly up to 100ms) for a ChannelManager persistence
//   notification, evaluating to `true` when the ChannelManager should be re-persisted.
macro_rules! define_run_body {
	($persister: ident, $event_handler: ident, $chain_monitor: ident, $channel_manager: ident,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		// Wrap the user handler so network-graph updates are applied before each event is
		// delivered.
		let event_handler = DecoratingEventHandler {
			event_handler: $event_handler,
			gossip_sync: &$gossip_sync,
		};

		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$channel_manager.process_pending_events(&event_handler);
			$chain_monitor.process_pending_events(&event_handler);

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				// Persistence errors abort the loop and are returned to the caller.
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				log_trace!($logger, "Assessing prunability of network graph");
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					network_graph.remove_stale_channels_and_tracking();

					// Graph-persistence failures are logged but do not abort the loop.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				} else {
					log_trace!($logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// As with the graph, scorer-persistence failures are logged, not fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
367
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	Signer: 'static + Sign,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EH: 'static + EventHandler + Send,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface<Signer = Signer>,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
	// Whether `sleeper` has requested that we exit. This must start out `false`: if it
	// started `true` and the ChannelManager's persistable-update future won the
	// `select_biased!` before `sleeper` ever resolved, the loop-exit check would observe
	// `true` and terminate background processing after the very first persist, violating
	// the documented contract that we only exit once `sleeper` outputs true.
	let mut should_break = false;
	define_run_body!(persister, event_handler, chain_monitor, channel_manager,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
436
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		Signer: 'static + Sign,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface<Signer = Signer>,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
	{
		// The shutdown flag is shared with the spawned thread: stored (Release) by
		// `stop_and_join_thread` and loaded (Acquire) once per loop iteration below.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			define_run_body!(persister, event_handler, chain_monitor, channel_manager,
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Requests the background thread exit (see the loop-exit check in `define_run_body!`),
	// then waits for it to finish.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the background thread if it hasn't been joined yet, propagating any persistence
	// error it returned. The `unwrap` re-raises a panic from the background thread; taking
	// the handle ensures we never try to join twice (e.g. `stop` followed by `drop`).
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
577
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Signal the background thread to exit and wait for it. Note that this `unwrap`
		// panics if the thread panicked or if the final persistence pass returned an error.
		self.stop_and_join_thread().unwrap();
	}
}
583
584 #[cfg(test)]
585 mod tests {
586         use bitcoin::blockdata::block::BlockHeader;
587         use bitcoin::blockdata::constants::genesis_block;
588         use bitcoin::blockdata::locktime::PackedLockTime;
589         use bitcoin::blockdata::transaction::{Transaction, TxOut};
590         use bitcoin::network::constants::Network;
591         use lightning::chain::{BestBlock, Confirm, chainmonitor};
592         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
593         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
594         use lightning::chain::transaction::OutPoint;
595         use lightning::get_event_msg;
596         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
597         use lightning::ln::features::ChannelFeatures;
598         use lightning::ln::msgs::{ChannelMessageHandler, Init};
599         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
600         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
601         use lightning::util::config::UserConfig;
602         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
603         use lightning::util::ser::Writeable;
604         use lightning::util::test_utils;
605         use lightning::util::persist::KVStorePersister;
606         use lightning_invoice::payment::{InvoicePayer, Retry};
607         use lightning_invoice::utils::DefaultRouter;
608         use lightning_persister::FilesystemPersister;
609         use std::fs;
610         use std::path::PathBuf;
611         use std::sync::{Arc, Mutex};
612         use std::sync::mpsc::SyncSender;
613         use std::time::Duration;
614         use bitcoin::hashes::Hash;
615         use bitcoin::TxMerkleNode;
616         use lightning::routing::scoring::{FixedPenaltyScorer};
617         use lightning_rapid_gossip_sync::RapidGossipSync;
618         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
619
620         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
621
	// Minimal no-op socket descriptor used only to satisfy the `PeerManager` type
	// parameter; no data is ever sent over it in these tests.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		// Reports zero bytes accepted, i.e. silently drops all outbound data.
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			0
		}

		fn disconnect_socket(&mut self) {}
	}
631
	// Concrete `ChainMonitor` over the test utilities, persisting channel monitors
	// through a `FilesystemPersister`.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthands for the two gossip-sync flavors wrapped by `GossipSync` below.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
636
	// Everything a single test node owns: the channel manager plus all supporting
	// objects that get handed to `BackgroundProcessor::start`.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Updated manually by `confirm_transaction_depth` as blocks are "mined".
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
650
	impl Node {
		// Wraps this node's P2P gossip sync in the `GossipSync` enum for `start`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		// Wraps this node's rapid gossip sync in the `GossipSync` enum for `start`.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		// `GossipSync::None` with this node's type parameters pinned.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
664
665         impl Drop for Node {
666                 fn drop(&mut self) {
667                         let data_dir = self.persister.get_data_dir();
668                         match fs::remove_dir_all(data_dir.clone()) {
669                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
670                                 _ => {}
671                         }
672                 }
673         }
674
	// Test `KVStorePersister` wrapping `FilesystemPersister`, able to inject an
	// error per persistence key and to signal each network-graph persistence.
	struct Persister {
		// If set, persisting under the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, notified every time the "network_graph" key is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// If set, persisting under the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// If set, persisting under the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
682
683         impl Persister {
684                 fn new(data_dir: String) -> Self {
685                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
686                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
687                 }
688
689                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
690                         Self { graph_error: Some((error, message)), ..self }
691                 }
692
693                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
694                         Self { graph_persistence_notifier: Some(sender), ..self }
695                 }
696
697                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
698                         Self { manager_error: Some((error, message)), ..self }
699                 }
700
701                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
702                         Self { scorer_error: Some((error, message)), ..self }
703                 }
704         }
705
706         impl KVStorePersister for Persister {
707                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
708                         if key == "manager" {
709                                 if let Some((error, message)) = self.manager_error {
710                                         return Err(std::io::Error::new(error, message))
711                                 }
712                         }
713
714                         if key == "network_graph" {
715                                 if let Some(sender) = &self.graph_persistence_notifier {
716                                         sender.send(()).unwrap();
717                                 };
718
719                                 if let Some((error, message)) = self.graph_error {
720                                         return Err(std::io::Error::new(error, message))
721                                 }
722                         }
723
724                         if key == "scorer" {
725                                 if let Some((error, message)) = self.scorer_error {
726                                         return Err(std::io::Error::new(error, message))
727                                 }
728                         }
729
730                         self.filesystem_persister.persist(key, object)
731                 }
732         }
733
734         fn get_full_filepath(filepath: String, filename: String) -> String {
735                 let mut path = PathBuf::from(filepath);
736                 path.push(filename);
737                 path.to_str().unwrap().to_string()
738         }
739
	// Builds `num_nodes` fully-wired test nodes — chain monitor, channel manager,
	// both gossip syncs, peer manager — each persisting to
	// `{persist_dir}_persister_{i}`, then `peer_connected`s every pair in both
	// directions.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node key material: the seed is just the node index.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers (required before channel
		// open/accept message handling between them).
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
776
	// Drives a complete channel open from `$node_a` (initiator) to `$node_b` and
	// returns the funding transaction. Consumes `$node_a`'s pending events.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			// The only pending event should be FundingGenerationReady.
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
787
	// First half of a channel open: create_channel on `$node_a`, then exchange
	// open_channel/accept_channel. Uses 42 as the user channel id (asserted later by
	// handle_funding_generation_ready!); the 100 is the push amount — presumably
	// msat, per create_channel's signature (not visible here).
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
795
	// Destructures a FundingGenerationReady event, checks the channel value and the
	// user channel id (42, set by begin_open_channel!), and builds a single-output
	// funding transaction paying the requested script. Evaluates to
	// `(temporary_channel_id, funding_tx)`; panics on any other event.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
812
	// Second half of a channel open: hand the funding transaction to `$node_a`, then
	// exchange funding_created/funding_signed so both sides hold a signed channel.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
820
	// "Mines" `depth` blocks on top of `node.best_block`, confirming `tx` in the
	// first one and announcing the final block as the new tip to both the
	// ChannelManager and the ChainMonitor. Updates `node.best_block` as it goes.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Header fields other than the chain linkage are arbitrary; `time` reuses
			// the height so each synthetic header hashes uniquely.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				// First mined block: the transaction confirms here.
				1 => {
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				// Final block: report the new best tip.
				// NOTE(review): with `depth == 1` the `1` arm above wins and
				// best_block_updated is never called — confirm no caller uses depth 1.
				x if x == depth => {
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	// Confirms `tx` and buries it under ANTI_REORG_DELAY blocks so monitors treat
	// the confirmation as irrevocable.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
844
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-waits until the bytes on disk at `$filepath` match a fresh
		// serialization of `$node`; panics only if serialization itself fails.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									// Keep re-serializing and re-reading until the
									// on-disk copy catches up with the live state.
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
914
915         #[test]
916         fn test_timer_tick_called() {
917                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
918                 // `FRESHNESS_TIMER`.
919                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
920                 let data_dir = nodes[0].persister.get_data_dir();
921                 let persister = Arc::new(Persister::new(data_dir));
922                 let event_handler = |_: _| {};
923                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
924                 loop {
925                         let log_entries = nodes[0].logger.lines.lock().unwrap();
926                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
927                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
928                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
929                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
930                                 break
931                         }
932                 }
933
934                 assert!(bg_processor.stop().is_ok());
935         }
936
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		// Inject a failure for the "manager" persistence key.
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// `join` should surface the injected error, with kind and message intact.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
955
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// Inject a failure for the "network_graph" persistence key.
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should surface the injected error, with kind and message intact.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
973
974         #[test]
975         fn test_scorer_persist_error() {
976                 // Test that if we encounter an error during scorer persistence, an error gets returned.
977                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
978                 let data_dir = nodes[0].persister.get_data_dir();
979                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
980                 let event_handler = |_: _| {};
981                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
982
983                 match bg_processor.stop() {
984                         Ok(_) => panic!("Expected error persisting scorer"),
985                         Err(e) => {
986                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
987                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
988                         },
989                 }
990         }
991
	#[test]
	fn test_background_event_handling() {
		// End-to-end check that events generated by the node are delivered to the
		// user-supplied event handler by the background processor.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		// The handler forwards (temporary_channel_id, funding_tx) back over the channel.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Bury the commitment transaction deep enough for the output to be spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1054
1055         #[test]
1056         fn test_scorer_persistence() {
1057                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1058                 let data_dir = nodes[0].persister.get_data_dir();
1059                 let persister = Arc::new(Persister::new(data_dir));
1060                 let event_handler = |_: _| {};
1061                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1062
1063                 loop {
1064                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1065                         let expected_log = "Persisting scorer".to_string();
1066                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1067                                 break
1068                         }
1069                 }
1070
1071                 assert!(bg_processor.stop().is_ok());
1072         }
1073
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		// Verifies the background processor defers network-graph pruning while a rapid gossip
		// sync is pending, and prunes + persists the graph once that sync has completed.
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// Bounded channel lets the test block until the processor persists the pruned graph.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with a partial (update-less) announcement so there is exactly one
		// channel that is eligible for pruning once pruning is allowed to run.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: _| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-wait until the processor has logged both that it assessed prunability and that
		// it declined to prune — i.e. it observed the still-pending rapid gossip sync. The
		// logger lock is released at the end of each iteration.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log_a = "Assessing prunability of network graph".to_string();
			let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
				log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
				break
			}
		}

		// Raw rapid-gossip-sync snapshot (binary fixture). Applying it both adds channels and
		// marks the rapid gossip sync as complete, unblocking pruning.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// With the sync complete, wait (with a generous multiple of the first-prune timer) for
		// the persistence notification that signals the graph was pruned and written out.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1130
1131         #[test]
1132         fn test_invoice_payer() {
1133                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1134                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1135                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1136
1137                 // Initiate the background processors to watch each node.
1138                 let data_dir = nodes[0].persister.get_data_dir();
1139                 let persister = Arc::new(Persister::new(data_dir));
1140                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1141                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1142                 let event_handler = Arc::clone(&invoice_payer);
1143                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1144                 assert!(bg_processor.stop().is_ok());
1145         }
1146 }