]> git.bitcoin.ninja Git - rust-lightning/blob - lightning-background-processor/src/lib.rs
Switch to using the `OnionMessenger` directly in BP
[rust-lightning] / lightning-background-processor / src / lib.rs
1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
4
5 #![deny(rustdoc::broken_intra_doc_links)]
6 #![deny(rustdoc::private_intra_doc_links)]
7
8 #![deny(missing_docs)]
9 #![cfg_attr(not(feature = "futures"), deny(unsafe_code))]
10
11 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
12
13 #![cfg_attr(all(not(feature = "std"), not(test)), no_std)]
14
15 #[cfg(any(test, feature = "std"))]
16 extern crate core;
17
18 #[cfg(not(feature = "std"))]
19 extern crate alloc;
20
21 #[macro_use] extern crate lightning;
22 extern crate lightning_rapid_gossip_sync;
23
24 use lightning::chain;
25 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
26 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
27 use lightning::events::{Event, PathFailure};
28 #[cfg(feature = "std")]
29 use lightning::events::EventHandler;
30 #[cfg(any(feature = "std", feature = "futures"))]
31 use lightning::events::EventsProvider;
32
33 use lightning::ln::channelmanager::AChannelManager;
34 use lightning::ln::msgs::OnionMessageHandler;
35 use lightning::onion_message::messenger::AOnionMessenger;
36 use lightning::ln::peer_handler::APeerManager;
37 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
38 use lightning::routing::utxo::UtxoLookup;
39 use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
40 use lightning::util::logger::Logger;
41 use lightning::util::persist::Persister;
42 #[cfg(feature = "std")]
43 use lightning::util::wakers::Sleeper;
44 use lightning_rapid_gossip_sync::RapidGossipSync;
45
46 use core::ops::Deref;
47 use core::time::Duration;
48
49 #[cfg(feature = "std")]
50 use std::sync::Arc;
51 #[cfg(feature = "std")]
52 use core::sync::atomic::{AtomicBool, Ordering};
53 #[cfg(feature = "std")]
54 use std::thread::{self, JoinHandle};
55 #[cfg(feature = "std")]
56 use std::time::Instant;
57
58 #[cfg(not(feature = "std"))]
59 use alloc::boxed::Box;
60 #[cfg(not(feature = "std"))]
61 use alloc::vec::Vec;
62
/// Handle to the background worker that drives the housekeeping Rust-Lightning needs in order to
/// keep running properly, covering work which either can or should happen in the background.
///
/// The worker is responsible for:
/// * Handing [`Event`]s to a user-provided [`EventHandler`].
/// * Watching whether the [`ChannelManager`] needs to be re-persisted to disk and, if so,
///   writing it to disk/backups by invoking the callback given to it at startup.
///   [`ChannelManager`] persistence should be done in the background.
/// * Invoking [`ChannelManager::timer_tick_occurred`],
///   [`ChainMonitor::rebroadcast_pending_claims`] and [`PeerManager::timer_tick_occurred`] at
///   the appropriate intervals.
/// * Invoking [`NetworkGraph::remove_stale_channels_and_tracking`] (only if a [`GossipSync`]
///   with a [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
/// [`PeerManager::process_events`] is also called periodically, though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// Should [`ChannelManager`] persistence fail and the persisted manager become out-of-date,
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
/// [`PeerManager::process_events`]: lightning::ln::peer_handler::PeerManager::process_events
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
#[cfg(feature = "std")]
pub struct BackgroundProcessor {
	// Flag shared (via `Arc`) with another owner; presumably the worker thread polls it to learn
	// that it should stop. NOTE(review): its readers/writers are outside this block, confirm.
	stop_thread: Arc<AtomicBool>,
	// Join handle for a thread whose result is `Ok(())` or an I/O error. Presumably `None` once
	// the thread has been joined. NOTE(review): confirm against the join/stop logic.
	thread_handle: Option<JoinHandle<std::io::Result<()>>>,
}
97
// All intervals below are expressed in seconds. Under `cfg(test)` the recurring ones shrink to a
// single second so that tests do not have to wait for real-world intervals.

// Interval between calls to the `ChannelManager`'s `timer_tick_occurred`.
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;

// Interval between calls to the `PeerManager`'s `timer_tick_occurred`.
#[cfg(test)]
const PING_TIMER: u64 = 1;
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;

// Interval between calls to the onion message handler's `timer_tick_occurred`.
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;
#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

// Interval between scorer persistence attempts.
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 60 * 5;

// Delay before the first network graph prune after startup; subsequent prunes fall back to
// `NETWORK_PRUNE_TIMER`.
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;

// Interval between rebroadcasts of the chain monitor's pending claims.
#[cfg(test)]
const REBROADCAST_TIMER: u64 = 1;
#[cfg(not(test))]
const REBROADCAST_TIMER: u64 = 30;
135
#[cfg(feature = "futures")]
/// `core::cmp::min` is not currently const, so we define a trivial (and equivalent) replacement.
const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
/// The shortest of the recurring timer intervals, in seconds.
///
/// `ONION_MESSAGE_HANDLER_TIMER` has to take part in the minimum: in non-test debug builds
/// `PING_TIMER` is raised to 30 while the onion message handler still expects a tick every 10
/// seconds, so leaving it out would make this constant larger than the fastest timer.
/// `NETWORK_PRUNE_TIMER` is deliberately omitted as it is never shorter than
/// `FIRST_NETWORK_PRUNE_TIMER`.
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(
	min_u64(FRESHNESS_TIMER, min_u64(PING_TIMER, ONION_MESSAGE_HANDLER_TIMER)),
	min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)),
);
142
143 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
144 pub enum GossipSync<
145         P: Deref<Target = P2PGossipSync<G, U, L>>,
146         R: Deref<Target = RapidGossipSync<G, L>>,
147         G: Deref<Target = NetworkGraph<L>>,
148         U: Deref,
149         L: Deref,
150 >
151 where U::Target: UtxoLookup, L::Target: Logger {
152         /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
153         P2P(P),
154         /// Rapid gossip sync from a trusted server.
155         Rapid(R),
156         /// No gossip sync.
157         None,
158 }
159
160 impl<
161         P: Deref<Target = P2PGossipSync<G, U, L>>,
162         R: Deref<Target = RapidGossipSync<G, L>>,
163         G: Deref<Target = NetworkGraph<L>>,
164         U: Deref,
165         L: Deref,
166 > GossipSync<P, R, G, U, L>
167 where U::Target: UtxoLookup, L::Target: Logger {
168         fn network_graph(&self) -> Option<&G> {
169                 match self {
170                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
171                         GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
172                         GossipSync::None => None,
173                 }
174         }
175
176         fn prunable_network_graph(&self) -> Option<&G> {
177                 match self {
178                         GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
179                         GossipSync::Rapid(gossip_sync) => {
180                                 if gossip_sync.is_initial_sync_complete() {
181                                         Some(gossip_sync.network_graph())
182                                 } else {
183                                         None
184                                 }
185                         },
186                         GossipSync::None => None,
187                 }
188         }
189 }
190
191 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
192 impl<P: Deref<Target = P2PGossipSync<G, U, L>>, G: Deref<Target = NetworkGraph<L>>, U: Deref, L: Deref>
193         GossipSync<P, &RapidGossipSync<G, L>, G, U, L>
194 where
195         U::Target: UtxoLookup,
196         L::Target: Logger,
197 {
198         /// Initializes a new [`GossipSync::P2P`] variant.
199         pub fn p2p(gossip_sync: P) -> Self {
200                 GossipSync::P2P(gossip_sync)
201         }
202 }
203
204 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
205 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
206         GossipSync<
207                 &P2PGossipSync<G, &'a (dyn UtxoLookup + Send + Sync), L>,
208                 R,
209                 G,
210                 &'a (dyn UtxoLookup + Send + Sync),
211                 L,
212         >
213 where
214         L::Target: Logger,
215 {
216         /// Initializes a new [`GossipSync::Rapid`] variant.
217         pub fn rapid(gossip_sync: R) -> Self {
218                 GossipSync::Rapid(gossip_sync)
219         }
220 }
221
222 /// This is not exported to bindings users as the bindings concretize everything and have constructors for us
223 impl<'a, L: Deref>
224         GossipSync<
225                 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn UtxoLookup + Send + Sync), L>,
226                 &RapidGossipSync<&'a NetworkGraph<L>, L>,
227                 &'a NetworkGraph<L>,
228                 &'a (dyn UtxoLookup + Send + Sync),
229                 L,
230         >
231 where
232         L::Target: Logger,
233 {
234         /// Initializes a new [`GossipSync::None`] variant.
235         pub fn none() -> Self {
236                 GossipSync::None
237         }
238 }
239
240 fn handle_network_graph_update<L: Deref>(
241         network_graph: &NetworkGraph<L>, event: &Event
242 ) where L::Target: Logger {
243         if let Event::PaymentPathFailed {
244                 failure: PathFailure::OnPath { network_update: Some(ref upd) }, .. } = event
245         {
246                 network_graph.handle_network_update(upd);
247         }
248 }
249
250 /// Updates scorer based on event and returns whether an update occurred so we can decide whether
251 /// to persist.
252 fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + WriteableScore<'a>>(
253         scorer: &'a S, event: &Event, duration_since_epoch: Duration,
254 ) -> bool {
255         match event {
256                 Event::PaymentPathFailed { ref path, short_channel_id: Some(scid), .. } => {
257                         let mut score = scorer.write_lock();
258                         score.payment_path_failed(path, *scid, duration_since_epoch);
259                 },
260                 Event::PaymentPathFailed { ref path, payment_failed_permanently: true, .. } => {
261                         // Reached if the destination explicitly failed it back. We treat this as a successful probe
262                         // because the payment made it all the way to the destination with sufficient liquidity.
263                         let mut score = scorer.write_lock();
264                         score.probe_successful(path, duration_since_epoch);
265                 },
266                 Event::PaymentPathSuccessful { path, .. } => {
267                         let mut score = scorer.write_lock();
268                         score.payment_path_successful(path, duration_since_epoch);
269                 },
270                 Event::ProbeSuccessful { path, .. } => {
271                         let mut score = scorer.write_lock();
272                         score.probe_successful(path, duration_since_epoch);
273                 },
274                 Event::ProbeFailed { path, short_channel_id: Some(scid), .. } => {
275                         let mut score = scorer.write_lock();
276                         score.probe_failed(path, *scid, duration_since_epoch);
277                 },
278                 _ => return false,
279         }
280         true
281 }
282
// Expands to the main loop shared by the different background processor entry points.
//
// Besides the objects being driven, the caller supplies:
// * `$process_*_events`: expressions evaluated at the top of every iteration,
// * `$loop_exit_check`: a boolean expression which ends the loop once true,
// * `$await`: the expression which waits between iterations,
// * `$get_timer(secs)` / `$timer_elapsed(&mut timer, secs)`: create a timer and test whether it
//   has run out,
// * `$check_slow_await`: whether to measure how long `$await` took,
// * `$time_fetch()`: yields the current time as an `Option` of a duration since the epoch.
//
// The expansion evaluates to `Ok(())` after a clean exit. Persistence failures marked with `?`
// return early from the function the macro is expanded in.
macro_rules! define_run_body {
	(
		$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
		$channel_manager: ident, $process_channel_manager_events: expr,
		$onion_messenger: ident, $process_onion_message_handler_events: expr,
		$peer_manager: ident, $gossip_sync: ident,
		$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
		$timer_elapsed: expr, $check_slow_await: expr, $time_fetch: expr,
	) => { {
		// Tick the ChannelManager and rebroadcast claims right away instead of waiting for the
		// first timer to run out.
		log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
		$channel_manager.get_cm().timer_tick_occurred();
		log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
		$chain_monitor.rebroadcast_pending_claims();

		// One timer per periodic task, each armed with that task's interval.
		let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
		let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
		let mut last_ping_call = $get_timer(PING_TIMER);
		let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
		let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
		let mut last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
		let mut have_pruned = false;
		let mut have_decayed_scorer = false;

		loop {
			$process_channel_manager_events;
			$process_chain_monitor_events;
			$process_onion_message_handler_events;

			// PeerManager::process_events may block on ChannelManager's locks, which is why it
			// runs last here: once the ChannelManager is done with whatever it is doing we want
			// to reach `persist_manager` as fast as possible, in particular without first going
			// through the regular event processing above and handing events to users.
			//
			// On an *extremely* slow machine the ChannelManager may begin processing a message
			// at practically any point of this loop. To keep the gap between that processing
			// finishing and the updated ChannelManager being persisted small, calls which block
			// on the ChannelManager are kept to a minimum and, as a fallback, placed directly
			// ahead of persistence.
			$peer_manager.as_ref().process_events();

			// Leave the loop if a stop of the background processor was requested.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			// The wait below lasts up to 100ms; when asked to, we time it so that being put to
			// sleep can be detected (see the use of `await_slow` further down).
			let await_start = if $check_slow_await { Some($get_timer(1)) } else { None };
			$await;
			let await_slow = if $check_slow_await { $timer_elapsed(&mut await_start.unwrap(), 1) } else { false };

			// Leave the loop if a stop was requested while we were waiting.
			if $loop_exit_check {
				log_trace!($logger, "Terminating background processor.");
				break;
			}

			if $channel_manager.get_cm().get_and_clear_needs_persistence() {
				log_trace!($logger, "Persisting ChannelManager...");
				$persister.persist_manager(&$channel_manager)?;
				log_trace!($logger, "Done persisting ChannelManager.");
			}
			if $timer_elapsed(&mut last_freshness_call, FRESHNESS_TIMER) {
				log_trace!($logger, "Calling ChannelManager's timer_tick_occurred");
				$channel_manager.get_cm().timer_tick_occurred();
				last_freshness_call = $get_timer(FRESHNESS_TIMER);
			}
			if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
				if let Some(om) = &$onion_messenger {
					log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
					om.get_om().timer_tick_occurred();
				}
				// The timer is re-armed even when no onion messenger was provided.
				last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
			}
			if await_slow {
				// There are several reasons for being starved of CPU cycles on various
				// platforms. On iOS, for instance, an application in the background is paused
				// entirely, and on a desktop platform we may get no cycles while the device is
				// asleep.
				// This is detected by our max-100ms wait above having taken longer than a full
				// second, in which case we assume sockets may have been killed (on at least some
				// platforms they appear to be, even after only a second).
				// Care has to be taken not to end up here merely because user event processing
				// at the top of the loop was slow: the sample client, for example, may call
				// Bitcoin Core RPCs while handling events, which very often takes more than a
				// handful of seconds and shouldn't disconnect all our peers.
				log_trace!($logger, "100ms sleep took more than a second, disconnecting peers.");
				$peer_manager.as_ref().disconnect_all_peers();
				last_ping_call = $get_timer(PING_TIMER);
			} else if $timer_elapsed(&mut last_ping_call, PING_TIMER) {
				log_trace!($logger, "Calling PeerManager's timer_tick_occurred");
				$peer_manager.as_ref().timer_tick_occurred();
				last_ping_call = $get_timer(PING_TIMER);
			}

			// The graph is pruned once shortly after startup and hourly from then on, so that
			// short-lived clients prune their network graph at least once. The first prune runs
			// 60 seconds after startup. For RGS, where 60 seconds is likely too long, we prune
			// as soon as the initial sync has completed instead.
			let prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
			let prune_timer_elapsed = $timer_elapsed(&mut last_prune_call, prune_timer);
			let should_prune = prune_timer_elapsed
				|| (!have_pruned && matches!($gossip_sync, GossipSync::Rapid(_)));
			if should_prune {
				// While rapid sync completion is pending the network graph must not be pruned.
				if let Some(network_graph) = $gossip_sync.prunable_network_graph() {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Pruning and persisting network graph.");
						network_graph.remove_stale_channels_and_tracking_with_time(duration_since_epoch.as_secs());
					} else {
						log_warn!($logger, "Not pruning network graph, consider enabling `std` or doing so manually with remove_stale_channels_and_tracking_with_time.");
						log_trace!($logger, "Persisting network graph.");
					}

					if let Err(e) = $persister.persist_graph(network_graph) {
						log_error!($logger, "Error: Failed to persist network graph, check your disk and permissions {}", e);
					}

					have_pruned = true;
				}
				// Re-arm with the interval matching whether a prune has happened by now.
				let next_prune_timer = if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER };
				last_prune_call = $get_timer(next_prune_timer);
			}

			// Runs on the first iteration only, whether or not a scorer or the time is available.
			if !have_decayed_scorer {
				if let Some(ref scorer) = $scorer {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Calling time_passed on scorer at startup");
						scorer.write_lock().time_passed(duration_since_epoch);
					}
				}
				have_decayed_scorer = true;
			}

			if $timer_elapsed(&mut last_scorer_persist_call, SCORER_PERSIST_TIMER) {
				if let Some(ref scorer) = $scorer {
					if let Some(duration_since_epoch) = $time_fetch() {
						log_trace!($logger, "Calling time_passed and persisting scorer");
						scorer.write_lock().time_passed(duration_since_epoch);
					} else {
						log_trace!($logger, "Persisting scorer");
					}
					// A failure here is only logged; the loop carries on.
					if let Err(e) = $persister.persist_scorer(&scorer) {
						log_error!($logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
					}
				}
				last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
			}

			if $timer_elapsed(&mut last_rebroadcast_call, REBROADCAST_TIMER) {
				log_trace!($logger, "Rebroadcasting monitor's pending claims");
				$chain_monitor.rebroadcast_pending_claims();
				last_rebroadcast_call = $get_timer(REBROADCAST_TIMER);
			}
		}

		// Persist the ChannelManager one last time after leaving the loop. This avoids some
		// races where users quit while channel updates were in-flight, with ChannelMonitor
		// update(s) persisted without a corresponding ChannelManager update.
		$persister.persist_manager(&$channel_manager)?;

		// Persist the scorer on exit.
		if let Some(ref scorer) = $scorer {
			$persister.persist_scorer(&scorer)?;
		}

		// Persist the NetworkGraph on exit.
		if let Some(network_graph) = $gossip_sync.network_graph() {
			$persister.persist_graph(network_graph)?;
		}

		Ok(())
	} }
}
464
#[cfg(feature = "futures")]
pub(crate) mod futures_util {
	use core::future::Future;
	use core::marker::Unpin;
	use core::pin::Pin;
	use core::task::{Poll, RawWaker, RawWakerVTable, Waker};

