]> git.bitcoin.ninja Git - rust-lightning/blob - lightning-background-processor/src/lib.rs
Use flexible timer types in background processor's regular jobs
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
8
9 #![deny(missing_docs)]
10 #![deny(unsafe_code)]
11
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
13
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
16
17 use lightning::chain;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{EntropySource, NodeSigner, SignerProvider};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, OnionMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::router::Router;
26 use lightning::routing::scoring::WriteableScore;
27 use lightning::util::events::{Event, EventHandler, EventsProvider};
28 use lightning::util::logger::Logger;
29 use lightning::util::persist::Persister;
30 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::Arc;
32 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread;
34 use std::thread::JoinHandle;
35 use std::time::{Duration, Instant};
36 use std::ops::Deref;
37
38 #[cfg(feature = "futures")]
39 use futures_util::{select_biased, future::FutureExt};
40
41 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
42 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
43 /// responsibilities are:
44 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
45 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
46 ///   writing it to disk/backups by invoking the callback given to it at startup.
47 ///   [`ChannelManager`] persistence should be done in the background.
48 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
49 ///   at the appropriate intervals.
50 /// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
51 ///   [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
52 ///
53 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
54 /// upon as doing so may result in high latency.
55 ///
56 /// # Note
57 ///
58 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
59 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
60 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
61 /// unilateral chain closure fees are at risk.
62 ///
63 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
64 /// [`Event`]: lightning::util::events::Event
65 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	// Shutdown flag shared with the background thread: the loop polls it (Acquire) each
	// iteration and `stop`/`drop` set it (Release) to request a clean exit.
	stop_thread: Arc<AtomicBool>,
	// Handle for the spawned background thread; `None` once joined. The thread's result
	// carries any `std::io::Error` returned by the persister while the loop was running.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
70
// Seconds between calls to `ChannelManager::timer_tick_occurred` (shortened to 1s in tests to
// keep them fast).
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

// Seconds between calls to `PeerManager::timer_tick_occurred` (ping/disconnect handling).
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Seconds between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

// Seconds after startup before the first network-graph prune, so short-lived clients prune at
// least once before falling back to the hourly `NETWORK_PRUNE_TIMER` cadence.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
98
/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
pub enum GossipSync<
	// Reference to a BOLT 7 peer-to-peer gossip sync implementation.
	P: Deref<Target = P2PGossipSync<G, A, L>>,
	// Reference to a rapid gossip sync implementation.
	R: Deref<Target = RapidGossipSync<G, L>>,
	// Reference to the network graph that either sync mechanism maintains.
	G: Deref<Target = NetworkGraph<L>>,
	// Chain access used by the P2P sync to validate gossip (bounded below).
	A: Deref,
	// Logger (bounded below).
	L: Deref,
