Merge pull request #1857 from TheBlueMatt/2022-11-reload-htlc
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::KeysInterface;
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
30 use std::sync::Arc;
31 use std::sync::atomic::{AtomicBool, Ordering};
32 use std::thread;
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
35 use std::ops::Deref;
36
37 #[cfg(feature = "futures")]
38 use futures_util::{select_biased, future::FutureExt};
39
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Flag shared with the background thread; set to `true` to request loop exit.
	stop_thread: Arc<AtomicBool>,
	// Handle to the background thread; taken (set to `None`) once joined via `join`/`stop`/`Drop`.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
69
// Seconds between `ChannelManager::timer_tick_occurred` calls; shortened in tests so
// timer-driven behavior can be exercised quickly.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between `PeerManager::timer_tick_occurred` (ping/disconnect-stale-peers) calls.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts (when a scorer is provided).
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Seconds after startup before the first network graph prune, so short-lived clients still
// prune at least once before falling back to the hourly cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
97
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
///
/// When [`GossipSync::None`] is used, the [`BackgroundProcessor`] will neither prune nor
/// persist a [`NetworkGraph`], as no graph is available to it.
pub enum GossipSync<
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	R: Deref<Target = RapidGossipSync<G, L>>,
	G: Deref<Target = NetworkGraph<L>>,
	A: Deref,
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
114
115 impl<
116         P: Deref<Target = P2PGossipSync<G, A, L>>,
117         R: Deref<Target = RapidGossipSync<G, L>>,
118         G: Deref<Target = NetworkGraph<L>>,
119         A: Deref,
120         L: Deref,
121 > GossipSync<P, R, G, A, L>
122 where A::Target: chain::Access, L::Target: Logger {
123         fn network_graph(&self) -> Option<&G> {
124                 match self {
125                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
126                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::None => None,
128                 }
129         }
130
131         fn prunable_network_graph(&self) -> Option<&G> {
132                 match self {
133                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
134                         GossipSync::Rapid(gossip_sync) => {
135                                 if gossip_sync.is_initial_sync_complete() {
136                                         Some(gossip_sync.network_graph())
137                                 } else {
138                                         None
139                                 }
140                         },
141                         GossipSync::None => None,
142                 }
143         }
144 }
145
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
	GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
where
	A::Target: chain::Access,
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::P2P`] variant.
	///
	/// The unused rapid-gossip-sync type parameter is concretized to a reference type here so
	/// callers need not name it explicitly.
	pub fn p2p(gossip_sync: P) -> Self {
		GossipSync::P2P(gossip_sync)
	}
}
158
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
	GossipSync<
		&P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
		R,
		G,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::Rapid`] variant.
	///
	/// The unused P2P-gossip-sync type parameters are concretized to reference/trait-object
	/// types here so callers need not name them explicitly.
	pub fn rapid(gossip_sync: R) -> Self {
		GossipSync::Rapid(gossip_sync)
	}
}
176
/// (C-not exported) as the bindings concretize everything and have constructors for us
impl<'a, L: Deref>
	GossipSync<
		&P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
		&RapidGossipSync<&'a NetworkGraph<L>, L>,
		&'a NetworkGraph<L>,
		&'a (dyn chain::Access + Send + Sync),
		L,
	>