	/// A future which completes as soon as one of its three inner futures does.
	pub(crate) struct Selector<A, B, C>
	where
		A: Future<Output = ()> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = bool> + Unpin,
	{
		pub a: A,
		pub b: B,
		pub c: C,
	}

	/// Tells which of a [`Selector`]'s futures completed, carrying `c`'s output along.
	pub(crate) enum SelectorOutput {
		A,
		B,
		C(bool),
	}

	impl<A, B, C> Future for Selector<A, B, C>
	where
		A: Future<Output = ()> + Unpin,
		B: Future<Output = ()> + Unpin,
		C: Future<Output = bool> + Unpin,
	{
		type Output = SelectorOutput;

		/// Polls `a`, `b` and `c` in that order and reports the first one found ready. Futures
		/// after the first ready one are not polled in that call.
		fn poll(mut self: Pin<&mut Self>, ctx: &mut core::task::Context<'_>) -> Poll<SelectorOutput> {
			if let Poll::Ready(()) = Pin::new(&mut self.a).poll(ctx) {
				return Poll::Ready(SelectorOutput::A);
			}
			if let Poll::Ready(()) = Pin::new(&mut self.b).poll(ctx) {
				return Poll::Ready(SelectorOutput::B);
			}
			match Pin::new(&mut self.c).poll(ctx) {
				Poll::Ready(res) => Poll::Ready(SelectorOutput::C(res)),
				Poll::Pending => Poll::Pending,
			}
		}
	}