>
where A::Target: chain::Access, L::Target: Logger {
	/// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
	P2P(P),
	/// Rapid gossip sync from a trusted server.
	Rapid(R),
	/// No gossip sync.
	None,
}
115
116 impl<
117         P: Deref<Target = P2PGossipSync<G, A, L>>,
118         R: Deref<Target = RapidGossipSync<G, L>>,
119         G: Deref<Target = NetworkGraph<L>>,
120         A: Deref,
121         L: Deref,
122 > GossipSync<P, R, G, A, L>
123 where A::Target: chain::Access, L::Target: Logger {
124         fn network_graph(&self) -> Option<&G> {
125                 match self {
126                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
127                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
128                         GossipSync::None => None,
129                 }
130         }
131
132         fn prunable_network_graph(&self) -> Option<&G> {
133                 match self {
134                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
135                         GossipSync::Rapid(gossip_sync) => {
136                                 if gossip_sync.is_initial_sync_complete() {
137                                         Some(gossip_sync.network_graph())
138                                 } else {
139                                         None
140                                 }
141                         },
142                         GossipSync::None => None,
143                 }
144         }
145 }
146
147 /// (C-not exported) as the bindings concretize everything and have constructors for us
148 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
149         GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
150 where
151         A::Target: chain::Access,
152         L::Target: Logger,
153 {
154         /// Initializes a new [`GossipSync::P2P`] variant.
155         pub fn p2p(gossip_sync: P) -> Self {
156                 GossipSync::P2P(gossip_sync)
157         }
158 }
159
160 /// (C-not exported) as the bindings concretize everything and have constructors for us
161 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
162         GossipSync<
163                 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
164                 R,
165                 G,
166                 &'a (dyn chain::Access + Send + Sync),
167                 L,
168         >
169 where
170         L::Target: Logger,
171 {
172         /// Initializes a new [`GossipSync::Rapid`] variant.
173         pub fn rapid(gossip_sync: R) -> Self {
174                 GossipSync::Rapid(gossip_sync)
175         }
176 }
177
178 /// (C-not exported) as the bindings concretize everything and have constructors for us
179 impl<'a, L: Deref>
180         GossipSync<
181                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
182                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
183                 &'a NetworkGraph<L>,
184                 &'a (dyn chain::Access + Send + Sync),
185                 L,
186         >
187 where
188         L::Target: Logger,
189 {
190         /// Initializes a new [`GossipSync::None`] variant.
191         pub fn none() -> Self {
192                 GossipSync::None
193         }
194 }
195
196 fn handle_network_graph_update<L: Deref>(
197         network_graph: &NetworkGraph<L>, event: &Event
198 ) where L::Target: Logger {
199         if let Event::PaymentPathFailed { ref network_update, .. } = event {
200                 if let Some(network_update) = network_update {
201                         network_graph.handle_network_update(&network_update);
202                 }
203         }
204 }
205
// Expands to the body of the background-processing loop, shared between the threaded
// (`BackgroundProcessor::start`) and async (`process_events_async`) entry points. The clock is
// abstracted so each caller supplies its own: `$get_timer(secs)` creates a timer token,
// `$timer_elapsed(&mut token, secs)` checks whether it has expired, and `$await` sleeps up to
// ~100ms, evaluating to `true` when the `ChannelManager` needs re-persisting.
// `$loop_exit_check` is polled every iteration to decide whether to shut down.
macro_rules! define_run_body {
	($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
	 $channel_manager: ident, $process_channel_manager_events: expr,
	 $gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
	 $loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr)
	=> { {
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.timer_tick_occurred();

		// Per-job timers tracking when each periodic task last ran, using the caller's clock.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut have_pruned = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;

			// Note that the PeerManager::process_events may block on ChannelManager's locks,
			// hence it comes last here. When the ChannelManager finishes whatever it's doing,
			// we want to ensure we get into `persist_manager` as quickly as we can, especially
			// without running the normal event processing above and handing events to users.
			//
			// Specifically, on an *extremely* slow machine, we may see ChannelManager start
			// processing a message effectively at any point during this loop. In order to
			// minimize the time between such processing completing and persisting the updated
			// ChannelManager, we want to minimize methods blocking on a ChannelManager
			// generally, and as a fallback place such blocking only immediately before
			// persistence.
			$peer_manager.process_events();

			// We wait up to 100ms, but track how long it takes to detect being put to sleep,
			// see `await_start`'s use below.
			let mut await_start = $get_timer(1);
			let updates_available = $await;
			let await_slow = $timer_elapsed(&mut await_start, 1);

			if updates_available {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&*$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			// Exit the loop if the background processor was requested to stop.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if await_slow {
				// On various platforms, we may be starved of CPU cycles for several reasons.
				// E.g. on iOS, if we've been in the background, we will be entirely paused.
				// Similarly, if we're on a desktop platform and the device has been asleep, we
				// may not get any cycles.
				// We detect this by checking if our max-100ms-sleep, above, ran longer than a
				// full second, at which point we assume sockets may have been killed (they
				// appear to be at least on some platforms, even if it has only been a second).
				// Note that we have to take care to not get here just because user event
				// processing was slow at the top of the loop. For example, the sample client
				// may call Bitcoin Core RPCs during event handling, which very often takes
				// more than a handful of seconds to complete, and shouldn't disconnect all our
				// peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// Note that we want to run a graph prune once not long after startup before
			// falling back to our usual hourly prunes. This avoids short-lived clients never
			// pruning their network graph. We run once 60 seconds after startup before
			// continuing our normal cadence.
			if $timer_elapsed(&mut last_prune_call, if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER }) {
				// The network graph must not be pruned while rapid sync completion is pending
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					log_trace!($logger, "Pruning and persisting network graph.");
					network_graph.remove_stale_channels_and_tracking();

					// Persistence failure is logged but non-fatal: we retry on the next prune.
					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
					}

					last_prune_call = $get_timer(NETWORK_PRUNE_TIMER);
					have_pruned = true;
				}
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					log_trace!($logger, "Persisting scorer");
					// Like graph persistence, scorer persistence failure is logged, not fatal.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}
		}

		// After we exit, ensure we persist the ChannelManager one final time - this avoids
		// some races where users quit while channel updates were in-flight, with
		// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&*$channel_manager)?;

		// Persist Scorer on exit
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist NetworkGraph on exit
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
329
/// Processes background events in a future.
///
/// `sleeper` should return a future which completes in the given amount of time and returns a
/// boolean indicating whether the background processing should exit. Once `sleeper` returns a
/// future which outputs true, the loop will exit and this function's future will complete.
///
/// See [`BackgroundProcessor::start`] for information on which actions this handles.
#[cfg(feature = "futures")]
pub async fn process_events_async<
	'a,
	CA: 'static + Deref + Send + Sync,
	CF: 'static + Deref + Send + Sync,
	CW: 'static + Deref + Send + Sync,
	T: 'static + Deref + Send + Sync,
	ES: 'static + Deref + Send + Sync,
	NS: 'static + Deref + Send + Sync,
	SP: 'static + Deref + Send + Sync,
	F: 'static + Deref + Send + Sync,
	R: 'static + Deref + Send + Sync,
	G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
	L: 'static + Deref + Send + Sync,
	P: 'static + Deref + Send + Sync,
	Descriptor: 'static + SocketDescriptor + Send + Sync,
	CMH: 'static + Deref + Send + Sync,
	RMH: 'static + Deref + Send + Sync,
	OMH: 'static + Deref + Send + Sync,
	EventHandlerFuture: core::future::Future<Output = ()>,
	EventHandler: Fn(Event) -> EventHandlerFuture,
	PS: 'static + Deref + Send,
	M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
	CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
	PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
	RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
	UMH: 'static + Deref + Send + Sync,
	PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
	S: 'static + Deref<Target = SC> + Send + Sync,
	SC: WriteableScore<'a>,
	SleepFuture: core::future::Future<Output = bool>,
	Sleeper: Fn(Duration) -> SleepFuture
>(
	persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
	gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	sleeper: Sleeper,
) -> Result<(), std::io::Error>
where
	CA::Target: 'static + chain::Access,
	CF::Target: 'static + chain::Filter,
	CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
	T::Target: 'static + BroadcasterInterface,
	ES::Target: 'static + EntropySource,
	NS::Target: 'static + NodeSigner,
	SP::Target: 'static + SignerProvider,
	F::Target: 'static + FeeEstimator,
	R::Target: 'static + Router,
	L::Target: 'static + Logger,
	P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
	CMH::Target: 'static + ChannelMessageHandler,
	OMH::Target: 'static + OnionMessageHandler,
	RMH::Target: 'static + RoutingMessageHandler,
	UMH::Target: 'static + CustomMessageHandler,
	PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
{
	// Exit flag, set when `sleeper` resolves `true` and polled as the loop's exit check. It
	// must start out `false`: initializing it to `true` made the loop terminate after its very
	// first iteration, so the processor never actually ran in the background.
	let mut should_break = false;
	let async_event_handler = |event| {
		// Feed any network update attached to a payment failure into the graph before handing
		// the event to the user's handler.
		let network_graph = gossip_sync.network_graph();
		let event_handler = &event_handler;
		async move {
			if let Some(network_graph) = network_graph {
				handle_network_graph_update(network_graph, &event)
			}
			event_handler(event).await;
		}
	};
	define_run_body!(persister,
		chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
		channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
		gossip_sync, peer_manager, logger, scorer, should_break, {
			// Sleep up to 100ms but wake immediately if the ChannelManager needs persisting;
			// `select_biased!` prefers the persistence branch so it is never starved.
			select_biased! {
				_ = channel_manager.get_persistable_update_future().fuse() => true,
				exit = sleeper(Duration::from_millis(100)).fuse() => {
					should_break = exit;
					false
				}
			}
		}, |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
}
416
impl BackgroundProcessor {
	/// Start a background thread that takes care of responsibilities enumerated in the [top-level
	/// documentation].
	///
	/// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
	/// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
	/// either [`join`] or [`stop`].
	///
	/// # Data Persistence
	///
	/// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
	/// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
	/// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
	/// provided implementation.
	///
	/// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
	/// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
	/// See the `lightning-persister` crate for LDK's provided implementation.
	///
	/// Typically, users should either implement [`Persister::persist_manager`] to never return an
	/// error or call [`join`] and handle any error that may arise. For the latter case,
	/// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
	///
	/// # Event Handling
	///
	/// `event_handler` is responsible for handling events that users should be notified of (e.g.,
	/// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
	/// functionality implemented by other handlers.
	/// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
	///
	/// # Rapid Gossip Sync
	///
	/// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
	/// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
	/// until the [`RapidGossipSync`] instance completes its first sync.
	///
	/// [top-level documentation]: BackgroundProcessor
	/// [`join`]: Self::join
	/// [`stop`]: Self::stop
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	/// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
	/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
	/// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
	/// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
	/// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
	pub fn start<
		'a,
		CA: 'static + Deref + Send + Sync,
		CF: 'static + Deref + Send + Sync,
		CW: 'static + Deref + Send + Sync,
		T: 'static + Deref + Send + Sync,
		ES: 'static + Deref + Send + Sync,
		NS: 'static + Deref + Send + Sync,
		SP: 'static + Deref + Send + Sync,
		F: 'static + Deref + Send + Sync,
		R: 'static + Deref + Send + Sync,
		G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
		L: 'static + Deref + Send + Sync,
		P: 'static + Deref + Send + Sync,
		Descriptor: 'static + SocketDescriptor + Send + Sync,
		CMH: 'static + Deref + Send + Sync,
		OMH: 'static + Deref + Send + Sync,
		RMH: 'static + Deref + Send + Sync,
		EH: 'static + EventHandler + Send,
		PS: 'static + Deref + Send,
		M: 'static + Deref<Target = ChainMonitor<<SP::Target as SignerProvider>::Signer, CF, T, F, L, P>> + Send + Sync,
		CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
		PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
		RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
		UMH: 'static + Deref + Send + Sync,
		PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, OMH, L, UMH>> + Send + Sync,
		S: 'static + Deref<Target = SC> + Send + Sync,
		SC: WriteableScore<'a>,
	>(
		persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
		gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
	) -> Self
	where
		CA::Target: 'static + chain::Access,
		CF::Target: 'static + chain::Filter,
		CW::Target: 'static + chain::Watch<<SP::Target as SignerProvider>::Signer>,
		T::Target: 'static + BroadcasterInterface,
		ES::Target: 'static + EntropySource,
		NS::Target: 'static + NodeSigner,
		SP::Target: 'static + SignerProvider,
		F::Target: 'static + FeeEstimator,
		R::Target: 'static + Router,
		L::Target: 'static + Logger,
		P::Target: 'static + Persist<<SP::Target as SignerProvider>::Signer>,
		CMH::Target: 'static + ChannelMessageHandler,
		OMH::Target: 'static + OnionMessageHandler,
		RMH::Target: 'static + RoutingMessageHandler,
		UMH::Target: 'static + CustomMessageHandler,
		PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
	{
		// Shared shutdown flag: the spawned thread polls it, `stop`/`drop` set it.
		let stop_thread = Arc::new(AtomicBool::new(false));
		let stop_thread_clone = stop_thread.clone();
		let handle = thread::spawn(move || -> Result<(), std::io::Error> {
			// Wrap the user's handler so payment-failure network updates are applied to the
			// graph (when one is available) before the user sees the event.
			let event_handler = |event| {
				let network_graph = gossip_sync.network_graph();
				if let Some(network_graph) = network_graph {
					handle_network_graph_update(network_graph, &event)
				}
				event_handler.handle_event(event);
			};
			// Run the shared loop with a std-based clock: timers are `Instant`s and the
			// per-iteration wait blocks up to 100ms on the persistable-update condvar.
			define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
				channel_manager, channel_manager.process_pending_events(&event_handler),
				gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
				channel_manager.await_persistable_update_timeout(Duration::from_millis(100)),
				|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur)
		});
		Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
	}

	/// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn join(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.join_thread()
	}

	/// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
	/// [`ChannelManager`].
	///
	/// # Panics
	///
	/// This function panics if the background thread has panicked such as while persisting or
	/// handling events.
	///
	/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
	pub fn stop(mut self) -> Result<(), std::io::Error> {
		assert!(self.thread_handle.is_some());
		self.stop_and_join_thread()
	}

	// Sets the shutdown flag (Release, paired with the thread's Acquire load) then joins.
	fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
		self.stop_thread.store(true, Ordering::Release);
		self.join_thread()
	}

	// Joins the background thread if it hasn't been joined yet, propagating its result.
	// `take()` ensures a second call (e.g. from `Drop` after `stop`) is a no-op.
	fn join_thread(&mut self) -> Result<(), std::io::Error> {
		match self.thread_handle.take() {
			Some(handle) => handle.join().unwrap(),
			None => Ok(()),
		}
	}
}
571
impl Drop for BackgroundProcessor {
	fn drop(&mut self) {
		// Request shutdown and block until the background thread exits so final persistence
		// completes. `unwrap` panics if the thread panicked or persistence failed on exit,
		// surfacing the failure rather than silently discarding it.
		self.stop_and_join_thread().unwrap();
	}
}
577
578 #[cfg(test)]
579 mod tests {
580         use bitcoin::blockdata::block::BlockHeader;
581         use bitcoin::blockdata::constants::genesis_block;
582         use bitcoin::blockdata::locktime::PackedLockTime;
583         use bitcoin::blockdata::transaction::{Transaction, TxOut};
584         use bitcoin::network::constants::Network;
585         use lightning::chain::{BestBlock, Confirm, chainmonitor};
586         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
587         use lightning::chain::keysinterface::{InMemorySigner, Recipient, EntropySource, KeysManager, NodeSigner};
588         use lightning::chain::transaction::OutPoint;
589         use lightning::get_event_msg;
590         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
591         use lightning::ln::features::ChannelFeatures;
592         use lightning::ln::msgs::{ChannelMessageHandler, Init};
593         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
594         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
595         use lightning::routing::router::DefaultRouter;
596         use lightning::routing::scoring::{ProbabilisticScoringParameters, ProbabilisticScorer};
597         use lightning::util::config::UserConfig;
598         use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
599         use lightning::util::ser::Writeable;
600         use lightning::util::test_utils;
601         use lightning::util::persist::KVStorePersister;
602         use lightning_invoice::payment::{InvoicePayer, Retry};
603         use lightning_persister::FilesystemPersister;
604         use std::fs;
605         use std::path::PathBuf;
606         use std::sync::{Arc, Mutex};
607         use std::sync::mpsc::SyncSender;
608         use std::time::Duration;
609         use bitcoin::hashes::Hash;
610         use bitcoin::TxMerkleNode;
611         use lightning_rapid_gossip_sync::RapidGossipSync;
612         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
613
614         const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
615
616         #[derive(Clone, Hash, PartialEq, Eq)]
617         struct TestDescriptor{}
618         impl SocketDescriptor for TestDescriptor {
619                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
620                         0
621                 }
622
623                 fn disconnect_socket(&mut self) {}
624         }
625
626         type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
627
	// Shorthands for the two gossip-sync flavors (P2P and rapid) used throughout the tests.
	type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
	type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
