1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 // Prefix these with `rustdoc::` when we update our MSRV to be >= 1.52 to remove warnings.
6 #![deny(broken_intra_doc_links)]
7 #![deny(private_intra_doc_links)]
12 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
14 #[macro_use] extern crate lightning;
15 extern crate lightning_rapid_gossip_sync;
18 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
19 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
20 use lightning::chain::keysinterface::{Sign, KeysInterface};
21 use lightning::ln::channelmanager::ChannelManager;
22 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
23 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
24 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
25 use lightning::routing::scoring::WriteableScore;
26 use lightning::util::events::{Event, EventHandler, EventsProvider};
27 use lightning::util::logger::Logger;
28 use lightning::util::persist::Persister;
29 use lightning_rapid_gossip_sync::RapidGossipSync;
31 use std::sync::atomic::{AtomicBool, Ordering};
33 use std::thread::JoinHandle;
34 use std::time::{Duration, Instant};
/// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
/// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
/// responsibilities are:
/// * Processing [`Event`]s with a user-provided [`EventHandler`].
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
///   writing it to disk/backups by calling [`Persister::persist_manager`] on the persister given
///   to it at startup. [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
///   at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
///   is provided to [`BackgroundProcessor::start`]).
///
/// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
/// upon as doing so may result in high latency.
///
/// # Note
///
/// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
/// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
/// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
/// unilateral chain closure fees are at risk.
///
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::util::events::Event
/// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
#[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
pub struct BackgroundProcessor {
	/// Shared flag; storing `true` asks the background thread to leave its processing loop.
	stop_thread: Arc<AtomicBool>,
	/// Handle of the background thread. It is taken (leaving `None`) when the thread is joined,
	/// so a later join attempt (e.g. from `Drop`) finds nothing left to wait for.
	thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
}
/// Number of seconds that must be exceeded between calls to
/// `ChannelManager::timer_tick_occurred`. Shortened under `cfg(test)` so tests run quickly.
#[cfg(not(test))]
const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

/// Number of seconds that must be exceeded between calls to
/// `PeerManager::timer_tick_occurred`.
#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
/// Increasing the ping timer allows for this but slower devices will be disconnected if the
/// timeout is reached.
#[cfg(all(not(test), debug_assertions))]
const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

/// Number of seconds that must be exceeded between scorer persistence attempts.
#[cfg(not(test))]
const SCORER_PERSIST_TIMER: u64 = 30;
#[cfg(test)]
const SCORER_PERSIST_TIMER: u64 = 1;