	// Checking whether a future has completed without awaiting it, outside of an async context,
	// requires a Waker, which in turn requires a vtable. Ours is filled with dummy entries, but
	// sadly that still takes a good bit of boilerplate.
	fn dummy_waker_clone(_: *const ()) -> RawWaker {
		RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE)
	}
	fn dummy_waker_action(_: *const ()) {}

	const DUMMY_WAKER_VTABLE: RawWakerVTable = RawWakerVTable::new(
		dummy_waker_clone, dummy_waker_action, dummy_waker_action, dummy_waker_action,
	);

	/// Builds a [`Waker`] which does nothing when woken, cloned or dropped.
	pub(crate) fn dummy_waker() -> Waker {
		let raw_waker = RawWaker::new(core::ptr::null(), &DUMMY_WAKER_VTABLE);
		// SAFETY: none of the vtable functions ever dereferences the data pointer (each one
		// ignores its argument), so the null pointer is never read, and there is no state that
		// could be raced on or leaked.
		unsafe { Waker::from_raw(raw_waker) }
	}
}
513 #[cfg(feature = "futures")]
514 use futures_util::{Selector, SelectorOutput, dummy_waker};
515 #[cfg(feature = "futures")]
516 use core::task;
517
518 /// Processes background events in a future.
519 ///
520 /// `sleeper` should return a future which completes in the given amount of time and returns a
521 /// boolean indicating whether the background processing should exit. Once `sleeper` returns a
522 /// future which outputs `true`, the loop will exit and this function's future will complete.
523 /// The `sleeper` future is free to return early after it has triggered the exit condition.
524 ///
525 /// See [`BackgroundProcessor::start`] for information on which actions this handles.
526 ///
527 /// Requires the `futures` feature. Note that while this method is available without the `std`
528 /// feature, doing so will skip calling [`NetworkGraph::remove_stale_channels_and_tracking`],
529 /// you should call [`NetworkGraph::remove_stale_channels_and_tracking_with_time`] regularly
530 /// manually instead.
531 ///
532 /// The `mobile_interruptable_platform` flag should be set if we're currently running on a
533 /// mobile device, where we may need to check for interruption of the application regularly. If you
534 /// are unsure, you should set the flag, as the performance impact of it is minimal unless there
535 /// are hundreds or thousands of simultaneous process calls running.
536 ///
537 /// The `fetch_time` parameter should return the current wall clock time, if one is available. If
538 /// no time is available, some features may be disabled, however the node will still operate fine.
539 ///
540 /// For example, in order to process background events in a [Tokio](https://tokio.rs/) task, you
541 /// could setup `process_events_async` like this:
542 /// ```
543 /// # use lightning::io;
544 /// # use std::sync::{Arc, RwLock};
545 /// # use std::sync::atomic::{AtomicBool, Ordering};
546 /// # use std::time::SystemTime;
547 /// # use lightning_background_processor::{process_events_async, GossipSync};
548 /// # struct Logger {}
549 /// # impl lightning::util::logger::Logger for Logger {
550 /// #     fn log(&self, _record: lightning::util::logger::Record) {}
551 /// # }
552 /// # struct Store {}
553 /// # impl lightning::util::persist::KVStore for Store {
554 /// #     fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
555 /// #     fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> io::Result<()> { Ok(()) }
556 /// #     fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
557 /// #     fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
558 /// # }
559 /// # struct EventHandler {}
560 /// # impl EventHandler {
561 /// #     async fn handle_event(&self, _: lightning::events::Event) {}
562 /// # }
563 /// # #[derive(Eq, PartialEq, Clone, Hash)]
564 /// # struct SocketDescriptor {}
565 /// # impl lightning::ln::peer_handler::SocketDescriptor for SocketDescriptor {
566 /// #     fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize { 0 }
567 /// #     fn disconnect_socket(&mut self) {}
568 /// # }
569 /// # type ChainMonitor<B, F, FE> = lightning::chain::chainmonitor::ChainMonitor<lightning::sign::InMemorySigner, Arc<F>, Arc<B>, Arc<FE>, Arc<Logger>, Arc<Store>>;
570 /// # type NetworkGraph = lightning::routing::gossip::NetworkGraph<Arc<Logger>>;
571 /// # type P2PGossipSync<UL> = lightning::routing::gossip::P2PGossipSync<Arc<NetworkGraph>, Arc<UL>, Arc<Logger>>;
572 /// # type ChannelManager<B, F, FE> = lightning::ln::channelmanager::SimpleArcChannelManager<ChainMonitor<B, F, FE>, B, FE, Logger>;
573 /// # type OnionMessenger<B, F, FE> = lightning::onion_message::messenger::OnionMessenger<Arc<lightning::sign::KeysManager>, Arc<lightning::sign::KeysManager>, Arc<Logger>, Arc<ChannelManager<B, F, FE>>, Arc<lightning::onion_message::messenger::DefaultMessageRouter<Arc<NetworkGraph>, Arc<Logger>, Arc<lightning::sign::KeysManager>>>, Arc<ChannelManager<B, F, FE>>, lightning::ln::peer_handler::IgnoringMessageHandler>;
574 /// # type Scorer = RwLock<lightning::routing::scoring::ProbabilisticScorer<Arc<NetworkGraph>, Arc<Logger>>>;
575 /// # type PeerManager<B, F, FE, UL> = lightning::ln::peer_handler::SimpleArcPeerManager<SocketDescriptor, ChainMonitor<B, F, FE>, B, FE, Arc<UL>, Logger>;
576 /// #
577 /// # struct Node<
578 /// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
579 /// #     F: lightning::chain::Filter + Send + Sync + 'static,
580 /// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
581 /// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
582 /// # > {
583 /// #     peer_manager: Arc<PeerManager<B, F, FE, UL>>,
584 /// #     event_handler: Arc<EventHandler>,
585 /// #     channel_manager: Arc<ChannelManager<B, F, FE>>,
586 /// #     onion_messenger: Arc<OnionMessenger<B, F, FE>>,
587 /// #     chain_monitor: Arc<ChainMonitor<B, F, FE>>,
588 /// #     gossip_sync: Arc<P2PGossipSync<UL>>,
589 /// #     persister: Arc<Store>,
590 /// #     logger: Arc<Logger>,
591 /// #     scorer: Arc<Scorer>,
592 /// # }
593 /// #
594 /// # async fn setup_background_processing<
595 /// #     B: lightning::chain::chaininterface::BroadcasterInterface + Send + Sync + 'static,
596 /// #     F: lightning::chain::Filter + Send + Sync + 'static,
597 /// #     FE: lightning::chain::chaininterface::FeeEstimator + Send + Sync + 'static,
598 /// #     UL: lightning::routing::utxo::UtxoLookup + Send + Sync + 'static,
599 /// # >(node: Node<B, F, FE, UL>) {
600 ///     let background_persister = Arc::clone(&node.persister);
601 ///     let background_event_handler = Arc::clone(&node.event_handler);
602 ///     let background_chain_mon = Arc::clone(&node.chain_monitor);
603 ///     let background_chan_man = Arc::clone(&node.channel_manager);
604 ///     let background_gossip_sync = GossipSync::p2p(Arc::clone(&node.gossip_sync));
605 ///     let background_peer_man = Arc::clone(&node.peer_manager);
606 ///     let background_onion_messenger = Arc::clone(&node.onion_messenger);
607 ///     let background_logger = Arc::clone(&node.logger);
608 ///     let background_scorer = Arc::clone(&node.scorer);
609 ///
610 ///     // Setup the sleeper.
611 ///     let (stop_sender, stop_receiver) = tokio::sync::watch::channel(());
612 ///
613 ///     let sleeper = move |d| {
614 ///             let mut receiver = stop_receiver.clone();
615 ///             Box::pin(async move {
616 ///                     tokio::select!{
617 ///                             _ = tokio::time::sleep(d) => false,
618 ///                             _ = receiver.changed() => true,
619 ///                     }
620 ///             })
621 ///     };
622 ///
623 ///     let mobile_interruptable_platform = false;
624 ///
625 ///     let handle = tokio::spawn(async move {
626 ///             process_events_async(
627 ///                     background_persister,
628 ///                     |e| background_event_handler.handle_event(e),
629 ///                     background_chain_mon,
630 ///                     background_chan_man,
631 ///                     Some(background_onion_messenger),
632 ///                     background_gossip_sync,
633 ///                     background_peer_man,
634 ///                     background_logger,
635 ///                     Some(background_scorer),
636 ///                     sleeper,
637 ///                     mobile_interruptable_platform,
638 ///                     || Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap())
639 ///             )
640 ///             .await
641 ///             .expect("Failed to process events");
642 ///     });
643 ///
644 ///     // Stop the background processing.
645 ///     stop_sender.send(()).unwrap();
646 ///     handle.await.unwrap();
647 ///     # }
648 ///```
#[cfg(feature = "futures")]
pub async fn process_events_async<
        'a,
        UL: 'static + Deref + Send + Sync,
        CF: 'static + Deref + Send + Sync,
        T: 'static + Deref + Send + Sync,
        F: 'static + Deref + Send + Sync,
        G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
        L: 'static + Deref + Send + Sync,
        P: 'static + Deref + Send + Sync,
        EventHandlerFuture: core::future::Future<Output = ()>,
        EventHandler: Fn(Event) -> EventHandlerFuture,
        PS: 'static + Deref + Send,
        M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
        CM: 'static + Deref + Send + Sync,
        OM: 'static + Deref + Send + Sync,
        PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
        RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
        PM: 'static + Deref + Send + Sync,
        S: 'static + Deref<Target = SC> + Send + Sync,
        SC: for<'b> WriteableScore<'b>,
        SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
        Sleeper: Fn(Duration) -> SleepFuture,
        FetchTime: Fn() -> Option<Duration>,