630
	/// Everything needed to drive one lightning node in these tests: the
	/// `ChannelManager` plus the gossip-sync, peer-manager, chain-monitoring,
	/// persistence and scoring objects that get handed to a `BackgroundProcessor`.
	struct Node {
		node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
		p2p_gossip_sync: PGS,
		rapid_gossip_sync: RGS,
		peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, IgnoringMessageHandler, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
		chain_monitor: Arc<ChainMonitor>,
		persister: Arc<FilesystemPersister>,
		tx_broadcaster: Arc<test_utils::TestBroadcaster>,
		network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
		logger: Arc<test_utils::TestLogger>,
		// Tracked manually so tests can "mine" blocks via confirm_transaction_depth.
		best_block: BestBlock,
		scorer: Arc<Mutex<ProbabilisticScorer<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>>,
	}
644
	impl Node {
		/// Wraps this node's P2P gossip sync in the `GossipSync` enum for `start()`.
		fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::P2P(self.p2p_gossip_sync.clone())
		}

		/// Wraps this node's rapid gossip sync in the `GossipSync` enum for `start()`.
		fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::Rapid(self.rapid_gossip_sync.clone())
		}

		/// Returns the `GossipSync::None` variant, i.e. no gossip syncing at all.
		fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
			GossipSync::None
		}
	}