where
	L::Target: Logger,
{
	/// Initializes a new [`GossipSync::None`] variant.
	///
	/// All type parameters are concretized to reference/trait-object types since no gossip
	/// sync object is actually used.
	pub fn none() -> Self {
		GossipSync::None
	}
}
194
195 fn handle_network_graph_update<L: Deref>(
196         network_graph: &NetworkGraph<L>, event: &Event
197 ) where L::Target: Logger {
198         if let Event::PaymentPathFailed { ref network_update, .. } = event {
199                 if let Some(network_update) = network_update {
200                         network_graph.handle_network_update(&network_update);
201                 }
202         }
203 }
204
// Shared main-loop body for both the blocking (threaded) and async background processors. The
// two variants differ only in how events are processed, how the exit condition is checked, and
// how they sleep, so those pieces are passed in as expressions; all timer cadences and
// persistence logic live here. Evaluates to a `Result<(), std::io::Error>` expression.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Track when each periodic task last ran so each can fire on its own cadence.
		let mut last_freshness_call = Instant::now();
		let mut last_ping_call = Instant::now();
		let mut last_prune_call = Instant::now();
		let mut last_scorer_persist_call = Instant::now();
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let await_start = Instant::now();
			let updates_available = $await;
			let await_time = await_start.elapsed();

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = Instant::now();
			}
			if await_time > Duration::from_secs(1) {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = Instant::now();
			} else if last_ping_call.elapsed().as_secs() > PING_TIMER {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = Instant::now();
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();

					// Graph persistence failure is logged but non-fatal; we'll retry on the
					// next prune.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = Instant::now();
					have_pruned = true;
				}
			}

			if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Scorer persistence failure is likewise logged but non-fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = Instant::now();
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
328
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	K: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	K::Target: 'static + KeysInterface,
	F::Target: 'static + FeeEstimator,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
{
	// Must start at `false` - we only exit once the user-provided `sleeper` future resolves to
	// `true`. If it started at `true` and the ChannelManager-persistable-update branch of the
	// `select_biased!` below won the first race, the loop would terminate after a single
	// persistence pass without ever being asked to stop.
	let mut should_break = false;
	// Wrap the user's event handler so network-graph-relevant events are applied to the graph
	// (if one was provided) before the user's handler runs.
	let async_event_handler = |event| {
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Race the "ChannelManager needs re-persisting" signal against the user's sleeper,
			// preferring the persistence signal when both are ready.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		})
}
409
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		K: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<K::Target as KeysInterface>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, K, F, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<K::Target as KeysInterface>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		K::Target: 'static + KeysInterface,
		F::Target: 'static + FeeEstimator,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<K::Target as KeysInterface>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, K, F, L, SC>,
	{
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user's event handler so network-graph-relevant events are applied to
			// the graph (if one was provided) before the user's handler runs.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				event_handler.handle_event(event);
			};
			// The blocking variant: exit when `stop_thread` is set, sleep by blocking on the
			// ChannelManager's persistable-update signal for up to 100ms.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)))
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Signals the background thread to exit its loop, then waits for it to finish.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Waits for the background thread and returns its persistence result. If the thread was
	// already joined (handle taken), this is a no-op returning Ok(()), which makes `Drop` safe
	// after an explicit `join`/`stop`.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
557
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Stop the background thread if the user didn't already via `join`/`stop`. Note this
		// panics if final persistence failed; call `stop`/`join` explicitly to handle errors.
		self.stop_and_join_thread().unwrap();
	}
}
563
564 #[cfg(test)]
565 mod tests {
566         use bitcoin::blockdata::block::BlockHeader;
567         use bitcoin::blockdata::constants::genesis_block;
568         use bitcoin::blockdata::locktime::PackedLockTime;
569         use bitcoin::blockdata::transaction::{Transaction, TxOut};
570         use bitcoin::network::constants::Network;
571         use lightning::chain::{BestBlock, Confirm, chainmonitor};
572         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
573         use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
574         use lightning::chain::transaction::OutPoint;
575         use lightning::get_event_msg;
576         use lightning::ln::channelmanager::{self, BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
577         use lightning::ln::features::ChannelFeatures;
578         use lightning::ln::msgs::{ChannelMessageHandler, Init};
579         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
580         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
581         use lightning::routing::router::DefaultRouter;
582         use lightning::util::config::UserConfig;
583         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
584         use lightning::util::ser::Writeable;
585         use lightning::util::test_utils;
586         use lightning::util::persist::KVStorePersister;
587         use lightning_invoice::payment::{InvoicePayer, Retry};
588         use lightning_persister::FilesystemPersister;
589         use std::fs;
590         use std::path::PathBuf;
591         use std::sync::{Arc, Mutex};
592         use std::sync::mpsc::SyncSender;
593         use std::time::Duration;
594         use bitcoin::hashes::Hash;
595         use bitcoin::TxMerkleNode;
596         use lightning::routing::scoring::{FixedPenaltyScorer};
597         use lightning_rapid_gossip_sync::RapidGossipSync;
598         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
599
	// Maximum number of seconds tests wait for an expected event (used via
	// `Duration::from_secs`), expressed as a multiple of the background
	// processor's `FRESHNESS_TIMER` tick interval.
	const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
601
	// Minimal no-op `SocketDescriptor` used only to satisfy `PeerManager`'s type
	// parameters in these tests; it never accepts data or tracks a connection.
	#[derive(Clone, Hash, PartialEq, Eq)]
	struct TestDescriptor{}
	impl SocketDescriptor for TestDescriptor {
		fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
			// Report that zero bytes were accepted.
			0
		}

		fn disconnect_socket(&mut self) {}
	}
611
	// Concrete `ChainMonitor` used by the test nodes, wired to the test chain
	// source/broadcaster/fee-estimator/logger and a `FilesystemPersister`.
	type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;

	// Shorthands for the two gossip-sync flavors exercised by these tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
616
	// One test node: a `ChannelManager` together with every supporting object the
	// `BackgroundProcessor` under test needs to drive it.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Chain tip this node was last told about via `confirm_transaction_depth`.
		best_block: BestBlock,
		scorer: Arc<Mutex<FixedPenaltyScorer>>,
	}