>(
        persister: PS, event_handler: EventHandler, chain_monitor: M, channel_manager: CM,
        onion_messenger: Option<OM>,
        gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
        UL::Target: 'static + UtxoLookup,
        CF::Target: 'static + chain::Filter,
        T::Target: 'static + BroadcasterInterface,
        F::Target: 'static + FeeEstimator,
        L::Target: 'static + Logger,
        P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
        PS::Target: 'static + Persister<'a, CM, L, SC>,
        CM::Target: AChannelManager + Send + Sync,
        OM::Target: AOnionMessenger + Send + Sync,
        PM::Target: APeerManager + Send + Sync,
{
        // Becomes `true` once a sleeper future resolves with an exit request.
        let mut should_break = false;

        // Decorates the caller's handler: each event first updates the network graph (if any) and
        // the scorer (if any), and only then is passed on to the caller. The closure captures
        // nothing but shared references, so it can be handed by value to every events provider.
        let async_event_handler = |event| {
                let graph_to_update = gossip_sync.network_graph();
                let (user_handler, maybe_scorer) = (&event_handler, &scorer);
                let (logger, persister, fetch_time) = (&logger, &persister, &fetch_time);
                Box::pin(async move { // We should be able to drop the Box once our MSRV is 1.68
                        if let Some(graph) = graph_to_update {
                                handle_network_graph_update(graph, &event);
                        }
                        if let Some(scorer) = maybe_scorer {
                                // Without a current time there is nothing to feed the scorer.
                                if let Some(duration_since_epoch) = fetch_time() {
                                        let scorer_changed = update_scorer(scorer, &event, duration_since_epoch);
                                        if scorer_changed {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
                                                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
                                                }
                                        }
                                }
                        }
                        user_handler(event).await;
                })
        };
        define_run_body!(
                persister, chain_monitor,
                chain_monitor.process_pending_events_async(async_event_handler).await,
                channel_manager, channel_manager.get_cm().process_pending_events_async(async_event_handler).await,
                onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events_async(async_event_handler).await },
                peer_manager, gossip_sync, logger, scorer, should_break, {
                        // Interruptable mobile platforms poll every 100ms; everyone else sleeps for
                        // the fastest timer interval.
                        let sleep_for = if mobile_interruptable_platform {
                                Duration::from_millis(100)
                        } else {
                                Duration::from_secs(FASTEST_TIMER)
                        };
                        let fut = Selector {
                                a: channel_manager.get_cm().get_event_or_persistence_needed_future(),
                                b: chain_monitor.get_update_future(),
                                c: sleeper(sleep_for),
                        };
                        // Only the sleeper carries an exit signal; the other two outputs are plain wake-ups.
                        if let SelectorOutput::C(exit) = fut.await {
                                should_break = exit;
                        }
                }, |t| sleeper(Duration::from_secs(t)),
                |sleep_fut: &mut SleepFuture, _| {
                        // Poll the timer future once with a no-op waker to see whether it has fired.
                        let mut waker = dummy_waker();
                        let mut ctx = task::Context::from_waker(&mut waker);
                        if let task::Poll::Ready(exit) = core::pin::Pin::new(sleep_fut).poll(&mut ctx) {
                                should_break = exit;
                                true
                        } else {
                                false
                        }
                }, mobile_interruptable_platform, fetch_time,
        )
}
744
#[cfg(feature = "std")]
impl BackgroundProcessor {
        /// Spawns a background thread that carries out the duties listed in the [top-level
        /// documentation].
        ///
        /// The thread keeps running until this object is dropped, [`stop`] is called, or
        /// [`Persister::persist_manager`] fails. When it fails, the error can be collected by
        /// calling either [`join`] or [`stop`].
        ///
        /// # Data Persistence
        ///
        /// [`Persister::persist_manager`] has to write the [`ChannelManager`] to disk and/or upload
        /// it to one or more backup services. See [`ChannelManager::write`] for how a
        /// [`ChannelManager`] is serialized, and the `lightning-persister` crate for the
        /// implementation LDK provides.
        ///
        /// [`Persister::persist_graph`] has to write the [`NetworkGraph`] to disk whenever a
        /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for how a [`NetworkGraph`] is
        /// serialized, and the `lightning-persister` crate for the implementation LDK provides.
        ///
        /// Users should typically either make [`Persister::persist_manager`] infallible, or call
        /// [`join`] and deal with whatever error comes back. In the latter case the
        /// `BackgroundProcessor` has to be restarted via `start` once the error is handled.
        ///
        /// # Event Handling
        ///
        /// `event_handler` receives the events users should hear about (e.g., payment failed).
        /// [`BackgroundProcessor`] may wrap the given [`EventHandler`] with common functionality
        /// provided by other handlers.
        /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
        ///
        /// # Rapid Gossip Sync
        ///
        /// If rapid gossip sync should run at startup, pass a [`RapidGossipSync`] via `gossip_sync`
        /// so the [`BackgroundProcessor`] knows not to prune the [`NetworkGraph`] instance before
        /// the [`RapidGossipSync`] instance has finished its first sync.
        ///
        /// [top-level documentation]: BackgroundProcessor
        /// [`join`]: Self::join
        /// [`stop`]: Self::stop
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
        /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
        /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
        /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
        /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
        pub fn start<
                'a,
                UL: 'static + Deref + Send + Sync,
                CF: 'static + Deref + Send + Sync,
                T: 'static + Deref + Send + Sync,
                F: 'static + Deref + Send + Sync,
                G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
                L: 'static + Deref + Send + Sync,
                P: 'static + Deref + Send + Sync,
                EH: 'static + EventHandler + Send,
                PS: 'static + Deref + Send,
                M: 'static + Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P>> + Send + Sync,
                CM: 'static + Deref + Send + Sync,
                OM: 'static + Deref + Send + Sync,
                PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
                RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
                PM: 'static + Deref + Send + Sync,
                S: 'static + Deref<Target = SC> + Send + Sync,
                SC: for <'b> WriteableScore<'b>,
        >(
                persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
                onion_messenger: Option<OM>,
                gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM, logger: L, scorer: Option<S>,
        ) -> Self
        where
                UL::Target: 'static + UtxoLookup,
                CF::Target: 'static + chain::Filter,
                T::Target: 'static + BroadcasterInterface,
                F::Target: 'static + FeeEstimator,
                L::Target: 'static + Logger,
                P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
                PS::Target: 'static + Persister<'a, CM, L, SC>,
                CM::Target: AChannelManager + Send + Sync,
                OM::Target: AOnionMessenger + Send + Sync,
                PM::Target: APeerManager + Send + Sync,
        {
                // One flag, two handles: the thread polls its copy, `Self` keeps the other to set it.
                let stop_thread = Arc::new(AtomicBool::new(false));
                let stop_thread_clone = Arc::clone(&stop_thread);
                let handle = thread::spawn(move || -> Result<(), std::io::Error> {
                        use std::time::SystemTime;

                        // Feed each event to the network graph and scorer (when present) before
                        // handing it to the user-supplied handler.
                        let decorated_handler = |event| {
                                if let Some(network_graph) = gossip_sync.network_graph() {
                                        handle_network_graph_update(network_graph, &event);
                                }
                                if let Some(scorer) = &scorer {
                                        let duration_since_epoch = SystemTime::now()
                                                .duration_since(SystemTime::UNIX_EPOCH)
                                                .expect("Time should be sometime after 1970");
                                        if update_scorer(scorer, &event, duration_since_epoch) {
                                                log_trace!(logger, "Persisting scorer after update");
                                                if let Err(e) = persister.persist_scorer(&scorer) {
                                                        log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e);
                                                }
                                        }
                                }
                                event_handler.handle_event(event);
                        };
                        define_run_body!(
                                persister, chain_monitor, chain_monitor.process_pending_events(&decorated_handler),
                                channel_manager, channel_manager.get_cm().process_pending_events(&decorated_handler),
                                onion_messenger, if let Some(om) = &onion_messenger { om.get_om().process_pending_events(&decorated_handler) },
                                peer_manager, gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
                                {
                                        // Wake on channel-manager or chain-monitor activity, or after 100ms.
                                        let manager_future = channel_manager.get_cm().get_event_or_persistence_needed_future();
                                        let monitor_future = chain_monitor.get_update_future();
                                        Sleeper::from_two_futures(&manager_future, &monitor_future)
                                                .wait_timeout(Duration::from_millis(100));
                                },
                                |_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false,
                                || {
                                        Some(SystemTime::now().duration_since(SystemTime::UNIX_EPOCH)
                                                .expect("Time should be sometime after 1970"))
                                },
                        )
                });
                Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
        }

        /// Waits for `BackgroundProcessor`'s thread to finish and returns any error that occurred
        /// while persisting [`ChannelManager`].
        ///
        /// # Panics
        ///
        /// This function panics if the background thread has panicked such as while persisting or
        /// handling events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        pub fn join(mut self) -> Result<(), std::io::Error> {
                assert!(self.thread_handle.is_some());
                self.join_thread()
        }

        /// Signals `BackgroundProcessor`'s thread to stop, waits for it to finish, and returns any
        /// error that occurred while persisting [`ChannelManager`].
        ///
        /// # Panics
        ///
        /// This function panics if the background thread has panicked such as while persisting or
        /// handling events.
        ///
        /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
        pub fn stop(mut self) -> Result<(), std::io::Error> {
                assert!(self.thread_handle.is_some());
                self.stop_and_join_thread()
        }

        // Raises the stop flag and then joins the thread.
        fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
                self.stop_thread.store(true, Ordering::Release);
                self.join_thread()
        }

        // Joins the thread if its handle is still held; once the handle has been taken this is
        // a no-op returning `Ok(())`. Propagates a panic from the background thread.
        fn join_thread(&mut self) -> Result<(), std::io::Error> {
                self.thread_handle.take().map_or(Ok(()), |handle| handle.join().unwrap())
        }
}
908
#[cfg(feature = "std")]
impl Drop for BackgroundProcessor {
        /// Raises the stop flag and joins the background thread.
        ///
        /// Panics if the thread returned an error or itself panicked. If `join` or `stop` already
        /// took the thread handle, there is nothing left to join and this returns quietly.
        fn drop(&mut self) {
                let shutdown_result = self.stop_and_join_thread();
                shutdown_result.unwrap();
        }
}
915
916 #[cfg(all(feature = "std", test))]
917 mod tests {
918         use bitcoin::{ScriptBuf, Txid};
919         use bitcoin::blockdata::constants::{genesis_block, ChainHash};
920         use bitcoin::blockdata::locktime::absolute::LockTime;
921         use bitcoin::blockdata::transaction::{Transaction, TxOut};
922         use bitcoin::hashes::Hash;
923         use bitcoin::network::constants::Network;
924         use bitcoin::secp256k1::{SecretKey, PublicKey, Secp256k1};
925         use lightning::chain::{BestBlock, Confirm, chainmonitor, Filter};
926         use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
927         use lightning::sign::{InMemorySigner, KeysManager, ChangeDestinationSource};
928         use lightning::chain::transaction::OutPoint;
929         use lightning::events::{Event, PathFailure, MessageSendEventsProvider, MessageSendEvent};
930         use lightning::{get_event_msg, get_event};
931         use lightning::ln::types::{PaymentHash, ChannelId};
932         use lightning::ln::channelmanager;
933         use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, MIN_CLTV_EXPIRY_DELTA, PaymentId};
934         use lightning::ln::features::{ChannelFeatures, NodeFeatures};
935         use lightning::ln::functional_test_utils::*;
936         use lightning::ln::msgs::{ChannelMessageHandler, Init};
937         use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
938         use lightning::onion_message::messenger::{DefaultMessageRouter, OnionMessenger};
939         use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
940         use lightning::routing::scoring::{ChannelUsage, ScoreUpdate, ScoreLookUp, LockableScore};
941         use lightning::routing::router::{DefaultRouter, Path, RouteHop, CandidateRouteHop};
942         use lightning::util::config::UserConfig;
943         use lightning::util::ser::Writeable;
944         use lightning::util::test_utils;
945         use lightning::util::persist::{KVStore,
946                 CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_KEY,
947                 NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, NETWORK_GRAPH_PERSISTENCE_KEY,
948                 SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY};
949         use lightning::util::sweep::{OutputSweeper, OutputSpendStatus};
950         use lightning_persister::fs_store::FilesystemStore;
951         use std::collections::VecDeque;
952         use std::{fs, env};
953         use std::path::PathBuf;
954         use std::sync::{Arc, Mutex};
955         use std::sync::mpsc::SyncSender;
956         use std::time::Duration;
957         use lightning_rapid_gossip_sync::RapidGossipSync;
958         use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
959
        // Five freshness-timer intervals.
        // NOTE(review): presumably the longest the tests wait for an expected event — confirm at
        // the use sites.
        const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
961
962         #[derive(Clone, Hash, PartialEq, Eq)]
963         struct TestDescriptor{}
964         impl SocketDescriptor for TestDescriptor {
965                 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
966                         0
967                 }
968
969                 fn disconnect_socket(&mut self) {}
970         }
971
        // Lock type wrapped around the test scorer: the bindings-friendly
        // `MultiThreadedLockableScore` under `c_bindings`, a plain `Mutex` otherwise.
        #[cfg(c_bindings)]
        type LockingWrapper<T> = lightning::routing::scoring::MultiThreadedLockableScore<T>;
        #[cfg(not(c_bindings))]
        type LockingWrapper<T> = Mutex<T>;

        // Concrete `ChannelManager` used by the tests, built from the test utilities, a
        // `KeysManager` for all three key-related parameters and a `DefaultRouter` scoring with
        // `TestScorer`.
        type ChannelManager =
                channelmanager::ChannelManager<
                        Arc<ChainMonitor>,
                        Arc<test_utils::TestBroadcaster>,
                        Arc<KeysManager>,
                        Arc<KeysManager>,
                        Arc<KeysManager>,
                        Arc<test_utils::TestFeeEstimator>,
                        Arc<DefaultRouter<
                                Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                                Arc<test_utils::TestLogger>,
                                Arc<KeysManager>,
                                Arc<LockingWrapper<TestScorer>>,
                                (),
                                TestScorer>
                        >,
                        Arc<test_utils::TestLogger>>;

        // Concrete `ChainMonitor` used by the tests; monitors are persisted via `FilesystemStore`.
        type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemStore>>;

        // Shared handles to the two gossip sync flavours, both over the same network graph type.
        type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
        type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;

        // Concrete `OnionMessenger` used by the tests; its last two type parameters are filled
        // with `IgnoringMessageHandler`.
        type OM = OnionMessenger<Arc<KeysManager>, Arc<KeysManager>, Arc<test_utils::TestLogger>, Arc<ChannelManager>, Arc<DefaultMessageRouter<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>, Arc<KeysManager>>>, IgnoringMessageHandler, IgnoringMessageHandler>;
1001
        /// Bundle of the components that make up one test node.
        struct Node {
                // The node's channel manager.
                node: Arc<ChannelManager>,
                // The node's onion messenger.
                messenger: Arc<OM>,
                // P2P and rapid gossip sync handles; the `Node` helper methods wrap one of them
                // (or neither) in a `GossipSync`.
                p2p_gossip_sync: PGS,
                rapid_gossip_sync: RGS,
                peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<OM>, Arc<test_utils::TestLogger>, IgnoringMessageHandler, Arc<KeysManager>>>,
                chain_monitor: Arc<ChainMonitor>,
                // Filesystem-backed store; its data directory is deleted when the node is dropped.
                kv_store: Arc<FilesystemStore>,
                tx_broadcaster: Arc<test_utils::TestBroadcaster>,
                network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
                logger: Arc<test_utils::TestLogger>,
                best_block: BestBlock,
                scorer: Arc<LockingWrapper<TestScorer>>,
                sweeper: Arc<OutputSweeper<Arc<test_utils::TestBroadcaster>, Arc<TestWallet>,
                        Arc<test_utils::TestFeeEstimator>, Arc<dyn Filter + Sync + Send>, Arc<FilesystemStore>,
                        Arc<test_utils::TestLogger>, Arc<KeysManager>>>,
        }