/// Delay, in seconds, before the first network graph prune after startup; later prunes use
/// `NETWORK_PRUNE_TIMER`.
#[cfg(not(test))]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
#[cfg(test)]
const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
95 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
97 P: Deref<Target = P2PGossipSync<G, A, L>>,
98 R: Deref<Target = RapidGossipSync<G, L>>,
99 G: Deref<Target = NetworkGraph<L>>,
103 where A::Target: chain::Access, L::Target: Logger {
104 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
106 /// Rapid gossip sync from a trusted server.
113 P: Deref<Target = P2PGossipSync<G, A, L>>,
114 R: Deref<Target = RapidGossipSync<G, L>>,
115 G: Deref<Target = NetworkGraph<L>>,
118 > GossipSync<P, R, G, A, L>
119 where A::Target: chain::Access, L::Target: Logger {
120 fn network_graph(&self) -> Option<&G> {
122 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
123 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
124 GossipSync::None => None,
128 fn prunable_network_graph(&self) -> Option<&G> {
130 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
131 GossipSync::Rapid(gossip_sync) => {
132 if gossip_sync.is_initial_sync_complete() {
133 Some(gossip_sync.network_graph())
138 GossipSync::None => None,
143 /// (C-not exported) as the bindings concretize everything and have constructors for us
144 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
145 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
147 A::Target: chain::Access,
150 /// Initializes a new [`GossipSync::P2P`] variant.
151 pub fn p2p(gossip_sync: P) -> Self {
152 GossipSync::P2P(gossip_sync)
156 /// (C-not exported) as the bindings concretize everything and have constructors for us
157 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
159 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
162 &'a (dyn chain::Access + Send + Sync),
168 /// Initializes a new [`GossipSync::Rapid`] variant.
169 pub fn rapid(gossip_sync: R) -> Self {
170 GossipSync::Rapid(gossip_sync)
174 /// (C-not exported) as the bindings concretize everything and have constructors for us
177 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
178 &RapidGossipSync<&'a NetworkGraph<L>, L>,
180 &'a (dyn chain::Access + Send + Sync),
186 /// Initializes a new [`GossipSync::None`] variant.
187 pub fn none() -> Self {
192 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
193 struct DecoratingEventHandler<
196 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
197 RGS: Deref<Target = RapidGossipSync<G, L>>,
198 G: Deref<Target = NetworkGraph<L>>,
202 where A::Target: chain::Access, L::Target: Logger {
204 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
210 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
211 RGS: Deref<Target = RapidGossipSync<G, L>>,
212 G: Deref<Target = NetworkGraph<L>>,
215 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
216 where A::Target: chain::Access, L::Target: Logger {
217 fn handle_event(&self, event: &Event) {
218 if let Some(network_graph) = self.gossip_sync.network_graph() {
219 network_graph.handle_event(event);
221 self.event_handler.handle_event(event);
225 impl BackgroundProcessor {
/// Start a background thread that takes care of responsibilities enumerated in the [top-level
/// documentation].
229 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
230 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
231 /// either [`join`] or [`stop`].
233 /// # Data Persistence
235 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
236 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
237 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
238 /// provided implementation.
240 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
241 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
242 /// See the `lightning-persister` crate for LDK's provided implementation.
244 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
245 /// error or call [`join`] and handle any error that may arise. For the latter case,
246 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
250 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
251 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
252 /// functionality implemented by other handlers.
253 /// * [`P2PGossipSync`] if given will update the [`NetworkGraph`] based on payment failures.
255 /// # Rapid Gossip Sync
257 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
258 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
259 /// until the [`RapidGossipSync`] instance completes its first sync.
261 /// [top-level documentation]: BackgroundProcessor
262 /// [`join`]: Self::join
263 /// [`stop`]: Self::stop
264 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
265 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
266 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
267 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
268 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
269 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
272 Signer: 'static + Sign,
273 CA: 'static + Deref + Send + Sync,
274 CF: 'static + Deref + Send + Sync,
275 CW: 'static + Deref + Send + Sync,
276 T: 'static + Deref + Send + Sync,
277 K: 'static + Deref + Send + Sync,
278 F: 'static + Deref + Send + Sync,
279 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
280 L: 'static + Deref + Send + Sync,
281 P: 'static + Deref + Send + Sync,
282 Descriptor: 'static + SocketDescriptor + Send + Sync,
283 CMH: 'static + Deref + Send + Sync,
284 RMH: 'static + Deref + Send + Sync,
285 EH: 'static + EventHandler + Send,
286 PS: 'static + Deref + Send,
287 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
288 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
289 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
290 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
291 UMH: 'static + Deref + Send + Sync,
292 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
293 S: 'static + Deref<Target = SC> + Send + Sync,
294 SC: WriteableScore<'a>,
296 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
297 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
300 CA::Target: 'static + chain::Access,
301 CF::Target: 'static + chain::Filter,
302 CW::Target: 'static + chain::Watch<Signer>,
303 T::Target: 'static + BroadcasterInterface,
304 K::Target: 'static + KeysInterface<Signer = Signer>,
305 F::Target: 'static + FeeEstimator,
306 L::Target: 'static + Logger,
307 P::Target: 'static + Persist<Signer>,
308 CMH::Target: 'static + ChannelMessageHandler,
309 RMH::Target: 'static + RoutingMessageHandler,
310 UMH::Target: 'static + CustomMessageHandler,
311 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
313 let stop_thread = Arc::new(AtomicBool::new(false));
314 let stop_thread_clone = stop_thread.clone();
315 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
316 let event_handler = DecoratingEventHandler {
318 gossip_sync: &gossip_sync,
321 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
322 channel_manager.timer_tick_occurred();
324 let mut last_freshness_call = Instant::now();
325 let mut last_ping_call = Instant::now();
326 let mut last_prune_call = Instant::now();
327 let mut last_scorer_persist_call = Instant::now();
328 let mut have_pruned = false;
331 channel_manager.process_pending_events(&event_handler);
332 chain_monitor.process_pending_events(&event_handler);
334 // Note that the PeerManager::process_events may block on ChannelManager's locks,
335 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
336 // we want to ensure we get into `persist_manager` as quickly as we can, especially
337 // without running the normal event processing above and handing events to users.
339 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
340 // processing a message effectively at any point during this loop. In order to
341 // minimize the time between such processing completing and persisting the updated
342 // ChannelManager, we want to minimize methods blocking on a ChannelManager
// generally, and as a fallback place such blocking only immediately before
// persistence.
345 peer_manager.process_events();
347 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
348 // see `await_start`'s use below.
349 let await_start = Instant::now();
350 let updates_available =
351 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
352 let await_time = await_start.elapsed();
354 if updates_available {
355 log_trace!(logger, "Persisting ChannelManager...");
356 persister.persist_manager(&*channel_manager)?;
357 log_trace!(logger, "Done persisting ChannelManager.");
359 // Exit the loop if the background processor was requested to stop.
360 if stop_thread.load(Ordering::Acquire) == true {
361 log_trace!(logger, "Terminating background processor.");
364 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
365 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
366 channel_manager.timer_tick_occurred();
367 last_freshness_call = Instant::now();
369 if await_time > Duration::from_secs(1) {
370 // On various platforms, we may be starved of CPU cycles for several reasons.
371 // E.g. on iOS, if we've been in the background, we will be entirely paused.
372 // Similarly, if we're on a desktop platform and the device has been asleep, we
373 // may not get any cycles.
374 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
375 // full second, at which point we assume sockets may have been killed (they
376 // appear to be at least on some platforms, even if it has only been a second).
377 // Note that we have to take care to not get here just because user event
378 // processing was slow at the top of the loop. For example, the sample client
379 // may call Bitcoin Core RPCs during event handling, which very often takes
// more than a handful of seconds to complete, and shouldn't disconnect all our
// peers.
382 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
383 peer_manager.disconnect_all_peers();
384 last_ping_call = Instant::now();
385 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
386 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
387 peer_manager.timer_tick_occurred();
388 last_ping_call = Instant::now();
391 // Note that we want to run a graph prune once not long after startup before
392 // falling back to our usual hourly prunes. This avoids short-lived clients never
393 // pruning their network graph. We run once 60 seconds after startup before
394 // continuing our normal cadence.
395 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
396 // The network graph must not be pruned while rapid sync completion is pending
397 log_trace!(logger, "Assessing prunability of network graph");
398 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
399 network_graph.remove_stale_channels();
401 if let Err(e) = persister.persist_graph(network_graph) {
402 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
405 last_prune_call = Instant::now();
408 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
412 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
413 if let Some(ref scorer) = scorer {
414 log_trace!(logger, "Persisting scorer");
415 if let Err(e) = persister.persist_scorer(&scorer) {
416 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
419 last_scorer_persist_call = Instant::now();
423 // After we exit, ensure we persist the ChannelManager one final time - this avoids
424 // some races where users quit while channel updates were in-flight, with
425 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
426 persister.persist_manager(&*channel_manager)?;
428 // Persist Scorer on exit
429 if let Some(ref scorer) = scorer {
430 persister.persist_scorer(&scorer)?;
433 // Persist NetworkGraph on exit
434 if let Some(network_graph) = gossip_sync.network_graph() {
435 persister.persist_graph(network_graph)?;
440 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
443 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
444 /// [`ChannelManager`].
448 /// This function panics if the background thread has panicked such as while persisting or
451 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
452 pub fn join(mut self) -> Result<(), std::io::Error> {
453 assert!(self.thread_handle.is_some());
457 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
458 /// [`ChannelManager`].
462 /// This function panics if the background thread has panicked such as while persisting or
465 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
466 pub fn stop(mut self) -> Result<(), std::io::Error> {
467 assert!(self.thread_handle.is_some());
468 self.stop_and_join_thread()
471 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
472 self.stop_thread.store(true, Ordering::Release);
476 fn join_thread(&mut self) -> Result<(), std::io::Error> {
477 match self.thread_handle.take() {
478 Some(handle) => handle.join().unwrap(),
484 impl Drop for BackgroundProcessor {
486 self.stop_and_join_thread().unwrap();
492 use bitcoin::blockdata::block::BlockHeader;
493 use bitcoin::blockdata::constants::genesis_block;
494 use bitcoin::blockdata::transaction::{Transaction, TxOut};
495 use bitcoin::network::constants::Network;
496 use lightning::chain::{BestBlock, Confirm, chainmonitor};
497 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
498 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
499 use lightning::chain::transaction::OutPoint;
500 use lightning::get_event_msg;
501 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
502 use lightning::ln::features::{ChannelFeatures, InitFeatures};
503 use lightning::ln::msgs::{ChannelMessageHandler, Init};
504 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
505 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
506 use lightning::util::config::UserConfig;
507 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
508 use lightning::util::ser::Writeable;
509 use lightning::util::test_utils;
510 use lightning::util::persist::KVStorePersister;
511 use lightning_invoice::payment::{InvoicePayer, Retry};
512 use lightning_invoice::utils::DefaultRouter;
513 use lightning_persister::FilesystemPersister;
515 use std::path::PathBuf;
516 use std::sync::{Arc, Mutex};
517 use std::sync::mpsc::SyncSender;
518 use std::time::Duration;
519 use lightning::routing::scoring::{FixedPenaltyScorer};
520 use lightning_rapid_gossip_sync::RapidGossipSync;
521 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
523 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
525 #[derive(Clone, Eq, Hash, PartialEq)]
526 struct TestDescriptor{}
527 impl SocketDescriptor for TestDescriptor {
528 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
532 fn disconnect_socket(&mut self) {}
535 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
537 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
538 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
541 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
542 p2p_gossip_sync: PGS,
543 rapid_gossip_sync: RGS,
544 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
545 chain_monitor: Arc<ChainMonitor>,
546 persister: Arc<FilesystemPersister>,
547 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
548 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
549 logger: Arc<test_utils::TestLogger>,
550 best_block: BestBlock,
551 scorer: Arc<Mutex<FixedPenaltyScorer>>,
555 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
556 GossipSync::P2P(self.p2p_gossip_sync.clone())
559 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
560 GossipSync::Rapid(self.rapid_gossip_sync.clone())
563 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
570 let data_dir = self.persister.get_data_dir();
571 match fs::remove_dir_all(data_dir.clone()) {
572 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
579 graph_error: Option<(std::io::ErrorKind, &'static str)>,
580 graph_persistence_notifier: Option<SyncSender<()>>,
581 manager_error: Option<(std::io::ErrorKind, &'static str)>,
582 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
583 filesystem_persister: FilesystemPersister,
587 fn new(data_dir: String) -> Self {
588 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
589 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
592 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
593 Self { graph_error: Some((error, message)), ..self }
596 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
597 Self { graph_persistence_notifier: Some(sender), ..self }
600 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
601 Self { manager_error: Some((error, message)), ..self }
604 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
605 Self { scorer_error: Some((error, message)), ..self }
609 impl KVStorePersister for Persister {
610 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
611 if key == "manager" {
612 if let Some((error, message)) = self.manager_error {
613 return Err(std::io::Error::new(error, message))
617 if key == "network_graph" {
618 if let Some(sender) = &self.graph_persistence_notifier {
619 sender.send(()).unwrap();
622 if let Some((error, message)) = self.graph_error {
623 return Err(std::io::Error::new(error, message))
628 if let Some((error, message)) = self.scorer_error {
629 return Err(std::io::Error::new(error, message))
633 self.filesystem_persister.persist(key, object)
/// Joins `filename` onto the directory `filepath` and returns the result as a `String`.
///
/// Panics if the resulting path is not valid UTF-8, which is acceptable for this test helper.
fn get_full_filepath(filepath: String, filename: String) -> String {
	let mut path = PathBuf::from(filepath);
	path.push(filename);
	path.to_str().unwrap().to_string()
}
643 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
644 let mut nodes = Vec::new();
645 for i in 0..num_nodes {
646 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
647 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
648 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
649 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
650 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
651 let seed = [i as u8; 32];
652 let network = Network::Testnet;
653 let genesis_block = genesis_block(network);
654 let now = Duration::from_secs(genesis_block.header.time as u64);
655 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
656 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
657 let best_block = BestBlock::from_genesis(network);
658 let params = ChainParameters { network, best_block };
659 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
660 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
661 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
662 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
663 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
664 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
665 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
666 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
670 for i in 0..num_nodes {
671 for j in (i+1)..num_nodes {
672 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
673 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
680 macro_rules! open_channel {
681 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
682 begin_open_channel!($node_a, $node_b, $channel_value);
683 let events = $node_a.node.get_and_clear_pending_events();
684 assert_eq!(events.len(), 1);
685 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
686 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
691 macro_rules! begin_open_channel {
692 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
693 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
694 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
695 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
699 macro_rules! handle_funding_generation_ready {
700 ($event: expr, $channel_value: expr) => {{
702 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
703 assert_eq!(channel_value_satoshis, $channel_value);
704 assert_eq!(user_channel_id, 42);
706 let tx = Transaction { version: 1 as i32, lock_time: 0, input: Vec::new(), output: vec![TxOut {
707 value: channel_value_satoshis, script_pubkey: output_script.clone(),
709 (temporary_channel_id, tx)
711 _ => panic!("Unexpected event"),
716 macro_rules! end_open_channel {
717 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
718 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
719 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
720 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
724 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
726 let prev_blockhash = node.best_block.block_hash();
727 let height = node.best_block.height() + 1;
728 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: Default::default(), time: height, bits: 42, nonce: 42 };
729 let txdata = vec![(0, tx)];
730 node.best_block = BestBlock::new(header.block_hash(), height);
733 node.node.transactions_confirmed(&header, &txdata, height);
734 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
737 node.node.best_block_updated(&header, height);
738 node.chain_monitor.best_block_updated(&header, height);
744 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
745 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
749 fn test_background_processor() {
750 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
751 // updates. Also test that when new updates are available, the manager signals that it needs
752 // re-persistence and is successfully re-persisted.
753 let nodes = create_nodes(2, "test_background_processor".to_string());
755 // Go through the channel creation process so that each node has something to persist. Since
756 // open_channel consumes events, it must complete before starting BackgroundProcessor to
757 // avoid a race with processing events.
758 let tx = open_channel!(nodes[0], nodes[1], 100000);
760 // Initiate the background processors to watch each node.
761 let data_dir = nodes[0].persister.get_data_dir();
762 let persister = Arc::new(Persister::new(data_dir));
763 let event_handler = |_: &_| {};
764 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
766 macro_rules! check_persisted_data {
767 ($node: expr, $filepath: expr) => {
768 let mut expected_bytes = Vec::new();
770 expected_bytes.clear();
771 match $node.write(&mut expected_bytes) {
773 match std::fs::read($filepath) {
775 if bytes == expected_bytes {
784 Err(e) => panic!("Unexpected error: {}", e)
790 // Check that the initial channel manager data is persisted as expected.
791 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
792 check_persisted_data!(nodes[0].node, filepath.clone());
795 if !nodes[0].node.get_persistence_condvar_value() { break }
798 // Force-close the channel.
799 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
801 // Check that the force-close updates are persisted.
802 check_persisted_data!(nodes[0].node, filepath.clone());
804 if !nodes[0].node.get_persistence_condvar_value() { break }
807 // Check network graph is persisted
808 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
809 check_persisted_data!(nodes[0].network_graph, filepath.clone());
811 // Check scorer is persisted
812 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
813 check_persisted_data!(nodes[0].scorer, filepath.clone());
815 assert!(bg_processor.stop().is_ok());
819 fn test_timer_tick_called() {
820 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
821 // `FRESHNESS_TIMER`.
// The check is log-based: the test looks for one log line per manager instead of observing the
// calls themselves.
822 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
823 let data_dir = nodes[0].persister.get_data_dir();
824 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: this test does not look at events.
825 let event_handler = |_: &_| {};
826 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Take the logger's `lines` lock and look up both expected messages. Entries are keyed by a
// pair of strings whose first element is "lightning_background_processor".
828 let log_entries = nodes[0].logger.lines.lock().unwrap();
829 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
830 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
// Both messages must be present for the condition to hold.
831 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
832 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
// Stopping the processor must not report an error.
837 assert!(bg_processor.stop().is_ok());
841 fn test_channel_manager_persist_error() {
842 // Test that if we encounter an error during manager persistence, the error is handed back by
// `bg_processor.join()`: the test fails if `join()` returns `Ok`, and otherwise inspects the
// returned error value.
843 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel before the processor is started.
844 open_channel!(nodes[0], nodes[1], 100000);
846 let data_dir = nodes[0].persister.get_data_dir();
// NOTE(review): `with_manager_error` presumably makes this persister fail when asked to persist
// the manager, using the given kind and message -- confirm against the helper's definition.
847 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
848 let event_handler = |_: &_| {};
849 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join()` is used here (not `stop()`), so the test waits on the processor's own result.
850 match bg_processor.join() {
851 Ok(_) => panic!("Expected error persisting manager"),
// The returned error must carry the injected kind, and its inner error must render as "test".
853 assert_eq!(e.kind(), std::io::ErrorKind::Other);
854 assert_eq!(e.get_ref().unwrap().to_string(), "test");
860 fn test_network_graph_persist_error() {
861 // Test that if we encounter an error during network graph persistence, an error gets returned.
// The error is expected from `bg_processor.stop()`.
862 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
863 let data_dir = nodes[0].persister.get_data_dir();
// NOTE(review): `with_graph_error` presumably makes this persister fail when asked to persist
// the network graph, using the given kind and message -- confirm against the helper.
864 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
865 let event_handler = |_: &_| {};
// This test starts the processor with `p2p_gossip_sync()` rather than `no_gossip_sync()`.
866 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
868 match bg_processor.stop() {
869 Ok(_) => panic!("Expected error persisting network graph"),
// The returned error must carry the injected kind, and its inner error must render as "test".
871 assert_eq!(e.kind(), std::io::ErrorKind::Other);
872 assert_eq!(e.get_ref().unwrap().to_string(), "test");
878 fn test_scorer_persist_error() {
879 // Test that if we encounter an error during scorer persistence, an error gets returned.
// The error is expected from `bg_processor.stop()`.
880 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
881 let data_dir = nodes[0].persister.get_data_dir();
// NOTE(review): `with_scorer_error` presumably makes this persister fail when asked to persist
// the scorer, using the given kind and message -- confirm against the helper.
882 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
883 let event_handler = |_: &_| {};
// The scorer is handed to the processor as `Some(..)`, the last argument to `start`.
884 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
886 match bg_processor.stop() {
887 Ok(_) => panic!("Expected error persisting scorer"),
// The returned error must carry the injected kind, and its inner error must render as "test".
889 assert_eq!(e.kind(), std::io::ErrorKind::Other);
890 assert_eq!(e.get_ref().unwrap().to_string(), "test");
896 fn test_background_event_handling() {
// Test that events reach the handler given to `BackgroundProcessor::start`. Two processors are
// run back to back on nodes[0]: the first with a handler for FundingGenerationReady during a
// channel open, the second with a handler that forwards every event after a force close.
897 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
898 let channel_value = 100000;
899 let data_dir = nodes[0].persister.get_data_dir();
// `data_dir` is cloned because a second persister is built from it further down.
900 let persister = Arc::new(Persister::new(data_dir.clone()));
902 // Set up a background event handler for FundingGenerationReady events.
// The handler hands its result back to the test through a bounded channel (capacity 1).
// `unwrap()` makes a failed send panic inside the handler.
903 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
904 let event_handler = move |event: &Event| {
905 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
907 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
909 // Open a channel and check that the FundingGenerationReady event was handled.
910 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait at most EVENT_DEADLINE seconds for the handler's message; `expect` fails the test
// if nothing arrives in time.
911 let (temporary_channel_id, funding_tx) = receiver
912 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
913 .expect("FundingGenerationReady not handled within deadline");
914 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
916 // Confirm the funding transaction.
// After confirmation on each side, exchange the SendChannelReady messages by hand and pull the
// resulting SendChannelUpdate messages off each node (their contents are not checked).
917 confirm_transaction(&mut nodes[0], &funding_tx);
918 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
919 confirm_transaction(&mut nodes[1], &funding_tx);
920 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
921 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
922 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
923 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
924 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before a second one is started with a different handler.
926 assert!(bg_processor.stop().is_ok());
928 // Set up a background event handler for SpendableOutputs events.
// This handler forwards a clone of every event it receives, not only SpendableOutputs.
929 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
930 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
931 let persister = Arc::new(Persister::new(data_dir));
932 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
934 // Force close the channel and check that the SpendableOutputs event was handled.
935 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the last transaction recorded by the test broadcaster and confirm it to a depth of
// BREAKDOWN_TIMEOUT.
936 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
937 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
939 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
940 .expect("SpendableOutputs not handled within deadline");
// Either a SpendableOutputs or a ChannelClosed event is accepted; anything else fails the test.
942 Event::SpendableOutputs { .. } => {},
943 Event::ChannelClosed { .. } => {},
944 _ => panic!("Unexpected event: {:?}", event),
947 assert!(bg_processor.stop().is_ok());
951 fn test_scorer_persistence() {
// Test that the background processor persists the scorer it is given. The check is log-based:
// the test looks for a "Persisting scorer" entry in the test logger.
952 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
953 let data_dir = nodes[0].persister.get_data_dir();
954 let persister = Arc::new(Persister::new(data_dir));
955 let event_handler = |_: &_| {};
// The scorer is handed to the processor as `Some(..)`, the last argument to `start`.
956 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Log entries are keyed by a pair of strings whose first element is
// "lightning_background_processor".
959 let log_entries = nodes[0].logger.lines.lock().unwrap();
960 let expected_log = "Persisting scorer".to_string();
961 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
// Stopping the processor must not report an error.
966 assert!(bg_processor.stop().is_ok());
970 fn test_not_pruning_network_graph_until_graph_sync_completion() {
// Test that, when the processor is started with `rapid_gossip_sync()`, the network graph is
// left unpruned (per the "Not pruning network graph" log line) until rapid gossip sync data has
// been applied, and that the graph ends up empty once pruning has been signalled.
971 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
972 let data_dir = nodes[0].persister.get_data_dir();
// NOTE(review): the persister presumably signals on `sender` when it persists the network
// graph; the test waits on `receiver` further down -- confirm against
// `with_graph_persistence_notifier`.
973 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
974 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
975 let network_graph = nodes[0].network_graph.clone();
// Seed the graph with a single channel (short channel id 42) between the two nodes, added via a
// partial announcement, and check that it is the only channel present.
976 let features = ChannelFeatures::empty();
977 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
978 .expect("Failed to update channel from partial announcement");
979 let original_graph_description = network_graph.to_string();
980 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
981 assert_eq!(network_graph.read_only().channels().len(), 1);
983 let event_handler = |_: &_| {};
984 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look for both log lines: the one for a prune attempt being assessed and the one for it being
// skipped.
987 let log_entries = nodes[0].logger.lines.lock().unwrap();
988 let expected_log_a = "Assessing prunability of network graph".to_string();
989 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
990 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
991 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw rapid-gossip-sync update bytes, passed to `update_network_graph` below. The first three
// bytes (76, 68, 75) are ASCII "LDK".
996 let initialization_input = vec![
997 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
998 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
999 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1000 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1001 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1002 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1003 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1004 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1005 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1006 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1007 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1008 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1009 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Apply the update through the node's rapid gossip sync; a failure here fails the test.
1011 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1013 // this should have added two channels
1014 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait up to five times FIRST_NETWORK_PRUNE_TIMER (taken as seconds) for the notification.
1017 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1018 .expect("Network graph not pruned within deadline");
1020 background_processor.stop().unwrap();
1022 // all channels should now be pruned
1023 assert_eq!(network_graph.read_only().channels().len(), 0);
1027 fn test_invoice_payer() {
// Checks that an `Arc<InvoicePayer>` can be passed to `BackgroundProcessor::start` as the event
// handler, and that the processor then stops without error. No payment is made in the
// statements shown here.
// The random bytes for the router come from a test keys interface seeded with 32 zero bytes.
1028 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1029 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1030 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1032 // Initiate the background processor for nodes[0] (the only node passed to `start` below).
1033 let data_dir = nodes[0].persister.get_data_dir();
1034 let persister = Arc::new(Persister::new(data_dir));
// The payer wraps nodes[0]'s channel manager, a default router over its network graph, its
// scorer and logger, a no-op inner event handler, and `Retry::Attempts(2)`.
1035 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
1036 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
// The processor receives its own `Arc` handle to the payer; `invoice_payer` stays usable here.
1037 let event_handler = Arc::clone(&invoice_payer);
1038 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
1039 assert!(bg_processor.stop().is_ok());