630
	impl Node {
		// The three helpers below wrap this node's gossip state in the
		// `GossipSync` variant expected by `BackgroundProcessor::start`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
644
645         impl Drop for Node {
646                 fn drop(&mut self) {
647                         let data_dir = self.persister.get_data_dir();
648                         match fs::remove_dir_all(data_dir.clone()) {
649                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
650                                 _ => {}
651                         }
652                 }
653         }
654
	// Test persister wrapping a `FilesystemPersister`: it can inject an I/O error
	// for specific persistence keys and can signal each network-graph write.
	struct Persister {
		// When set, persisting the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, `()` is sent on this channel each time "network_graph" is persisted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
662
663         impl Persister {
664                 fn new(data_dir: String) -> Self {
665                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
666                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
667                 }
668
669                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
670                         Self { graph_error: Some((error, message)), ..self }
671                 }
672
673                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
674                         Self { graph_persistence_notifier: Some(sender), ..self }
675                 }
676
677                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
678                         Self { manager_error: Some((error, message)), ..self }
679                 }
680
681                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
682                         Self { scorer_error: Some((error, message)), ..self }
683                 }
684         }
685
686         impl KVStorePersister for Persister {
687                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
688                         if key == "manager" {
689                                 if let Some((error, message)) = self.manager_error {
690                                         return Err(std::io::Error::new(error, message))
691                                 }
692                         }
693
694                         if key == "network_graph" {
695                                 if let Some(sender) = &self.graph_persistence_notifier {
696                                         sender.send(()).unwrap();
697                                 };
698
699                                 if let Some((error, message)) = self.graph_error {
700                                         return Err(std::io::Error::new(error, message))
701                                 }
702                         }
703
704                         if key == "scorer" {
705                                 if let Some((error, message)) = self.scorer_error {
706                                         return Err(std::io::Error::new(error, message))
707                                 }
708                         }
709
710                         self.filesystem_persister.persist(key, object)
711                 }
712         }
713
714         fn get_full_filepath(filepath: String, filename: String) -> String {
715                 let mut path = PathBuf::from(filepath);
716                 path.push(filename);
717                 path.to_str().unwrap().to_string()
718         }
719
	// Builds `num_nodes` independent test nodes — each with its own chain monitor,
	// channel manager, both gossip syncs, peer manager, and on-disk persister —
	// then connects every pair of them as peers.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			// Each node persists under its own directory derived from `persist_dir`.
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			// Deterministic per-node key seed keeps runs reproducible.
			let seed = [i as u8; 32];
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Connect every distinct pair of nodes, in both directions.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: channelmanager::provided_init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
756
	// Drives a complete channel open between $node_a and $node_b: the
	// open/accept handshake, funding generation, and the funding_created/
	// funding_signed exchange. Evaluates to the funding transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
767
	// Runs the open_channel/accept_channel handshake between $node_a and
	// $node_b; afterwards $node_a is expected to produce a
	// FundingGenerationReady event (consumed by the caller).
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), channelmanager::provided_init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
775
	// Matches a FundingGenerationReady event, asserts its channel value and
	// user_channel_id (42, as set in begin_open_channel!), builds a one-output
	// funding transaction for it, and evaluates to (temporary_channel_id, tx).
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
792
	// Completes the funding flow: hands the funding transaction to $node_a's
	// ChannelManager, then relays the resulting funding_created/funding_signed
	// messages between the two nodes.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
800
801         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
802                 for i in 1..=depth {
803                         let prev_blockhash = node.best_block.block_hash();
804                         let height = node.best_block.height() + 1;
805                         let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
806                         let txdata = vec![(0, tx)];
807                         node.best_block = BestBlock::new(header.block_hash(), height);
808                         match i {
809                                 1 => {
810                                         node.node.transactions_confirmed(&header, &txdata, height);
811                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
812                                 },
813                                 x if x == depth => {
814                                         node.node.best_block_updated(&header, height);
815                                         node.chain_monitor.best_block_updated(&header, height);
816                                 },
817                                 _ => {},
818                         }
819                 }
820         }
	// Confirms `tx` and buries it under ANTI_REORG_DELAY blocks so it is
	// considered irrevocably confirmed.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
824
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Polls until the bytes on disk at $filepath match the current
		// serialization of $node; panics only if $node itself fails to serialize.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager no longer signals that it needs re-persistence.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