1019
1020         impl Node {
1021                 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1022                         GossipSync::P2P(self.p2p_gossip_sync.clone())
1023                 }
1024
1025                 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1026                         GossipSync::Rapid(self.rapid_gossip_sync.clone())
1027                 }
1028
1029                 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
1030                         GossipSync::None
1031                 }
1032         }
1033
1034         impl Drop for Node {
1035                 fn drop(&mut self) {
1036                         let data_dir = self.kv_store.get_data_dir();
1037                         match fs::remove_dir_all(data_dir.clone()) {
1038                                 Err(e) => println!("Failed to remove test store directory {}: {}", data_dir.display(), e),
1039                                 _ => {}
1040                         }
1041                 }
1042         }
1043
        /// Test persister that forwards to a [`FilesystemStore`] but can be configured to fail
        /// writes of the channel manager, network graph or scorer, and to signal network graph
        /// writes over a channel.
        struct Persister {
                // Error returned instead of writing the network graph, if set.
                graph_error: Option<(std::io::ErrorKind, &'static str)>,
                // Receives a `()` on every network graph write attempt, if set.
                graph_persistence_notifier: Option<SyncSender<()>>,
                // Error returned instead of writing the channel manager, if set.
                manager_error: Option<(std::io::ErrorKind, &'static str)>,
                // Error returned instead of writing the scorer, if set.
                scorer_error: Option<(std::io::ErrorKind, &'static str)>,
                // Backing store that handles every operation not intercepted above.
                kv_store: FilesystemStore,
        }
1051
1052         impl Persister {
1053                 fn new(data_dir: PathBuf) -> Self {
1054                         let kv_store = FilesystemStore::new(data_dir);
1055                         Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, kv_store }
1056                 }
1057
1058                 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1059                         Self { graph_error: Some((error, message)), ..self }
1060                 }
1061
1062                 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
1063                         Self { graph_persistence_notifier: Some(sender), ..self }
1064                 }
1065
1066                 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1067                         Self { manager_error: Some((error, message)), ..self }
1068                 }
1069
1070                 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
1071                         Self { scorer_error: Some((error, message)), ..self }
1072                 }
1073         }
1074
1075         impl KVStore for Persister {
1076                 fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> lightning::io::Result<Vec<u8>> {
1077                         self.kv_store.read(primary_namespace, secondary_namespace, key)
1078                 }
1079
1080                 fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: &[u8]) -> lightning::io::Result<()> {
1081                         if primary_namespace == CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE &&
1082                                 secondary_namespace == CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE &&
1083                                 key == CHANNEL_MANAGER_PERSISTENCE_KEY
1084                         {
1085                                 if let Some((error, message)) = self.manager_error {
1086                                         return Err(std::io::Error::new(error, message))
1087                                 }
1088                         }
1089
1090                         if primary_namespace == NETWORK_GRAPH_PERSISTENCE_PRIMARY_NAMESPACE &&
1091                                 secondary_namespace == NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE &&
1092                                 key == NETWORK_GRAPH_PERSISTENCE_KEY
1093                         {
1094                                 if let Some(sender) = &self.graph_persistence_notifier {
1095                                         match sender.send(()) {
1096                                                 Ok(()) => {},
1097                                                 Err(std::sync::mpsc::SendError(())) => println!("Persister failed to notify as receiver went away."),
1098                                         }
1099                                 };
1100
1101                                 if let Some((error, message)) = self.graph_error {
1102                                         return Err(std::io::Error::new(error, message))
1103                                 }
1104                         }
1105
1106                         if primary_namespace == SCORER_PERSISTENCE_PRIMARY_NAMESPACE &&
1107                                 secondary_namespace == SCORER_PERSISTENCE_SECONDARY_NAMESPACE &&
1108                                 key == SCORER_PERSISTENCE_KEY
1109                         {
1110                                 if let Some((error, message)) = self.scorer_error {
1111                                         return Err(std::io::Error::new(error, message))
1112                                 }
1113                         }
1114
1115                         self.kv_store.write(primary_namespace, secondary_namespace, key, buf)
1116                 }
1117
                      /// Removes the entry stored under the given namespaces and `key` by forwarding the
                      /// call, including the `lazy` flag, unchanged to the wrapped `kv_store`.
1118                 fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> lightning::io::Result<()> {
1119                         self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
1120                 }
1121
                      /// Lists the keys stored under the given namespaces by forwarding the call unchanged
                      /// to the wrapped `kv_store`.
1122                 fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> lightning::io::Result<Vec<String>> {
1123                         self.kv_store.list(primary_namespace, secondary_namespace)
1124                 }
1125         }
1126
              /// Scorer used by the tests in this module. Its `ScoreUpdate` callbacks compare each
              /// incoming update against a queue of expected results.
1127         struct TestScorer {
                      // Expected updates in the order they must arrive. While this is `None` the
                      // `ScoreUpdate` callbacks perform no checks at all.
1128                 event_expectations: Option<VecDeque<TestResult>>,
1129         }
1130
              /// A single scoring update that a `TestScorer` expects to receive, one variant per
              /// `ScoreUpdate` callback that carries a path.
1131         #[derive(Debug)]
1132         enum TestResult {
                      // Expected by `payment_path_failed`; both the path and the channel id are compared.
1133                 PaymentFailure { path: Path, short_channel_id: u64 },
                      // Expected by `payment_path_successful`.
1134                 PaymentSuccess { path: Path },
                      // Expected by `probe_failed`; only the path is compared.
1135                 ProbeFailure { path: Path },
                      // Expected by `probe_successful`.
1136                 ProbeSuccess { path: Path },
1137         }
1138
1139         impl TestScorer {
1140                 fn new() -> Self {
1141                         Self { event_expectations: None }
1142                 }
1143
1144                 fn expect(&mut self, expectation: TestResult) {
1145                         self.event_expectations.get_or_insert_with(VecDeque::new).push_back(expectation);
1146                 }
1147         }
1148
1149         impl lightning::util::ser::Writeable for TestScorer {
1150                 fn write<W: lightning::util::ser::Writer>(&self, _: &mut W) -> Result<(), lightning::io::Error> { Ok(()) }
1151         }
1152
1153         impl ScoreLookUp for TestScorer {
1154                 type ScoreParams = ();
1155                 fn channel_penalty_msat(
1156                         &self, _candidate: &CandidateRouteHop, _usage: ChannelUsage, _score_params: &Self::ScoreParams
1157                 ) -> u64 { unimplemented!(); }
1158         }
1159
1160         impl ScoreUpdate for TestScorer {
1161                 fn payment_path_failed(&mut self, actual_path: &Path, actual_short_channel_id: u64, _: Duration) {
1162                         if let Some(expectations) = &mut self.event_expectations {
1163                                 match expectations.pop_front().unwrap() {
1164                                         TestResult::PaymentFailure { path, short_channel_id } => {
1165                                                 assert_eq!(actual_path, &path);
1166                                                 assert_eq!(actual_short_channel_id, short_channel_id);
1167                                         },
1168                                         TestResult::PaymentSuccess { path } => {
1169                                                 panic!("Unexpected successful payment path: {:?}", path)
1170                                         },
1171                                         TestResult::ProbeFailure { path } => {
1172                                                 panic!("Unexpected probe failure: {:?}", path)
1173                                         },
1174                                         TestResult::ProbeSuccess { path } => {
1175                                                 panic!("Unexpected probe success: {:?}", path)
1176                                         }
1177                                 }
1178                         }
1179                 }
1180
1181                 fn payment_path_successful(&mut self, actual_path: &Path, _: Duration) {
1182                         if let Some(expectations) = &mut self.event_expectations {
1183                                 match expectations.pop_front().unwrap() {
1184                                         TestResult::PaymentFailure { path, .. } => {
1185                                                 panic!("Unexpected payment path failure: {:?}", path)
1186                                         },
1187                                         TestResult::PaymentSuccess { path } => {
1188                                                 assert_eq!(actual_path, &path);
1189                                         },
1190                                         TestResult::ProbeFailure { path } => {
1191                                                 panic!("Unexpected probe failure: {:?}", path)
1192                                         },
1193                                         TestResult::ProbeSuccess { path } => {
1194                                                 panic!("Unexpected probe success: {:?}", path)
1195                                         }
1196                                 }
1197                         }
1198                 }
1199
1200                 fn probe_failed(&mut self, actual_path: &Path, _: u64, _: Duration) {
1201                         if let Some(expectations) = &mut self.event_expectations {
1202                                 match expectations.pop_front().unwrap() {
1203                                         TestResult::PaymentFailure { path, .. } => {
1204                                                 panic!("Unexpected payment path failure: {:?}", path)
1205                                         },
1206                                         TestResult::PaymentSuccess { path } => {
1207                                                 panic!("Unexpected payment path success: {:?}", path)
1208                                         },
1209                                         TestResult::ProbeFailure { path } => {
1210                                                 assert_eq!(actual_path, &path);
1211                                         },
1212                                         TestResult::ProbeSuccess { path } => {
1213                                                 panic!("Unexpected probe success: {:?}", path)
1214                                         }
1215                                 }
1216                         }
1217                 }
1218                 fn probe_successful(&mut self, actual_path: &Path, _: Duration) {
1219                         if let Some(expectations) = &mut self.event_expectations {
1220                                 match expectations.pop_front().unwrap() {
1221                                         TestResult::PaymentFailure { path, .. } => {
1222                                                 panic!("Unexpected payment path failure: {:?}", path)
1223                                         },
1224                                         TestResult::PaymentSuccess { path } => {
1225                                                 panic!("Unexpected payment path success: {:?}", path)
1226                                         },
1227                                         TestResult::ProbeFailure { path } => {
1228                                                 panic!("Unexpected probe failure: {:?}", path)
1229                                         },
1230                                         TestResult::ProbeSuccess { path } => {
1231                                                 assert_eq!(actual_path, &path);
1232                                         }
1233                                 }
1234                         }
1235                 }
1236                 fn time_passed(&mut self, _: Duration) {}
1237         }
1238
              // Only compiled when the `c_bindings` cfg is set; the impl body is empty, so no
              // methods are provided here.
1239         #[cfg(c_bindings)]
1240         impl lightning::routing::scoring::Score for TestScorer {}
1241
1242         impl Drop for TestScorer {
1243                 fn drop(&mut self) {
1244                         if std::thread::panicking() {
1245                                 return;
1246                         }
1247
1248                         if let Some(event_expectations) = &self.event_expectations {
1249                                 if !event_expectations.is_empty() {
1250                                         panic!("Unsatisfied event expectations: {:?}", event_expectations);
1251                                 }
1252                         }
1253                 }
1254         }
1255
              /// Field-less wallet stand-in; `create_nodes` passes it to `OutputSweeper::new` as the
              /// change destination source.