658
659         impl Drop for Node {
660                 fn drop(&mut self) {
661                         let data_dir = self.persister.get_data_dir();
662                         match fs::remove_dir_all(data_dir.clone()) {
663                                 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
664                                 _ => {}
665                         }
666                 }
667         }
668
	/// A `KVStorePersister` wrapping `FilesystemPersister` that tests can configure to
	/// fail — or emit a notification — when specific keys are persisted (see the
	/// `persist` impl below for which field applies to which key).
	struct Persister {
		// When set, persisting the "network_graph" key fails with this error.
		graph_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, `()` is sent each time a "network_graph" persistence is attempted.
		graph_persistence_notifier: Option<SyncSender<()>>,
		// When set, persisting the "manager" key fails with this error.
		manager_error: Option<(std::io::ErrorKind, &'static str)>,
		// When set, persisting the "scorer" key fails with this error.
		scorer_error: Option<(std::io::ErrorKind, &'static str)>,
		filesystem_persister: FilesystemPersister,
	}
676
677         impl Persister {
678                 fn new(data_dir: String) -> Self {
679                         let filesystem_persister = FilesystemPersister::new(data_dir.clone());
680                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
681                 }
682
683                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
684                         Self { graph_error: Some((error, message)), ..self }
685                 }
686
687                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
688                         Self { graph_persistence_notifier: Some(sender), ..self }
689                 }
690
691                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
692                         Self { manager_error: Some((error, message)), ..self }
693                 }
694
695                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
696                         Self { scorer_error: Some((error, message)), ..self }
697                 }
698         }
699
700         impl KVStorePersister for Persister {
701                 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
702                         if key == "manager" {
703                                 if let Some((error, message)) = self.manager_error {
704                                         return Err(std::io::Error::new(error, message))
705                                 }
706                         }
707
708                         if key == "network_graph" {
709                                 if let Some(sender) = &self.graph_persistence_notifier {
710                                         sender.send(()).unwrap();
711                                 };
712
713                                 if let Some((error, message)) = self.graph_error {
714                                         return Err(std::io::Error::new(error, message))
715                                 }
716                         }
717
718                         if key == "scorer" {
719                                 if let Some((error, message)) = self.scorer_error {
720                                         return Err(std::io::Error::new(error, message))
721                                 }
722                         }
723
724                         self.filesystem_persister.persist(key, object)
725                 }
726         }
727
728         fn get_full_filepath(filepath: String, filename: String) -> String {
729                 let mut path = PathBuf::from(filepath);
730                 path.push(filename);
731                 path.to_str().unwrap().to_string()
732         }
733
	/// Builds `num_nodes` fully wired test nodes persisting under
	/// `{persist_dir}_persister_{i}` directories, then connects every pair of them as
	/// peers at the `ChannelManager` level.
	fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
		let mut nodes = Vec::new();
		for i in 0..num_nodes {
			let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
			let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
			// Per-node logger id so assertions can grep a specific node's log lines.
			let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
			let network = Network::Testnet;
			let genesis_block = genesis_block(network);
			let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
			let params = ProbabilisticScoringParameters::default();
			let scorer = Arc::new(Mutex::new(ProbabilisticScorer::new(params, network_graph.clone(), logger.clone())));
			// Deterministic per-node seed, reused for both the router and the keys manager.
			let seed = [i as u8; 32];
			let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), seed, scorer.clone()));
			let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
			let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
			let now = Duration::from_secs(genesis_block.header.time as u64);
			let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
			let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
			let best_block = BestBlock::from_genesis(network);
			let params = ChainParameters { network, best_block };
			let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params));
			let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
			let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
			let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()), onion_message_handler: IgnoringMessageHandler{}};
			let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), 0, &seed, logger.clone(), IgnoringMessageHandler{}));
			let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
			nodes.push(node);
		}

		// Mark every pair of nodes as connected peers, in both directions.
		for i in 0..num_nodes {
			for j in (i+1)..num_nodes {
				nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: nodes[j].node.init_features(), remote_network_address: None }).unwrap();
				nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: nodes[i].node.init_features(), remote_network_address: None }).unwrap();
			}
		}

		nodes
	}