894
895         #[test]
896         fn test_timer_tick_called() {
897                 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
898                 // `FRESHNESS_TIMER`.
899                 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
900                 let data_dir = nodes[0].persister.get_data_dir();
901                 let persister = Arc::new(Persister::new(data_dir));
902                 let event_handler = |_: _| {};
903                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
904                 loop {
905                         let log_entries = nodes[0].logger.lines.lock().unwrap();
906                         let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
907                         let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
908                         if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
909                                         log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
910                                 break
911                         }
912                 }
913
914                 assert!(bg_processor.stop().is_ok());
915         }
916
917         #[test]
918         fn test_channel_manager_persist_error() {
919                 // Test that if we encounter an error during manager persistence, the thread panics.
920                 let nodes = create_nodes(2, "test_persist_error".to_string());
921                 open_channel!(nodes[0], nodes[1], 100000);
922
923                 let data_dir = nodes[0].persister.get_data_dir();
924                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
925                 let event_handler = |_: _| {};
926                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
927                 match bg_processor.join() {
928                         Ok(_) => panic!("Expected error persisting manager"),
929                         Err(e) => {
930                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
931                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
932                         },
933                 }
934         }
935
936         #[test]
937         fn test_network_graph_persist_error() {
938                 // Test that if we encounter an error during network graph persistence, an error gets returned.
939                 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
940                 let data_dir = nodes[0].persister.get_data_dir();
941                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
942                 let event_handler = |_: _| {};
943                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
944
945                 match bg_processor.stop() {
946                         Ok(_) => panic!("Expected error persisting network graph"),
947                         Err(e) => {
948                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
949                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
950                         },
951                 }
952         }
953
954         #[test]
955         fn test_scorer_persist_error() {
956                 // Test that if we encounter an error during scorer persistence, an error gets returned.
957                 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
958                 let data_dir = nodes[0].persister.get_data_dir();
959                 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
960                 let event_handler = |_: _| {};
961                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
962
963                 match bg_processor.stop() {
964                         Ok(_) => panic!("Expected error persisting scorer"),
965                         Err(e) => {
966                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
967                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
968                         },
969                 }
970         }
971
	#[test]
	fn test_background_event_handling() {
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			// Forward the (temporary_channel_id, funding_tx) pair to the test thread.
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		// Exchange channel_ready messages so both sides consider the channel live.
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		// Bury the commitment transaction so the to_self output times out and
		// becomes spendable.
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1034
1035         #[test]
1036         fn test_scorer_persistence() {
1037                 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
1038                 let data_dir = nodes[0].persister.get_data_dir();
1039                 let persister = Arc::new(Persister::new(data_dir));
1040                 let event_handler = |_: _| {};
1041                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1042
1043                 loop {
1044                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1045                         let expected_log = "Persisting scorer".to_string();
1046                         if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
1047                                 break
1048                         }
1049                 }
1050
1051                 assert!(bg_processor.stop().is_ok());
1052         }
1053
1054         #[test]
1055         fn test_not_pruning_network_graph_until_graph_sync_completion() {
1056                 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
1057                 let data_dir = nodes[0].persister.get_data_dir();
1058                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1059                 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
1060                 let network_graph = nodes[0].network_graph.clone();
1061                 let features = ChannelFeatures::empty();
1062                 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
1063                         .expect("Failed to update channel from partial announcement");
1064                 let original_graph_description = network_graph.to_string();
1065                 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
1066                 assert_eq!(network_graph.read_only().channels().len(), 1);
1067
1068                 let event_handler = |_: _| {};
1069                 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1070
1071                 loop {
1072                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1073                         let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
1074                         if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
1075                                 .unwrap_or(&0) > 1
1076                         {
1077                                 // Wait until the loop has gone around at least twice.
1078                                 break
1079                         }
1080                 }
1081
1082                 let initialization_input = vec![
1083                         76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
1084                         79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
1085                         0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1086                         187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1087                         157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1088                         88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1089                         204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1090                         181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1091                         110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1092                         76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1093                         226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1094                         0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1095                         0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
1096                 ];
1097                 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1098
1099                 // this should have added two channels
1100                 assert_eq!(network_graph.read_only().channels().len(), 3);
1101
1102                 let _ = receiver
1103                         .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1104                         .expect("Network graph not pruned within deadline");
1105
1106                 background_processor.stop().unwrap();
1107
1108                 // all channels should now be pruned
1109                 assert_eq!(network_graph.read_only().channels().len(), 0);
1110         }
1111
1112         #[test]
1113         fn test_invoice_payer() {
1114                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1115                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1116                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1117
1118                 // Initiate the background processors to watch each node.
1119                 let data_dir = nodes[0].persister.get_data_dir();
1120                 let persister = Arc::new(Persister::new(data_dir));
1121                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1122                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1123                 let event_handler = Arc::clone(&invoice_payer);
1124                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1125                 assert!(bg_processor.stop().is_ok());
1126         }
1127 }