1256         struct TestWallet {}
1257
1258         impl ChangeDestinationSource for TestWallet {
1259                 fn get_change_destination_script(&self) -> Result<ScriptBuf, ()> {
1260                         Ok(ScriptBuf::new())
1261                 }
1262         }
1263
1264         fn get_full_filepath(filepath: String, filename: String) -> String {
1265                 let mut path = PathBuf::from(filepath);
1266                 path.push(filename);
1267                 path.to_str().unwrap().to_string()
1268         }
1269
1270         fn create_nodes(num_nodes: usize, persist_dir: &str) -> (String, Vec<Node>) {
1271                 let persist_temp_path = env::temp_dir().join(persist_dir);
1272                 let persist_dir = persist_temp_path.to_string_lossy().to_string();
1273                 let network = Network::Bitcoin;
1274                 let mut nodes = Vec::new();
1275                 for i in 0..num_nodes {
1276                         let tx_broadcaster = Arc::new(test_utils::TestBroadcaster::new(network));
1277                         let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
1278                         let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
1279                         let genesis_block = genesis_block(network);
1280                         let network_graph = Arc::new(NetworkGraph::new(network, logger.clone()));
1281                         let scorer = Arc::new(LockingWrapper::new(TestScorer::new()));
1282                         let now = Duration::from_secs(genesis_block.header.time as u64);
1283                         let seed = [i as u8; 32];
1284                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1285                         let router = Arc::new(DefaultRouter::new(network_graph.clone(), logger.clone(), Arc::clone(&keys_manager), scorer.clone(), Default::default()));
1286                         let msg_router = Arc::new(DefaultMessageRouter::new(network_graph.clone(), Arc::clone(&keys_manager)));
1287                         let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Bitcoin));
1288                         let kv_store = Arc::new(FilesystemStore::new(format!("{}_persister_{}", &persist_dir, i).into()));
1289                         let now = Duration::from_secs(genesis_block.header.time as u64);
1290                         let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
1291                         let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), kv_store.clone()));
1292                         let best_block = BestBlock::from_network(network);
1293                         let params = ChainParameters { network, best_block };
1294                         let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), router.clone(), logger.clone(), keys_manager.clone(), keys_manager.clone(), keys_manager.clone(), UserConfig::default(), params, genesis_block.header.time));
1295                         let messenger = Arc::new(OnionMessenger::new(keys_manager.clone(), keys_manager.clone(), logger.clone(), manager.clone(), msg_router.clone(), IgnoringMessageHandler {}, IgnoringMessageHandler {}));
1296                         let wallet = Arc::new(TestWallet {});
1297                         let sweeper = Arc::new(OutputSweeper::new(best_block, Arc::clone(&tx_broadcaster), Arc::clone(&fee_estimator),
1298                                 None::<Arc<dyn Filter + Sync + Send>>, Arc::clone(&keys_manager), wallet, Arc::clone(&kv_store), Arc::clone(&logger)));
1299                         let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
1300                         let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone(), logger.clone()));
1301                         let msg_handler = MessageHandler {
1302                                 chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new(ChainHash::using_genesis_block(Network::Testnet))),
1303                                 route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new()),
1304                                 onion_message_handler: messenger.clone(), custom_message_handler: IgnoringMessageHandler{}
1305                         };
1306                         let peer_manager = Arc::new(PeerManager::new(msg_handler, 0, &seed, logger.clone(), keys_manager.clone()));
1307                         let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, kv_store, tx_broadcaster, network_graph, logger, best_block, scorer, sweeper, messenger };
1308                         nodes.push(node);
1309                 }
1310
1311                 for i in 0..num_nodes {
1312                         for j in (i+1)..num_nodes {
1313                                 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init {
1314                                         features: nodes[j].node.init_features(), networks: None, remote_network_address: None
1315                                 }, true).unwrap();
1316                                 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init {
1317                                         features: nodes[i].node.init_features(), networks: None, remote_network_address: None
1318                                 }, false).unwrap();
1319                         }
1320                 }
1321
1322                 (persist_dir, nodes)
1323         }
1324
1325         macro_rules! open_channel {
1326                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1327                         begin_open_channel!($node_a, $node_b, $channel_value);
1328                         let events = $node_a.node.get_and_clear_pending_events();
1329                         assert_eq!(events.len(), 1);
1330                         let (temporary_channel_id, tx) = handle_funding_generation_ready!(events[0], $channel_value);
1331                         $node_a.node.funding_transaction_generated(&temporary_channel_id, &$node_b.node.get_our_node_id(), tx.clone()).unwrap();
1332                         $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
1333                         get_event!($node_b, Event::ChannelPending);
1334                         $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
1335                         get_event!($node_a, Event::ChannelPending);
1336                         tx
1337                 }}
1338         }
1339
1340         macro_rules! begin_open_channel {
1341                 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
1342                         $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None, None).unwrap();
1343                         $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
1344                         $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
1345                 }}
1346         }
1347
1348         macro_rules! handle_funding_generation_ready {
1349                 ($event: expr, $channel_value: expr) => {{
1350                         match $event {
1351                                 Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
1352                                         assert_eq!(channel_value_satoshis, $channel_value);
1353                                         assert_eq!(user_channel_id, 42);
1354
1355                                         let tx = Transaction { version: 1 as i32, lock_time: LockTime::ZERO, input: Vec::new(), output: vec![TxOut {
1356                                                 value: channel_value_satoshis, script_pubkey: output_script.clone(),
1357                                         }]};
1358                                         (temporary_channel_id, tx)
1359                                 },
1360                                 _ => panic!("Unexpected event"),
1361                         }
1362                 }}
1363         }
1364
1365         fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
1366                 for i in 1..=depth {
1367                         let prev_blockhash = node.best_block.block_hash;
1368                         let height = node.best_block.height + 1;
1369                         let header = create_dummy_header(prev_blockhash, height);
1370                         let txdata = vec![(0, tx)];
1371                         node.best_block = BestBlock::new(header.block_hash(), height);
1372                         match i {
1373                                 1 => {
1374                                         node.node.transactions_confirmed(&header, &txdata, height);
1375                                         node.chain_monitor.transactions_confirmed(&header, &txdata, height);
1376                                         node.sweeper.transactions_confirmed(&header, &txdata, height);
1377                                 },
1378                                 x if x == depth => {
1379                                         // We need the TestBroadcaster to know about the new height so that it doesn't think
1380                                         // we're violating the time lock requirements of transactions broadcasted at that
1381                                         // point.
1382                                         node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
1383                                         node.node.best_block_updated(&header, height);
1384                                         node.chain_monitor.best_block_updated(&header, height);
1385                                         node.sweeper.best_block_updated(&header, height);
1386                                 },
1387                                 _ => {},
1388                         }
1389                 }
1390         }
1391
1392         fn advance_chain(node: &mut Node, num_blocks: u32) {
1393                 for i in 1..=num_blocks {
1394                         let prev_blockhash = node.best_block.block_hash;
1395                         let height = node.best_block.height + 1;
1396                         let header = create_dummy_header(prev_blockhash, height);
1397                         node.best_block = BestBlock::new(header.block_hash(), height);
1398                         if i == num_blocks {
1399                                 // We need the TestBroadcaster to know about the new height so that it doesn't think
1400                                 // we're violating the time lock requirements of transactions broadcasted at that
1401                                 // point.
1402                                 node.tx_broadcaster.blocks.lock().unwrap().push((genesis_block(Network::Bitcoin), height));
1403                                 node.node.best_block_updated(&header, height);
1404                                 node.chain_monitor.best_block_updated(&header, height);
1405                                 node.sweeper.best_block_updated(&header, height);
1406                         }
1407                 }
1408         }
1409
              /// Confirms `tx` on `node` to a depth of `ANTI_REORG_DELAY` blocks by delegating to
              /// `confirm_transaction_depth`.
1410         fn confirm_transaction(node: &mut Node, tx: &Transaction) {
1411                 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
1412         }
1413
1414         #[test]
1415         fn test_background_processor() {
1416                 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
1417                 // updates. Also test that when new updates are available, the manager signals that it needs
1418                 // re-persistence and is successfully re-persisted.
1419                 let (persist_dir, nodes) = create_nodes(2, "test_background_processor");
1420
1421                 // Go through the channel creation process so that each node has something to persist. Since
1422                 // open_channel consumes events, it must complete before starting BackgroundProcessor to
1423                 // avoid a race with processing events.
1424                 let tx = open_channel!(nodes[0], nodes[1], 100000);
1425
1426                 // Initiate the background processors to watch each node.
1427                 let data_dir = nodes[0].kv_store.get_data_dir();
1428                 let persister = Arc::new(Persister::new(data_dir));
1429                 let event_handler = |_: _| {};
1430                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1431
1432                 macro_rules! check_persisted_data {
1433                         ($node: expr, $filepath: expr) => {
1434                                 let mut expected_bytes = Vec::new();
1435                                 loop {
1436                                         expected_bytes.clear();
1437                                         match $node.write(&mut expected_bytes) {
1438                                                 Ok(()) => {
1439                                                         match std::fs::read($filepath) {
1440                                                                 Ok(bytes) => {
1441                                                                         if bytes == expected_bytes {
1442                                                                                 break
1443                                                                         } else {
1444                                                                                 continue
1445                                                                         }
1446                                                                 },
1447                                                                 Err(_) => continue
1448                                                         }
1449                                                 },
1450                                                 Err(e) => panic!("Unexpected error: {}", e)
1451                                         }
1452                                 }
1453                         }
1454                 }
1455
1456                 // Check that the initial channel manager data is persisted as expected.
1457                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "manager".to_string());
1458                 check_persisted_data!(nodes[0].node, filepath.clone());
1459
1460                 loop {
1461                         if !nodes[0].node.get_event_or_persist_condvar_value() { break }
1462                 }
1463
1464                 // Force-close the channel.
1465                 nodes[0].node.force_close_broadcasting_latest_txn(&ChannelId::v1_from_funding_outpoint(OutPoint { txid: tx.txid(), index: 0 }), &nodes[1].node.get_our_node_id()).unwrap();
1466
1467                 // Check that the force-close updates are persisted.
1468                 check_persisted_data!(nodes[0].node, filepath.clone());
1469                 loop {
1470                         if !nodes[0].node.get_event_or_persist_condvar_value() { break }
1471                 }
1472
1473                 // Check network graph is persisted
1474                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "network_graph".to_string());
1475                 check_persisted_data!(nodes[0].network_graph, filepath.clone());
1476
1477                 // Check scorer is persisted
1478                 let filepath = get_full_filepath(format!("{}_persister_0", &persist_dir), "scorer".to_string());
1479                 check_persisted_data!(nodes[0].scorer, filepath.clone());
1480
1481                 if !std::thread::panicking() {
1482                         bg_processor.stop().unwrap();
1483                 }
1484         }
1485
1486         #[test]
1487         fn test_timer_tick_called() {
1488                 // Test that:
1489                 // - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
1490                 // - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
1491                 // - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
1492                 // - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
1493                 let (_, nodes) = create_nodes(1, "test_timer_tick_called");
1494                 let data_dir = nodes[0].kv_store.get_data_dir();
1495                 let persister = Arc::new(Persister::new(data_dir));
1496                 let event_handler = |_: _| {};
1497                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1498                 loop {
1499                         let log_entries = nodes[0].logger.lines.lock().unwrap();
1500                         let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
1501                         let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
1502                         let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
1503                         let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
1504                         if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
1505                                 log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
1506                                 log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
1507                                 log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
1508                                 break
1509                         }
1510                 }
1511
1512                 if !std::thread::panicking() {
1513                         bg_processor.stop().unwrap();
1514                 }
1515         }
1516
1517         #[test]
1518         fn test_channel_manager_persist_error() {
1519                 // Test that if we encounter an error during manager persistence, the thread panics.
1520                 let (_, nodes) = create_nodes(2, "test_persist_error");
1521                 open_channel!(nodes[0], nodes[1], 100000);
1522
1523                 let data_dir = nodes[0].kv_store.get_data_dir();
1524                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1525                 let event_handler = |_: _| {};
1526                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1527                 match bg_processor.join() {
1528                         Ok(_) => panic!("Expected error persisting manager"),
1529                         Err(e) => {
1530                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1531                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1532                         },
1533                 }
1534         }
1535
1536         #[tokio::test]
1537         #[cfg(feature = "futures")]
1538         async fn test_channel_manager_persist_error_async() {
1539                 // Test that if we encounter an error during manager persistence, the thread panics.
1540                 let (_, nodes) = create_nodes(2, "test_persist_error_sync");
1541                 open_channel!(nodes[0], nodes[1], 100000);
1542
1543                 let data_dir = nodes[0].kv_store.get_data_dir();
1544                 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
1545
1546                 let bp_future = super::process_events_async(
1547                         persister, |_: _| {async {}}, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()),
1548                         nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(),
1549                         Some(nodes[0].scorer.clone()), move |dur: Duration| {
1550                                 Box::pin(async move {
1551                                         tokio::time::sleep(dur).await;
1552                                         false // Never exit
1553                                 })
1554                         }, false, || Some(Duration::ZERO),
1555                 );
1556                 match bp_future.await {
1557                         Ok(_) => panic!("Expected error persisting manager"),
1558                         Err(e) => {
1559                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1560                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1561                         },
1562                 }
1563         }
1564
1565         #[test]
1566         fn test_network_graph_persist_error() {
1567                 // Test that if we encounter an error during network graph persistence, an error gets returned.
1568                 let (_, nodes) = create_nodes(2, "test_persist_network_graph_error");
1569                 let data_dir = nodes[0].kv_store.get_data_dir();
1570                 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
1571                 let event_handler = |_: _| {};
1572                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1573
1574                 match bg_processor.stop() {
1575                         Ok(_) => panic!("Expected error persisting network graph"),
1576                         Err(e) => {
1577                                 assert_eq!(e.kind(), std::io::ErrorKind::Other);
1578                                 assert_eq!(e.get_ref().unwrap().to_string(), "test");
1579                         },
1580                 }
1581         }
1582
#[test]
fn test_scorer_persist_error() {
	// When the persister reports an error while persisting the scorer, stopping the background
	// processor must return that error to the caller.
	let (_, nodes) = create_nodes(2, "test_persist_scorer_error");
	let node = &nodes[0];
	let failing_persister = Arc::new(
		Persister::new(node.kv_store.get_data_dir())
			.with_scorer_error(std::io::ErrorKind::Other, "test"),
	);
	let ignore_events = |_: _| {};
	let processor = BackgroundProcessor::start(
		failing_persister,
		ignore_events,
		node.chain_monitor.clone(),
		node.node.clone(),
		Some(node.messenger.clone()),
		node.no_gossip_sync(),
		node.peer_manager.clone(),
		node.logger.clone(),
		Some(node.scorer.clone()),
	);

	// The returned error must carry both the kind and the message the persister was built with.
	if let Err(err) = processor.stop() {
		assert_eq!(err.kind(), std::io::ErrorKind::Other);
		assert_eq!(err.get_ref().unwrap().to_string(), "test");
	} else {
		panic!("Expected error persisting scorer");
	}
}
1600
1601         #[test]
1602         fn test_background_event_handling() {
                     // Exercises event delivery through two successive `BackgroundProcessor` instances on
                     // `nodes[0]`: the first must hand `FundingGenerationReady` and `ChannelPending` to the
                     // handler while a channel is opened, the second must hand over `SpendableOutputs` after
                     // a force-close. The remainder of the test steps `nodes[0].sweeper` through its
                     // `OutputSpendStatus` states for the tracked output.
1603                 let (_, mut nodes) = create_nodes(2, "test_background_event_handling");
1604                 let channel_value = 100000;
1605                 let data_dir = nodes[0].kv_store.get_data_dir();
1606                 let persister = Arc::new(Persister::new(data_dir.clone()));
1607
1608                 // Set up a background event handler for FundingGenerationReady and ChannelPending events.
                     // Each one is reported back to this thread over its own bounded channel; ChannelReady is
                     // ignored and any other event panics.
1609                 let (funding_generation_send, funding_generation_recv) = std::sync::mpsc::sync_channel(1);
1610                 let (channel_pending_send, channel_pending_recv) = std::sync::mpsc::sync_channel(1);
1611                 let event_handler = move |event: Event| match event {
1612                         Event::FundingGenerationReady { .. } => funding_generation_send.send(handle_funding_generation_ready!(event, channel_value)).unwrap(),
1613                         Event::ChannelPending { .. } => channel_pending_send.send(()).unwrap(),
1614                         Event::ChannelReady { .. } => {},
1615                         _ => panic!("Unexpected event: {:?}", event),
1616                 };
1617
1618                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1619
1620                 // Open a channel and check that the FundingGenerationReady event was handled.
1621                 begin_open_channel!(nodes[0], nodes[1], channel_value);
1622                 let (temporary_channel_id, funding_tx) = funding_generation_recv
1623                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1624                         .expect("FundingGenerationReady not handled within deadline");
                     // Complete the funding handshake by hand, passing messages between the two nodes directly.
1625                 nodes[0].node.funding_transaction_generated(&temporary_channel_id, &nodes[1].node.get_our_node_id(), funding_tx.clone()).unwrap();
1626                 nodes[1].node.handle_funding_created(&nodes[0].node.get_our_node_id(), &get_event_msg!(nodes[0], MessageSendEvent::SendFundingCreated, nodes[1].node.get_our_node_id()));
1627                 get_event!(nodes[1], Event::ChannelPending);
1628                 nodes[0].node.handle_funding_signed(&nodes[1].node.get_our_node_id(), &get_event_msg!(nodes[1], MessageSendEvent::SendFundingSigned, nodes[0].node.get_our_node_id()));
                     // nodes[0]'s ChannelPending must reach the handler via the background processor.
1629                 let _ = channel_pending_recv.recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1630                         .expect("ChannelPending not handled within deadline");
1631
1632                 // Confirm the funding transaction.
1633                 confirm_transaction(&mut nodes[0], &funding_tx);
1634                 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
1635                 confirm_transaction(&mut nodes[1], &funding_tx);
1636                 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
1637                 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
1638                 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
1639                 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
1640                 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
                     // The funding transaction must be the only thing nodes[0] has broadcast so far.
1641                 let broadcast_funding = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1642                 assert_eq!(broadcast_funding.txid(), funding_tx.txid());
1643                 assert!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().is_empty());
1644
                     // Stop the first processor (unless this thread is already unwinding) so that a second
                     // one with a different event handler can be started below.