772
	// Runs the full channel-open dance between `$node_a` and `$node_b` for
	// `$channel_value` sats and evaluates to the funding transaction.
	macro_rules! open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			begin_open_channel!($node_a, $node_b, $channel_value);
			let events = $node_a.node.get_and_clear_pending_events();
			assert_eq!(events.len(), 1);
			let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
			end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
			tx
		}}
	}
783
	// Starts a channel open: `$node_a` creates the channel, then the two nodes exchange
	// the open_channel / accept_channel messages.
	macro_rules! begin_open_channel {
		($node_a: expr, $node_b: expr, $channel_value: expr) => {{
			$node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
			$node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), $node_a.node.init_features(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
			$node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), $node_b.node.init_features(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
		}}
	}
791
	// Destructures a `FundingGenerationReady` event, asserts the expected channel value
	// and user id (42, as set by begin_open_channel!), and builds a matching one-output
	// funding transaction. Evaluates to `(temporary_channel_id, tx)`.
	macro_rules! handle_funding_generation_ready {
		($event: expr, $channel_value: expr) => {{
			match $event {
				Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
					assert_eq!(channel_value_satoshis, $channel_value);
					assert_eq!(user_channel_id, 42);

					let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
						value: channel_value_satoshis, script_pubkey: output_script.clone(),
					}]};
					(temporary_channel_id, tx)
				},
				_ => panic!("Unexpected event"),
			}
		}}
	}