1645                 if !std::thread::panicking() {
1646                         bg_processor.stop().unwrap();
1647                 }
1648
1649                 // Set up a background event handler for SpendableOutputs events.
1650                 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
1651                 let event_handler = move |event: Event| match event {
1652                         Event::SpendableOutputs { .. } => sender.send(event).unwrap(),
1653                         Event::ChannelReady { .. } => {},
1654                         Event::ChannelClosed { .. } => {},
1655                         _ => panic!("Unexpected event: {:?}", event),
1656                 };
1657                 let persister = Arc::new(Persister::new(data_dir));
1658                 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1659
1660                 // Force close the channel and check that the SpendableOutputs event was handled.
1661                 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
1662                 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1663                 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
1664
1665                 let event = receiver
1666                         .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
1667                         .expect("Events not handled within deadline");
1668                 match event {
1669                         Event::SpendableOutputs { outputs, channel_id } => {
                                     // `Some(153)` is the height the first broadcast is delayed until; the
                                     // `delayed_until_height` assertion below checks for the same value.
                                     // NOTE(review): the meaning of the `false` argument is not visible from
                                     // this test; confirm against `track_spendable_outputs`.
1670                                 nodes[0].sweeper.track_spendable_outputs(outputs, channel_id, false, Some(153)).unwrap();
1671                         },
1672                         _ => panic!("Unexpected event: {:?}", event),
1673                 }
1674
1675                 // Check we don't generate an initial sweeping tx until we reach the required height.
1676                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1677                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
                     // If anything was broadcast in the meantime, it must not spend the tracked output, and
                     // the output must still be waiting for its delayed initial broadcast.
1678                 if let Some(sweep_tx_0) = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop() {
1679                         assert!(!tracked_output.is_spent_in(&sweep_tx_0));
1680                         match tracked_output.status {
1681                                 OutputSpendStatus::PendingInitialBroadcast { delayed_until_height } => {
1682                                         assert_eq!(delayed_until_height, Some(153));
1683                                 }
1684                                 _ => panic!("Unexpected status"),
1685                         }
1686                 }
1687
                     // NOTE(review): presumably these 3 blocks take the chain tip to the delay height of
                     // 153 -- confirm against the heights produced by the helpers used above.
1688                 advance_chain(&mut nodes[0], 3);
1689
1690                 // Check we generate an initial sweeping tx.
1691                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1692                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
1693                 let sweep_tx_0 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1694                 match tracked_output.status {
1695                         OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
1696                                 assert_eq!(sweep_tx_0.txid(), latest_spending_tx.txid());
1697                         }
1698                         _ => panic!("Unexpected status"),
1699                 }
1700
1701                 // Check we regenerate and rebroadcast the sweeping tx each block.
1702                 advance_chain(&mut nodes[0], 1);
1703                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1704                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
1705                 let sweep_tx_1 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1706                 match tracked_output.status {
1707                         OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
1708                                 assert_eq!(sweep_tx_1.txid(), latest_spending_tx.txid());
1709                         }
1710                         _ => panic!("Unexpected status"),
1711                 }
                     // The rebroadcast must be a different transaction from the first sweep.
1712                 assert_ne!(sweep_tx_0, sweep_tx_1);
1713
1714                 advance_chain(&mut nodes[0], 1);
1715                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1716                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
1717                 let sweep_tx_2 = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
1718                 match tracked_output.status {
1719                         OutputSpendStatus::PendingFirstConfirmation { latest_spending_tx, .. } => {
1720                                 assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
1721                         }
1722                         _ => panic!("Unexpected status"),
1723                 }
                     // All three sweeps generated so far must be pairwise distinct.
1724                 assert_ne!(sweep_tx_0, sweep_tx_2);
1725                 assert_ne!(sweep_tx_1, sweep_tx_2);
1726
1727                 // Check we still track the spendable outputs up to ANTI_REORG_DELAY confirmations.
1728                 confirm_transaction_depth(&mut nodes[0], &sweep_tx_2, 5);
1729                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1730                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
1731                 match tracked_output.status {
1732                         OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
1733                                 assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
1734                         }
1735                         _ => panic!("Unexpected status"),
1736                 }
1737
1738                 // Check we still see the transaction as confirmed if we unconfirm any untracked
1739                 // transaction. (We previously had a bug that would mark tracked transactions as
1740                 // unconfirmed if any transaction at an unknown block height would be unconfirmed.)
1741                 let unconf_txid = Txid::from_slice(&[0; 32]).unwrap();
1742                 nodes[0].sweeper.transaction_unconfirmed(&unconf_txid);
1743
1744                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 1);
1745                 let tracked_output = nodes[0].sweeper.tracked_spendable_outputs().first().unwrap().clone();
1746                 match tracked_output.status {
1747                         OutputSpendStatus::PendingThresholdConfirmations { latest_spending_tx, .. } => {
1748                                 assert_eq!(sweep_tx_2.txid(), latest_spending_tx.txid());
1749                         }
1750                         _ => panic!("Unexpected status"),
1751                 }
1752
1753                 // Check we stop tracking the spendable outputs when one of the txs reaches
1754                 // ANTI_REORG_DELAY confirmations.
                     // Note that the transaction confirmed here is the first sweep (`sweep_tx_0`), not the
                     // most recently broadcast one.
1755                 confirm_transaction_depth(&mut nodes[0], &sweep_tx_0, ANTI_REORG_DELAY);
1756                 assert_eq!(nodes[0].sweeper.tracked_spendable_outputs().len(), 0);
1757
1758                 if !std::thread::panicking() {
1759                         bg_processor.stop().unwrap();
1760                 }
1761         }
1762
#[test]
fn test_scorer_persistence() {
	// Checks that a running background processor eventually logs
	// "Calling time_passed and persisting scorer" without any events being generated.
	let (_, nodes) = create_nodes(2, "test_scorer_persistence");
	let data_dir = nodes[0].kv_store.get_data_dir();
	let persister = Arc::new(Persister::new(data_dir));
	let event_handler = |_: _| {};
	let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), Some(nodes[0].messenger.clone()), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

	// Build the lookup key once instead of allocating a fresh `String` on every poll.
	let expected_entry = ("lightning_background_processor", "Calling time_passed and persisting scorer".to_string());
	loop {
		// The guard is a temporary, so the logger lock is released before we pause. Spinning
		// on the lock without a pause would burn a core and contend with the background
		// thread, which logs through the same logger.
		let logged = nodes[0].logger.lines.lock().unwrap().get(&expected_entry).is_some();
		if logged {
			break
		}
		std::thread::sleep(Duration::from_millis(1));
	}

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}
}
1783
macro_rules! do_test_not_pruning_network_graph_until_graph_sync_completion {
	($nodes: expr, $receive: expr, $sleep: expr) => {
		// Shared body for the sync and async pruning tests.
		//  * `$nodes`   - the test nodes; a background processor must already be running for
		//                 `$nodes[0]`.
		//  * `$receive` - expression that waits for the prune notification and yields a `Result`.
		//  * `$sleep`   - expression evaluated before each poll of the logger.

		// Start with a single channel (short channel id 42) added from a partial announcement.
		$nodes[0].network_graph.add_channel_from_partial_announcement(
			42, 53, ChannelFeatures::empty(), $nodes[0].node.get_our_node_id(), $nodes[1].node.get_our_node_id()
		).expect("Failed to update channel from partial announcement");
		let original_graph_description = $nodes[0].network_graph.to_string();
		assert!(original_graph_description.contains("42: features: 0000, node_one:"));
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 1);

		// Wait until the loop has gone around at least twice.
		let tick_log_key = ("lightning_background_processor", "Calling ChannelManager's timer_tick_occurred".to_string());
		loop {
			$sleep;
			let ticks_logged = *$nodes[0].logger.lines.lock().unwrap().get(&tick_log_key).unwrap_or(&0);
			if ticks_logged > 1 {
				break
			}
		}

		// Serialized rapid gossip sync update fed to the node below.
		let rgs_update = vec![
			76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
			79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
			0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
			187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
			157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
			88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
			204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
			181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
			110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
			76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
			226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
			0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
			0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
		];
		$nodes[0].rapid_gossip_sync.update_network_graph_no_std(&rgs_update[..], Some(1642291930)).unwrap();

		// The update should have added two channels and pruned the one we started with.
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 2);

		$receive.expect("Network graph not pruned within deadline");

		// Once the notification has arrived, every channel should have been pruned.
		assert_eq!($nodes[0].network_graph.read_only().channels().len(), 0);
	}
}
1832
#[test]
fn test_not_pruning_network_graph_until_graph_sync_completion() {
	// Runs the shared pruning scenario against a thread-based `BackgroundProcessor` started
	// with the node's rapid gossip sync. The persister is handed the sending half of a channel
	// as its graph persistence notifier; the scenario waits on the receiving half.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);

	let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion");
	let notifying_persister = Arc::new(
		Persister::new(nodes[0].kv_store.get_data_dir()).with_graph_persistence_notifier(sender),
	);

	let ignore_events = |_: _| {};
	let background_processor = BackgroundProcessor::start(
		notifying_persister,
		ignore_events,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].rapid_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	// Allow five times the first-prune timer for the notification to arrive.
	let prune_deadline = Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5);
	do_test_not_pruning_network_graph_until_graph_sync_completion!(
		nodes,
		receiver.recv_timeout(prune_deadline),
		std::thread::sleep(Duration::from_millis(1))
	);

	background_processor.stop().unwrap();
}
1850
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_not_pruning_network_graph_until_graph_sync_completion_async() {
	// Async counterpart of `test_not_pruning_network_graph_until_graph_sync_completion`: the
	// same scenario is driven against `process_events_async` running as a tokio task.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);

	let (_, nodes) = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion_async");
	let notifying_persister = Arc::new(
		Persister::new(nodes[0].kv_store.get_data_dir()).with_graph_persistence_notifier(sender),
	);

	// The sleeper future resolves to `true` when the exit channel changes and to `false` when
	// the requested duration elapses first.
	let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
	let sleeper = move |dur: Duration| {
		let mut exit_rx = exit_receiver.clone();
		Box::pin(async move {
			tokio::select! {
				_ = tokio::time::sleep(dur) => false,
				_ = exit_rx.changed() => true,
			}
		})
	};
	let bp_future = super::process_events_async(
		notifying_persister,
		|_: _| async {},
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].rapid_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
		sleeper,
		false,
		|| Some(Duration::from_secs(1696300000)),
	);

	let processor_task = tokio::spawn(bp_future);
	let scenario_task = tokio::spawn(async move {
		do_test_not_pruning_network_graph_until_graph_sync_completion!(nodes, {
			// Check for the notification once per first-prune timer period, giving up (via the
			// assertion) if it still has not arrived after the sixth check.
			let mut i = 0;
			loop {
				tokio::time::sleep(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER)).await;
				if receiver.try_recv().is_ok() {
					break Ok::<(), ()>(());
				}
				assert!(i < 5);
				i += 1;
			}
		}, tokio::time::sleep(Duration::from_millis(1)).await);
		// Tell the sleeper to report exit so the processor future completes.
		exit_sender.send(()).unwrap();
	});
	let (processor_result, scenario_result) = tokio::join!(processor_task, scenario_task);
	processor_result.unwrap().unwrap();
	scenario_result.unwrap()
}
1892
macro_rules! do_test_payment_path_scoring {
	($nodes: expr, $receive: expr) => {
		// Ensure that we update the scorer when payment- and probe-related events are processed
		// (note that the channel must be public or else we won't score it).
		//
		// Each step below tells `$nodes[0].scorer` which `TestResult` to expect, pushes the
		// matching event onto `$nodes[0].node`, and then evaluates `$receive` to wait until the
		// caller's event handler has seen that event.
		//
		//  * `$nodes`   - the test nodes; only `$nodes[0]` is used.
		//  * `$receive` - expression yielding the next handled `Event`, wrapped in something
		//                 with an `expect` method (both callers pass a channel receive).
		//
		// The caller must have a background processor running whose event handler forwards
		// `PaymentPathFailed`, `PaymentPathSuccessful`, `ProbeSuccessful` and `ProbeFailed`
		// events to whatever `$receive` reads from.
		let scored_scid = 4242;
		let secp_ctx = Secp256k1::new();
		let node_1_privkey = SecretKey::from_slice(&[42; 32]).unwrap();
		let node_1_id = PublicKey::from_secret_key(&secp_ctx, &node_1_privkey);

		// A single-hop path over `scored_scid`, reused by every event below.
		let path = Path { hops: vec![RouteHop {
			pubkey: node_1_id,
			node_features: NodeFeatures::empty(),
			short_channel_id: scored_scid,
			channel_features: ChannelFeatures::empty(),
			fee_msat: 0,
			cltv_expiry_delta: MIN_CLTV_EXPIRY_DELTA as u32,
			maybe_announced_channel: true,
		}], blinded_tail: None };

		// A non-permanent failure attributed to `scored_scid` is expected as a PaymentFailure.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentFailure { path: path.clone(), short_channel_id: scored_scid });
		$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: false,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: Some(scored_scid),
		});
		let event = $receive.expect("PaymentPathFailed not handled within deadline");
		match event {
			Event::PaymentPathFailed { .. } => {},
			_ => panic!("Unexpected event"),
		}

		// Ensure we'll score payments that were explicitly failed back by the destination as
		// ProbeSuccess.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::PaymentPathFailed {
			payment_id: None,
			payment_hash: PaymentHash([42; 32]),
			payment_failed_permanently: true,
			failure: PathFailure::OnPath { network_update: None },
			path: path.clone(),
			short_channel_id: None,
		});
		let event = $receive.expect("PaymentPathFailed not handled within deadline");
		match event {
			Event::PaymentPathFailed { .. } => {},
			_ => panic!("Unexpected event"),
		}

		// A successful payment path is expected as a PaymentSuccess.
		$nodes[0].scorer.write_lock().expect(TestResult::PaymentSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::PaymentPathSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: None,
			path: path.clone(),
		});
		let event = $receive.expect("PaymentPathSuccessful not handled within deadline");
		match event {
			Event::PaymentPathSuccessful { .. } => {},
			_ => panic!("Unexpected event"),
		}

		// A successful probe is expected as a ProbeSuccess.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeSuccess { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::ProbeSuccessful {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path: path.clone(),
		});
		let event = $receive.expect("ProbeSuccessful not handled within deadline");
		match event {
			Event::ProbeSuccessful  { .. } => {},
			_ => panic!("Unexpected event"),
		}

		// A failed probe is expected as a ProbeFailure. The deadline message names the event
		// being awaited (`ProbeFailed`), matching the other steps.
		$nodes[0].scorer.write_lock().expect(TestResult::ProbeFailure { path: path.clone() });
		$nodes[0].node.push_pending_event(Event::ProbeFailed {
			payment_id: PaymentId([42; 32]),
			payment_hash: PaymentHash([42; 32]),
			path,
			short_channel_id: Some(scored_scid),
		});
		let event = $receive.expect("ProbeFailed not handled within deadline");
		match event {
			Event::ProbeFailed { .. } => {},
			_ => panic!("Unexpected event"),
		}
	}
}
1985
#[test]
fn test_payment_path_scoring() {
	// Runs the shared path-scoring scenario against a thread-based `BackgroundProcessor` and
	// then checks how often the scorer was persisted.
	let (sender, receiver) = std::sync::mpsc::sync_channel(1);
	// Forward the four payment/probe event kinds to the test thread; anything else is a failure.
	let event_handler = move |event: Event| match event {
		Event::PaymentPathFailed { .. }
		| Event::PaymentPathSuccessful { .. }
		| Event::ProbeSuccessful { .. }
		| Event::ProbeFailed { .. } => sender.send(event).unwrap(),
		_ => panic!("Unexpected event: {:?}", event),
	};

	let (_, nodes) = create_nodes(1, "test_payment_path_scoring");
	let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));
	let bg_processor = BackgroundProcessor::start(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
	);

	do_test_payment_path_scoring!(nodes, receiver.recv_timeout(Duration::from_secs(EVENT_DEADLINE)));

	if !std::thread::panicking() {
		bg_processor.stop().unwrap();
	}

	// The scenario pushes five events, and each one must have logged a scorer persistence.
	let persist_log_key = ("lightning_background_processor", "Persisting scorer after update".to_string());
	let persist_count = *nodes[0].logger.lines.lock().unwrap().get(&persist_log_key).unwrap();
	assert_eq!(persist_count, 5);
}
2012
#[tokio::test]
#[cfg(feature = "futures")]
async fn test_payment_path_scoring_async() {
	// Async counterpart of `test_payment_path_scoring`: the same scenario is driven against
	// `process_events_async` running as a tokio task.
	let (sender, mut receiver) = tokio::sync::mpsc::channel(1);
	// Forward the four payment/probe event kinds to the scenario task; anything else is a failure.
	let event_handler = move |event: Event| {
		let event_tx = sender.clone();
		async move {
			match event {
				Event::PaymentPathFailed { .. }
				| Event::PaymentPathSuccessful { .. }
				| Event::ProbeSuccessful { .. }
				| Event::ProbeFailed { .. } => event_tx.send(event).await.unwrap(),
				_ => panic!("Unexpected event: {:?}", event),
			}
		}
	};

	let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async");
	let persister = Arc::new(Persister::new(nodes[0].kv_store.get_data_dir()));

	// The sleeper future resolves to `true` when the exit channel changes and to `false` when
	// the requested duration elapses first.
	let (exit_sender, exit_receiver) = tokio::sync::watch::channel(());
	let sleeper = move |dur: Duration| {
		let mut exit_rx = exit_receiver.clone();
		Box::pin(async move {
			tokio::select! {
				_ = tokio::time::sleep(dur) => false,
				_ = exit_rx.changed() => true,
			}
		})
	};

	let bp_future = super::process_events_async(
		persister,
		event_handler,
		nodes[0].chain_monitor.clone(),
		nodes[0].node.clone(),
		Some(nodes[0].messenger.clone()),
		nodes[0].no_gossip_sync(),
		nodes[0].peer_manager.clone(),
		nodes[0].logger.clone(),
		Some(nodes[0].scorer.clone()),
		sleeper,
		false,
		|| Some(Duration::ZERO),
	);
	let processor_task = tokio::spawn(bp_future);
	let scenario_task = tokio::spawn(async move {
		do_test_payment_path_scoring!(nodes, receiver.recv().await);
		// Tell the sleeper to report exit so the processor future completes.
		exit_sender.send(()).unwrap();

		// The scenario pushes five events, and each one must have logged a scorer persistence.
		let persist_log_key = ("lightning_background_processor", "Persisting scorer after update".to_string());
		let persist_count = *nodes[0].logger.lines.lock().unwrap().get(&persist_log_key).unwrap();
		assert_eq!(persist_count, 5);
	});

	let (processor_result, scenario_result) = tokio::join!(processor_task, scenario_task);
	processor_result.unwrap().unwrap();
	scenario_result.unwrap()
}
2063 }