808
	// Completes a channel open: hands the funding transaction to `$node_a`, then the two
	// nodes exchange the funding_created / funding_signed messages.
	macro_rules! end_open_channel {
		($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
			$node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
			$node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
			$node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
		}}
	}
816
	/// "Mines" `depth` consecutive blocks on top of `node`'s current best block, with
	/// `tx` confirmed in the first of them, notifying both the `ChannelManager` and the
	/// `ChainMonitor` at the confirmation and final-height steps.
	fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
		for i in 1..=depth {
			let prev_blockhash = node.best_block.block_hash();
			let height = node.best_block.height() + 1;
			// Dummy header; only the chaining fields matter for these tests.
			let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
			let txdata = vec![(0, tx)];
			node.best_block = BestBlock::new(header.block_hash(), height);
			match i {
				1 => {
					// First block: the transaction itself gets confirmed.
					node.node.transactions_confirmed(&header, &txdata, height);
					node.chain_monitor.transactions_confirmed(&header, &txdata, height);
				},
				x if x == depth => {
					// Last block: only advance the best-block pointers. NOTE(review):
					// when depth == 1 the first arm wins and this arm never runs, so
					// best_block_updated is not called in that case.
					node.node.best_block_updated(&header, height);
					node.chain_monitor.best_block_updated(&header, height);
				},
				_ => {},
			}
		}
	}
	/// Confirms `tx` and buries it under `ANTI_REORG_DELAY` blocks.
	fn confirm_transaction(node: &mut Node, tx: &Transaction) {
		confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
	}
840
	#[test]
	fn test_background_processor() {
		// Test that when a new channel is created, the ChannelManager needs to be re-persisted with
		// updates. Also test that when new updates are available, the manager signals that it needs
		// re-persistence and is successfully re-persisted.
		let nodes = create_nodes(2, "test_background_processor".to_string());

		// Go through the channel creation process so that each node has something to persist. Since
		// open_channel consumes events, it must complete before starting BackgroundProcessor to
		// avoid a race with processing events.
		let tx = open_channel!(nodes[0], nodes[1], 100000);

		// Initiate the background processors to watch each node.
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Busy-polls until the bytes on disk at `$filepath` match the current serialized
		// form of `$node`, i.e. the background processor has persisted the latest state.
		macro_rules! check_persisted_data {
			($node: expr, $filepath: expr) => {
				let mut expected_bytes = Vec::new();
				loop {
					expected_bytes.clear();
					match $node.write(&mut expected_bytes) {
						Ok(()) => {
							match std::fs::read($filepath) {
								Ok(bytes) => {
									if bytes == expected_bytes {
										break
									} else {
										continue
									}
								},
								Err(_) => continue
							}
						},
						Err(e) => panic!("Unexpected error: {}", e)
					}
				}
			}
		}

		// Check that the initial channel manager data is persisted as expected.
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
		check_persisted_data!(nodes[0].node, filepath.clone());

		// Spin until the manager's needs-persistence flag clears, i.e. the background
		// processor has drained all pending persistence work.
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Force-close the channel.
		nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();

		// Check that the force-close updates are persisted.
		check_persisted_data!(nodes[0].node, filepath.clone());
		loop {
			if !nodes[0].node.get_persistence_condvar_value() { break }
		}

		// Check network graph is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
		check_persisted_data!(nodes[0].network_graph, filepath.clone());

		// Check scorer is persisted
		let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
		check_persisted_data!(nodes[0].scorer, filepath.clone());

		assert!(bg_processor.stop().is_ok());
	}
910
	#[test]
	fn test_timer_tick_called() {
		// Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
		// `FRESHNESS_TIMER`.
		let nodes = create_nodes(1, "test_timer_tick_called".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// Spin until both expected log lines appear, proving both timer ticks fired.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
			let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
					log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
932
	#[test]
	fn test_channel_manager_persist_error() {
		// Test that if we encounter an error during manager persistence, the thread panics.
		let nodes = create_nodes(2, "test_persist_error".to_string());
		open_channel!(nodes[0], nodes[1], 100000);

		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
		// `join` should surface the error injected into "manager" persistence above.
		match bg_processor.join() {
			Ok(_) => panic!("Expected error persisting manager"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
951
	#[test]
	fn test_network_graph_persist_error() {
		// Test that if we encounter an error during network graph persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should surface the error injected into "network_graph" persistence.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting network graph"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
969
	#[test]
	fn test_scorer_persist_error() {
		// Test that if we encounter an error during scorer persistence, an error gets returned.
		let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(),  nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// `stop` should surface the error injected into "scorer" persistence.
		match bg_processor.stop() {
			Ok(_) => panic!("Expected error persisting scorer"),
			Err(e) => {
				assert_eq!(e.kind(), std::io::ErrorKind::Other);
				assert_eq!(e.get_ref().unwrap().to_string(), "test");
			},
		}
	}
987
	#[test]
	fn test_background_event_handling() {
		// Exercises the background processor's event handling end-to-end: first a
		// FundingGenerationReady event during channel open, then a SpendableOutputs
		// event after a force-close matures.
		let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
		let channel_value = 100000;
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir.clone()));

		// Set up a background event handler for FundingGenerationReady events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::FundingGenerationReady { .. } => sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
			Event::ChannelReady { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};

		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Open a channel and check that the FundingGenerationReady event was handled.
		begin_open_channel!(nodes[0], nodes[1], channel_value);
		let (temporary_channel_id, funding_tx) = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("FundingGenerationReady not handled within deadline");
		end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);

		// Confirm the funding transaction.
		confirm_transaction(&mut nodes[0], &funding_tx);
		let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
		confirm_transaction(&mut nodes[1], &funding_tx);
		let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
		nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
		let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
		nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
		let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());

		assert!(bg_processor.stop().is_ok());

		// Set up a background event handler for SpendableOutputs events.
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let event_handler = move |event: Event| match event {
			Event::SpendableOutputs { .. } => sender.send(event.clone()).unwrap(),
			Event::ChannelReady { .. } => {},
			Event::ChannelClosed { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		};
		let persister = Arc::new(Persister::new(data_dir));
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Force close the channel and check that the SpendableOutputs event was handled.
		nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
		let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
		confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);

		let event = receiver
			.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
			.expect("Events not handled within deadline");
		match event {
			Event::SpendableOutputs { .. } => {},
			_ => panic!("Unexpected event: {:?}", event),
		}

		assert!(bg_processor.stop().is_ok());
	}
1050
	#[test]
	fn test_scorer_persistence() {
		// Test that the background processor periodically persists the scorer.
		let nodes = create_nodes(2, "test_scorer_persistence".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		let persister = Arc::new(Persister::new(data_dir));
		let event_handler = |_: _| {};
		let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spin until the background processor logs that it persisted the scorer.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let expected_log = "Persisting scorer".to_string();
			if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
				break
			}
		}

		assert!(bg_processor.stop().is_ok());
	}
1069
	#[test]
	fn test_not_pruning_network_graph_until_graph_sync_completion() {
		let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
		let data_dir = nodes[0].persister.get_data_dir();
		// The persister signals on this channel once the network graph has been persisted
		// (used below to detect that pruning ran).
		let (sender, receiver) = std::sync::mpsc::sync_channel(1);
		let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
		let network_graph = nodes[0].network_graph.clone();
		let features = ChannelFeatures::empty();
		// Seed the graph with a channel from a partial announcement only — the final
		// assertion expects this channel (and the RGS-added ones) to be pruned away.
		network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
			.expect("Failed to update channel from partial announcement");
		let original_graph_description = network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!(network_graph.read_only().channels().len(), 1);

		let event_handler = |_: _| {};
		let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

		// Spin until the background loop has demonstrably run at least twice — i.e. it had
		// the opportunity to prune, and (per the test's premise) should not have done so
		// before the rapid gossip sync below completes.
		loop {
			let log_entries = nodes[0].logger.lines.lock().unwrap();
			let loop_counter = "Calling ChannelManager's timer_tick_occurred".to_string();
			if *log_entries.get(&("lightning_background_processor".to_string(), loop_counter))
				.unwrap_or(&0) > 1
			{
				// Wait until the loop has gone around at least twice.
				break
			}
		}

		// Serialized rapid-gossip-sync input; per the assertion below it adds two channels
		// to the graph.
		let initialization_input = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();

		// this should have added two channels
		assert_eq!(network_graph.read_only().channels().len(), 3);

		// With the RGS application complete, wait (with a generous deadline) for the
		// persistence notification indicating a prune-and-persist pass has run.
		let _ = receiver
			.recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
			.expect("Network graph not pruned within deadline");

		background_processor.stop().unwrap();

		// all channels should now be pruned
		assert_eq!(network_graph.read_only().channels().len(), 0);
	}
1127
1128         #[test]
1129         fn test_invoice_payer() {
1130                 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1131                 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1132                 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1133
1134                 // Initiate the background processors to watch each node.
1135                 let data_dir = nodes[0].persister.get_data_dir();
1136                 let persister = Arc::new(Persister::new(data_dir));
1137                 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes, Arc::clone(&nodes[0].scorer));
1138                 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].logger), |_: _| {}, Retry::Attempts(2)));
1139                 let event_handler = Arc::clone(&invoice_payer);
1140                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1141                 assert!(bg_processor.stop().is_ok());
1142         }
1143 }