1 //! Utilities that take care of tasks that (1) need to happen periodically to keep Rust-Lightning
2 //! running properly, and (2) either can or should be run in the background. See docs for
3 //! [`BackgroundProcessor`] for more details on the nitty-gritty.
5 #![deny(broken_intra_doc_links)]
9 #![cfg_attr(docsrs, feature(doc_auto_cfg))]
11 #[macro_use] extern crate lightning;
12 extern crate lightning_rapid_gossip_sync;
15 use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
16 use lightning::chain::chainmonitor::{ChainMonitor, Persist};
17 use lightning::chain::keysinterface::{Sign, KeysInterface};
18 use lightning::ln::channelmanager::ChannelManager;
19 use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
20 use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
21 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
22 use lightning::routing::scoring::WriteableScore;
23 use lightning::util::events::{Event, EventHandler, EventsProvider};
24 use lightning::util::logger::Logger;
25 use lightning::util::persist::Persister;
26 use lightning_rapid_gossip_sync::RapidGossipSync;
28 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::thread::JoinHandle;
31 use std::time::{Duration, Instant};
34 /// `BackgroundProcessor` takes care of tasks that (1) need to happen periodically to keep
35 /// Rust-Lightning running properly, and (2) either can or should be run in the background. Its
36 /// responsibilities are:
37 /// * Processing [`Event`]s with a user-provided [`EventHandler`].
38 /// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
39 /// writing it to disk/backups by invoking the callback given to it at startup.
40 /// [`ChannelManager`] persistence should be done in the background.
41 /// * Calling [`ChannelManager::timer_tick_occurred`] and [`PeerManager::timer_tick_occurred`]
42 /// at the appropriate intervals.
43 /// * Calling [`NetworkGraph::remove_stale_channels`] (if a [`GossipSync`] with a [`NetworkGraph`]
44 /// is provided to [`BackgroundProcessor::start`]).
46 /// It will also call [`PeerManager::process_events`] periodically though this shouldn't be relied
47 /// upon as doing so may result in high latency.
51 /// If [`ChannelManager`] persistence fails and the persisted manager becomes out-of-date, then
52 /// there is a risk of channels force-closing on startup when the manager realizes it's outdated.
53 /// However, as long as [`ChannelMonitor`] backups are sound, no funds besides those used for
54 /// unilateral chain closure fees are at risk.
56 /// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
57 /// [`Event`]: lightning::util::events::Event
58 #[must_use = "BackgroundProcessor will immediately stop on drop. It should be stored until shutdown."]
59 pub struct BackgroundProcessor {
60 stop_thread: Arc<AtomicBool>,
61 thread_handle: Option<JoinHandle<Result<(), std::io::Error>>>,
/// Threshold, in seconds, that must be exceeded since the last call before the background loop
/// calls `ChannelManager::timer_tick_occurred` again (compared against
/// `last_freshness_call.elapsed().as_secs()`).
// NOTE(review): two definitions are visible here (60 and 1); presumably each is gated by a `cfg`
// attribute (e.g. non-test vs. test builds) that is not visible in this view -- confirm.
65 const FRESHNESS_TIMER: u64 = 60;
67 const FRESHNESS_TIMER: u64 = 1;
/// Threshold, in seconds, that must be exceeded since the last call before the background loop
/// calls `PeerManager::timer_tick_occurred` again. This value applies outside of tests when
/// `debug_assertions` are disabled.
69 #[cfg(all(not(test), not(debug_assertions)))]
70 const PING_TIMER: u64 = 10;
71 /// Signature operations take a lot longer without compiler optimisations.
72 /// Increasing the ping timer allows for this but slower devices will be disconnected if the
73 /// timeout is reached.
74 #[cfg(all(not(test), debug_assertions))]
75 const PING_TIMER: u64 = 30;
// NOTE(review): this 1-second variant is presumably gated by a `#[cfg(test)]` attribute that is
// not visible in this view -- confirm.
77 const PING_TIMER: u64 = 1;
79 /// Prune the network graph of stale entries hourly.
80 const NETWORK_PRUNE_TIMER: u64 = 60 * 60;
/// Threshold, in seconds, that must be exceeded since the last attempt before the background loop
/// persists the scorer again (only done when a scorer was supplied to `start`).
// NOTE(review): two definitions are visible here (30 and 1); presumably each is gated by a `cfg`
// attribute (e.g. non-test vs. test builds) that is not visible in this view -- confirm.
83 const SCORER_PERSIST_TIMER: u64 = 30;
85 const SCORER_PERSIST_TIMER: u64 = 1;
/// Threshold, in seconds, used for the network graph prune check until a first prune has
/// happened; afterwards the background loop compares against `NETWORK_PRUNE_TIMER` instead.
// NOTE(review): two definitions are visible here (60 and 1); presumably each is gated by a `cfg`
// attribute (e.g. non-test vs. test builds) that is not visible in this view -- confirm.
88 const FIRST_NETWORK_PRUNE_TIMER: u64 = 60;
90 const FIRST_NETWORK_PRUNE_TIMER: u64 = 1;
92 /// Either [`P2PGossipSync`] or [`RapidGossipSync`].
94 P: Deref<Target = P2PGossipSync<G, A, L>>,
95 R: Deref<Target = RapidGossipSync<G, L>>,
96 G: Deref<Target = NetworkGraph<L>>,
100 where A::Target: chain::Access, L::Target: Logger {
101 /// Gossip sync via the lightning peer-to-peer network as defined by BOLT 7.
103 /// Rapid gossip sync from a trusted server.
110 P: Deref<Target = P2PGossipSync<G, A, L>>,
111 R: Deref<Target = RapidGossipSync<G, L>>,
112 G: Deref<Target = NetworkGraph<L>>,
115 > GossipSync<P, R, G, A, L>
116 where A::Target: chain::Access, L::Target: Logger {
117 fn network_graph(&self) -> Option<&G> {
119 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
120 GossipSync::Rapid(gossip_sync) => Some(gossip_sync.network_graph()),
121 GossipSync::None => None,
125 fn prunable_network_graph(&self) -> Option<&G> {
127 GossipSync::P2P(gossip_sync) => Some(gossip_sync.network_graph()),
128 GossipSync::Rapid(gossip_sync) => {
129 if gossip_sync.is_initial_sync_complete() {
130 Some(gossip_sync.network_graph())
135 GossipSync::None => None,
140 /// (C-not exported) as the bindings concretize everything and have constructors for us
141 impl<P: Deref<Target = P2PGossipSync<G, A, L>>, G: Deref<Target = NetworkGraph<L>>, A: Deref, L: Deref>
142 GossipSync<P, &RapidGossipSync<G, L>, G, A, L>
144 A::Target: chain::Access,
147 /// Initializes a new [`GossipSync::P2P`] variant.
148 pub fn p2p(gossip_sync: P) -> Self {
149 GossipSync::P2P(gossip_sync)
153 /// (C-not exported) as the bindings concretize everything and have constructors for us
154 impl<'a, R: Deref<Target = RapidGossipSync<G, L>>, G: Deref<Target = NetworkGraph<L>>, L: Deref>
156 &P2PGossipSync<G, &'a (dyn chain::Access + Send + Sync), L>,
159 &'a (dyn chain::Access + Send + Sync),
165 /// Initializes a new [`GossipSync::Rapid`] variant.
166 pub fn rapid(gossip_sync: R) -> Self {
167 GossipSync::Rapid(gossip_sync)
171 /// (C-not exported) as the bindings concretize everything and have constructors for us
174 &P2PGossipSync<&'a NetworkGraph<L>, &'a (dyn chain::Access + Send + Sync), L>,
175 &RapidGossipSync<&'a NetworkGraph<L>, L>,
177 &'a (dyn chain::Access + Send + Sync),
183 /// Initializes a new [`GossipSync::None`] variant.
184 pub fn none() -> Self {
189 /// Decorates an [`EventHandler`] with common functionality provided by standard [`EventHandler`]s.
190 struct DecoratingEventHandler<
193 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
194 RGS: Deref<Target = RapidGossipSync<G, L>>,
195 G: Deref<Target = NetworkGraph<L>>,
199 where A::Target: chain::Access, L::Target: Logger {
201 gossip_sync: &'a GossipSync<PGS, RGS, G, A, L>,
207 PGS: Deref<Target = P2PGossipSync<G, A, L>>,
208 RGS: Deref<Target = RapidGossipSync<G, L>>,
209 G: Deref<Target = NetworkGraph<L>>,
212 > EventHandler for DecoratingEventHandler<'a, E, PGS, RGS, G, A, L>
213 where A::Target: chain::Access, L::Target: Logger {
214 fn handle_event(&self, event: &Event) {
215 if let Some(network_graph) = self.gossip_sync.network_graph() {
216 network_graph.handle_event(event);
218 self.event_handler.handle_event(event);
222 impl BackgroundProcessor {
223 /// Start a background thread that takes care of responsibilities enumerated in the [top-level
226 /// The thread runs indefinitely unless the object is dropped, [`stop`] is called, or
227 /// [`Persister::persist_manager`] returns an error. In case of an error, the error is retrieved by calling
228 /// either [`join`] or [`stop`].
230 /// # Data Persistence
232 /// [`Persister::persist_manager`] is responsible for writing out the [`ChannelManager`] to disk, and/or
233 /// uploading to one or more backup services. See [`ChannelManager::write`] for writing out a
234 /// [`ChannelManager`]. See the `lightning-persister` crate for LDK's
235 /// provided implementation.
237 /// [`Persister::persist_graph`] is responsible for writing out the [`NetworkGraph`] to disk, if
238 /// [`GossipSync`] is supplied. See [`NetworkGraph::write`] for writing out a [`NetworkGraph`].
239 /// See the `lightning-persister` crate for LDK's provided implementation.
241 /// Typically, users should either implement [`Persister::persist_manager`] to never return an
242 /// error or call [`join`] and handle any error that may arise. For the latter case,
243 /// `BackgroundProcessor` must be restarted by calling `start` again after handling the error.
247 /// `event_handler` is responsible for handling events that users should be notified of (e.g.,
248 /// payment failed). [`BackgroundProcessor`] may decorate the given [`EventHandler`] with common
249 /// functionality implemented by other handlers.
250 /// * [`P2PGossipSync`] or [`RapidGossipSync`], if given, will update the [`NetworkGraph`] based on payment failures.
252 /// # Rapid Gossip Sync
254 /// If rapid gossip sync is meant to run at startup, pass [`RapidGossipSync`] via `gossip_sync`
255 /// to indicate that the [`BackgroundProcessor`] should not prune the [`NetworkGraph`] instance
256 /// until the [`RapidGossipSync`] instance completes its first sync.
258 /// [top-level documentation]: BackgroundProcessor
259 /// [`join`]: Self::join
260 /// [`stop`]: Self::stop
261 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
262 /// [`ChannelManager::write`]: lightning::ln::channelmanager::ChannelManager#impl-Writeable
263 /// [`Persister::persist_manager`]: lightning::util::persist::Persister::persist_manager
264 /// [`Persister::persist_graph`]: lightning::util::persist::Persister::persist_graph
265 /// [`NetworkGraph`]: lightning::routing::gossip::NetworkGraph
266 /// [`NetworkGraph::write`]: lightning::routing::gossip::NetworkGraph#impl-Writeable
269 Signer: 'static + Sign,
270 CA: 'static + Deref + Send + Sync,
271 CF: 'static + Deref + Send + Sync,
272 CW: 'static + Deref + Send + Sync,
273 T: 'static + Deref + Send + Sync,
274 K: 'static + Deref + Send + Sync,
275 F: 'static + Deref + Send + Sync,
276 G: 'static + Deref<Target = NetworkGraph<L>> + Send + Sync,
277 L: 'static + Deref + Send + Sync,
278 P: 'static + Deref + Send + Sync,
279 Descriptor: 'static + SocketDescriptor + Send + Sync,
280 CMH: 'static + Deref + Send + Sync,
281 RMH: 'static + Deref + Send + Sync,
282 EH: 'static + EventHandler + Send,
283 PS: 'static + Deref + Send,
284 M: 'static + Deref<Target = ChainMonitor<Signer, CF, T, F, L, P>> + Send + Sync,
285 CM: 'static + Deref<Target = ChannelManager<Signer, CW, T, K, F, L>> + Send + Sync,
286 PGS: 'static + Deref<Target = P2PGossipSync<G, CA, L>> + Send + Sync,
287 RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
288 UMH: 'static + Deref + Send + Sync,
289 PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
290 S: 'static + Deref<Target = SC> + Send + Sync,
291 SC: WriteableScore<'a>,
293 persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
294 gossip_sync: GossipSync<PGS, RGS, G, CA, L>, peer_manager: PM, logger: L, scorer: Option<S>,
297 CA::Target: 'static + chain::Access,
298 CF::Target: 'static + chain::Filter,
299 CW::Target: 'static + chain::Watch<Signer>,
300 T::Target: 'static + BroadcasterInterface,
301 K::Target: 'static + KeysInterface<Signer = Signer>,
302 F::Target: 'static + FeeEstimator,
303 L::Target: 'static + Logger,
304 P::Target: 'static + Persist<Signer>,
305 CMH::Target: 'static + ChannelMessageHandler,
306 RMH::Target: 'static + RoutingMessageHandler,
307 UMH::Target: 'static + CustomMessageHandler,
308 PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
310 let stop_thread = Arc::new(AtomicBool::new(false));
311 let stop_thread_clone = stop_thread.clone();
312 let handle = thread::spawn(move || -> Result<(), std::io::Error> {
313 let event_handler = DecoratingEventHandler {
315 gossip_sync: &gossip_sync,
318 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
319 channel_manager.timer_tick_occurred();
321 let mut last_freshness_call = Instant::now();
322 let mut last_ping_call = Instant::now();
323 let mut last_prune_call = Instant::now();
324 let mut last_scorer_persist_call = Instant::now();
325 let mut have_pruned = false;
328 channel_manager.process_pending_events(&event_handler);
329 chain_monitor.process_pending_events(&event_handler);
331 // Note that the PeerManager::process_events may block on ChannelManager's locks,
332 // hence it comes last here. When the ChannelManager finishes whatever it's doing,
333 // we want to ensure we get into `persist_manager` as quickly as we can, especially
334 // without running the normal event processing above and handing events to users.
336 // Specifically, on an *extremely* slow machine, we may see ChannelManager start
337 // processing a message effectively at any point during this loop. In order to
338 // minimize the time between such processing completing and persisting the updated
339 // ChannelManager, we want to minimize methods blocking on a ChannelManager
340 // generally, and as a fallback place such blocking only immediately before
342 peer_manager.process_events();
344 // We wait up to 100ms, but track how long it takes to detect being put to sleep,
345 // see `await_start`'s use below.
346 let await_start = Instant::now();
347 let updates_available =
348 channel_manager.await_persistable_update_timeout(Duration::from_millis(100));
349 let await_time = await_start.elapsed();
351 if updates_available {
352 log_trace!(logger, "Persisting ChannelManager...");
353 persister.persist_manager(&*channel_manager)?;
354 log_trace!(logger, "Done persisting ChannelManager.");
356 // Exit the loop if the background processor was requested to stop.
357 if stop_thread.load(Ordering::Acquire) == true {
358 log_trace!(logger, "Terminating background processor.");
361 if last_freshness_call.elapsed().as_secs() > FRESHNESS_TIMER {
362 log_trace!(logger, "Calling ChannelManager's timer_tick_occurred");
363 channel_manager.timer_tick_occurred();
364 last_freshness_call = Instant::now();
366 if await_time > Duration::from_secs(1) {
367 // On various platforms, we may be starved of CPU cycles for several reasons.
368 // E.g. on iOS, if we've been in the background, we will be entirely paused.
369 // Similarly, if we're on a desktop platform and the device has been asleep, we
370 // may not get any cycles.
371 // We detect this by checking if our max-100ms-sleep, above, ran longer than a
372 // full second, at which point we assume sockets may have been killed (they
373 // appear to be at least on some platforms, even if it has only been a second).
374 // Note that we have to take care to not get here just because user event
375 // processing was slow at the top of the loop. For example, the sample client
376 // may call Bitcoin Core RPCs during event handling, which very often takes
377 // more than a handful of seconds to complete, and shouldn't disconnect all our
379 log_trace!(logger, "100ms sleep took more than a second, disconnecting peers.");
380 peer_manager.disconnect_all_peers();
381 last_ping_call = Instant::now();
382 } else if last_ping_call.elapsed().as_secs() > PING_TIMER {
383 log_trace!(logger, "Calling PeerManager's timer_tick_occurred");
384 peer_manager.timer_tick_occurred();
385 last_ping_call = Instant::now();
388 // Note that we want to run a graph prune once not long after startup before
389 // falling back to our usual hourly prunes. This avoids short-lived clients never
390 // pruning their network graph. We run once 60 seconds after startup before
391 // continuing our normal cadence.
392 if last_prune_call.elapsed().as_secs() > if have_pruned { NETWORK_PRUNE_TIMER } else { FIRST_NETWORK_PRUNE_TIMER } {
393 // The network graph must not be pruned while rapid sync completion is pending
394 log_trace!(logger, "Assessing prunability of network graph");
395 if let Some(network_graph) = gossip_sync.prunable_network_graph() {
396 network_graph.remove_stale_channels();
398 if let Err(e) = persister.persist_graph(network_graph) {
399 log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
402 last_prune_call = Instant::now();
405 log_trace!(logger, "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.");
409 if last_scorer_persist_call.elapsed().as_secs() > SCORER_PERSIST_TIMER {
410 if let Some(ref scorer) = scorer {
411 log_trace!(logger, "Persisting scorer");
412 if let Err(e) = persister.persist_scorer(&scorer) {
413 log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
416 last_scorer_persist_call = Instant::now();
420 // After we exit, ensure we persist the ChannelManager one final time - this avoids
421 // some races where users quit while channel updates were in-flight, with
422 // ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
423 persister.persist_manager(&*channel_manager)?;
425 // Persist Scorer on exit
426 if let Some(ref scorer) = scorer {
427 persister.persist_scorer(&scorer)?;
430 // Persist NetworkGraph on exit
431 if let Some(network_graph) = gossip_sync.network_graph() {
432 persister.persist_graph(network_graph)?;
437 Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
440 /// Join `BackgroundProcessor`'s thread, returning any error that occurred while persisting
441 /// [`ChannelManager`].
445 /// This function panics if the background thread has panicked such as while persisting or
448 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
449 pub fn join(mut self) -> Result<(), std::io::Error> {
450 assert!(self.thread_handle.is_some());
454 /// Stop `BackgroundProcessor`'s thread, returning any error that occurred while persisting
455 /// [`ChannelManager`] or, during the final persistence pass on shutdown, the scorer or network graph.
459 /// This function panics if the background thread has panicked such as while persisting or
462 /// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
463 pub fn stop(mut self) -> Result<(), std::io::Error> {
464 assert!(self.thread_handle.is_some());
465 self.stop_and_join_thread()
468 fn stop_and_join_thread(&mut self) -> Result<(), std::io::Error> {
469 self.stop_thread.store(true, Ordering::Release);
473 fn join_thread(&mut self) -> Result<(), std::io::Error> {
474 match self.thread_handle.take() {
475 Some(handle) => handle.join().unwrap(),
481 impl Drop for BackgroundProcessor {
483 self.stop_and_join_thread().unwrap();
489 use bitcoin::blockdata::block::BlockHeader;
490 use bitcoin::blockdata::constants::genesis_block;
491 use bitcoin::blockdata::locktime::PackedLockTime;
492 use bitcoin::blockdata::transaction::{Transaction, TxOut};
493 use bitcoin::network::constants::Network;
494 use lightning::chain::{BestBlock, Confirm, chainmonitor};
495 use lightning::chain::channelmonitor::ANTI_REORG_DELAY;
496 use lightning::chain::keysinterface::{InMemorySigner, Recipient, KeysInterface, KeysManager};
497 use lightning::chain::transaction::OutPoint;
498 use lightning::get_event_msg;
499 use lightning::ln::channelmanager::{BREAKDOWN_TIMEOUT, ChainParameters, ChannelManager, SimpleArcChannelManager};
500 use lightning::ln::features::{ChannelFeatures, InitFeatures};
501 use lightning::ln::msgs::{ChannelMessageHandler, Init};
502 use lightning::ln::peer_handler::{PeerManager, MessageHandler, SocketDescriptor, IgnoringMessageHandler};
503 use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
504 use lightning::util::config::UserConfig;
505 use lightning::util::events::{Event, MessageSendEventsProvider, MessageSendEvent};
506 use lightning::util::ser::Writeable;
507 use lightning::util::test_utils;
508 use lightning::util::persist::KVStorePersister;
509 use lightning_invoice::payment::{InvoicePayer, Retry};
510 use lightning_invoice::utils::DefaultRouter;
511 use lightning_persister::FilesystemPersister;
513 use std::path::PathBuf;
514 use std::sync::{Arc, Mutex};
515 use std::sync::mpsc::SyncSender;
516 use std::time::Duration;
517 use bitcoin::hashes::Hash;
518 use bitcoin::TxMerkleNode;
519 use lightning::routing::scoring::{FixedPenaltyScorer};
520 use lightning_rapid_gossip_sync::RapidGossipSync;
521 use super::{BackgroundProcessor, GossipSync, FRESHNESS_TIMER};
523 const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
525 #[derive(Clone, Eq, Hash, PartialEq)]
526 struct TestDescriptor{}
527 impl SocketDescriptor for TestDescriptor {
528 fn send_data(&mut self, _data: &[u8], _resume_read: bool) -> usize {
532 fn disconnect_socket(&mut self) {}
535 type ChainMonitor = chainmonitor::ChainMonitor<InMemorySigner, Arc<test_utils::TestChainSource>, Arc<test_utils::TestBroadcaster>, Arc<test_utils::TestFeeEstimator>, Arc<test_utils::TestLogger>, Arc<FilesystemPersister>>;
537 type PGS = Arc<P2PGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>>>;
538 type RGS = Arc<RapidGossipSync<Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestLogger>>>;
541 node: Arc<SimpleArcChannelManager<ChainMonitor, test_utils::TestBroadcaster, test_utils::TestFeeEstimator, test_utils::TestLogger>>,
542 p2p_gossip_sync: PGS,
543 rapid_gossip_sync: RGS,
544 peer_manager: Arc<PeerManager<TestDescriptor, Arc<test_utils::TestChannelMessageHandler>, Arc<test_utils::TestRoutingMessageHandler>, Arc<test_utils::TestLogger>, IgnoringMessageHandler>>,
545 chain_monitor: Arc<ChainMonitor>,
546 persister: Arc<FilesystemPersister>,
547 tx_broadcaster: Arc<test_utils::TestBroadcaster>,
548 network_graph: Arc<NetworkGraph<Arc<test_utils::TestLogger>>>,
549 logger: Arc<test_utils::TestLogger>,
550 best_block: BestBlock,
551 scorer: Arc<Mutex<FixedPenaltyScorer>>,
555 fn p2p_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
556 GossipSync::P2P(self.p2p_gossip_sync.clone())
559 fn rapid_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
560 GossipSync::Rapid(self.rapid_gossip_sync.clone())
563 fn no_gossip_sync(&self) -> GossipSync<PGS, RGS, Arc<NetworkGraph<Arc<test_utils::TestLogger>>>, Arc<test_utils::TestChainSource>, Arc<test_utils::TestLogger>> {
570 let data_dir = self.persister.get_data_dir();
571 match fs::remove_dir_all(data_dir.clone()) {
572 Err(e) => println!("Failed to remove test persister directory {}: {}", data_dir, e),
579 graph_error: Option<(std::io::ErrorKind, &'static str)>,
580 graph_persistence_notifier: Option<SyncSender<()>>,
581 manager_error: Option<(std::io::ErrorKind, &'static str)>,
582 scorer_error: Option<(std::io::ErrorKind, &'static str)>,
583 filesystem_persister: FilesystemPersister,
587 fn new(data_dir: String) -> Self {
588 let filesystem_persister = FilesystemPersister::new(data_dir.clone());
589 Self { graph_error: None, graph_persistence_notifier: None, manager_error: None, scorer_error: None, filesystem_persister }
592 fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
593 Self { graph_error: Some((error, message)), ..self }
596 fn with_graph_persistence_notifier(self, sender: SyncSender<()>) -> Self {
597 Self { graph_persistence_notifier: Some(sender), ..self }
600 fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
601 Self { manager_error: Some((error, message)), ..self }
604 fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
605 Self { scorer_error: Some((error, message)), ..self }
609 impl KVStorePersister for Persister {
610 fn persist<W: Writeable>(&self, key: &str, object: &W) -> std::io::Result<()> {
611 if key == "manager" {
612 if let Some((error, message)) = self.manager_error {
613 return Err(std::io::Error::new(error, message))
617 if key == "network_graph" {
618 if let Some(sender) = &self.graph_persistence_notifier {
619 sender.send(()).unwrap();
622 if let Some((error, message)) = self.graph_error {
623 return Err(std::io::Error::new(error, message))
628 if let Some((error, message)) = self.scorer_error {
629 return Err(std::io::Error::new(error, message))
633 self.filesystem_persister.persist(key, object)
637 fn get_full_filepath(filepath: String, filename: String) -> String {
638 let mut path = PathBuf::from(filepath);
640 path.to_str().unwrap().to_string()
643 fn create_nodes(num_nodes: usize, persist_dir: String) -> Vec<Node> {
644 let mut nodes = Vec::new();
645 for i in 0..num_nodes {
646 let tx_broadcaster = Arc::new(test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new()), blocks: Arc::new(Mutex::new(Vec::new()))});
647 let fee_estimator = Arc::new(test_utils::TestFeeEstimator { sat_per_kw: Mutex::new(253) });
648 let chain_source = Arc::new(test_utils::TestChainSource::new(Network::Testnet));
649 let logger = Arc::new(test_utils::TestLogger::with_id(format!("node {}", i)));
650 let persister = Arc::new(FilesystemPersister::new(format!("{}_persister_{}", persist_dir, i)));
651 let seed = [i as u8; 32];
652 let network = Network::Testnet;
653 let genesis_block = genesis_block(network);
654 let now = Duration::from_secs(genesis_block.header.time as u64);
655 let keys_manager = Arc::new(KeysManager::new(&seed, now.as_secs(), now.subsec_nanos()));
656 let chain_monitor = Arc::new(chainmonitor::ChainMonitor::new(Some(chain_source.clone()), tx_broadcaster.clone(), logger.clone(), fee_estimator.clone(), persister.clone()));
657 let best_block = BestBlock::from_genesis(network);
658 let params = ChainParameters { network, best_block };
659 let manager = Arc::new(ChannelManager::new(fee_estimator.clone(), chain_monitor.clone(), tx_broadcaster.clone(), logger.clone(), keys_manager.clone(), UserConfig::default(), params));
660 let network_graph = Arc::new(NetworkGraph::new(genesis_block.header.block_hash(), logger.clone()));
661 let p2p_gossip_sync = Arc::new(P2PGossipSync::new(network_graph.clone(), Some(chain_source.clone()), logger.clone()));
662 let rapid_gossip_sync = Arc::new(RapidGossipSync::new(network_graph.clone()));
663 let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
664 let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
665 let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
666 let node = Node { node: manager, p2p_gossip_sync, rapid_gossip_sync, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
670 for i in 0..num_nodes {
671 for j in (i+1)..num_nodes {
672 nodes[i].node.peer_connected(&nodes[j].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
673 nodes[j].node.peer_connected(&nodes[i].node.get_our_node_id(), &Init { features: InitFeatures::known(), remote_network_address: None });
680 macro_rules! open_channel {
681 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
682 begin_open_channel!($node_a, $node_b, $channel_value);
683 let events = $node_a.node.get_and_clear_pending_events();
684 assert_eq!(events.len(), 1);
685 let (temporary_channel_id, tx) = handle_funding_generation_ready!(&events[0], $channel_value);
686 end_open_channel!($node_a, $node_b, temporary_channel_id, tx);
691 macro_rules! begin_open_channel {
692 ($node_a: expr, $node_b: expr, $channel_value: expr) => {{
693 $node_a.node.create_channel($node_b.node.get_our_node_id(), $channel_value, 100, 42, None).unwrap();
694 $node_b.node.handle_open_channel(&$node_a.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_a, MessageSendEvent::SendOpenChannel, $node_b.node.get_our_node_id()));
695 $node_a.node.handle_accept_channel(&$node_b.node.get_our_node_id(), InitFeatures::known(), &get_event_msg!($node_b, MessageSendEvent::SendAcceptChannel, $node_a.node.get_our_node_id()));
699 macro_rules! handle_funding_generation_ready {
700 ($event: expr, $channel_value: expr) => {{
702 &Event::FundingGenerationReady { temporary_channel_id, channel_value_satoshis, ref output_script, user_channel_id, .. } => {
703 assert_eq!(channel_value_satoshis, $channel_value);
704 assert_eq!(user_channel_id, 42);
706 let tx = Transaction { version: 1 as i32, lock_time: PackedLockTime(0), input: Vec::new(), output: vec![TxOut {
707 value: channel_value_satoshis, script_pubkey: output_script.clone(),
709 (temporary_channel_id, tx)
711 _ => panic!("Unexpected event"),
716 macro_rules! end_open_channel {
717 ($node_a: expr, $node_b: expr, $temporary_channel_id: expr, $tx: expr) => {{
718 $node_a.node.funding_transaction_generated(&$temporary_channel_id, &$node_b.node.get_our_node_id(), $tx.clone()).unwrap();
719 $node_b.node.handle_funding_created(&$node_a.node.get_our_node_id(), &get_event_msg!($node_a, MessageSendEvent::SendFundingCreated, $node_b.node.get_our_node_id()));
720 $node_a.node.handle_funding_signed(&$node_b.node.get_our_node_id(), &get_event_msg!($node_b, MessageSendEvent::SendFundingSigned, $node_a.node.get_our_node_id()));
724 fn confirm_transaction_depth(node: &mut Node, tx: &Transaction, depth: u32) {
726 let prev_blockhash = node.best_block.block_hash();
727 let height = node.best_block.height() + 1;
728 let header = BlockHeader { version: 0x20000000, prev_blockhash, merkle_root: TxMerkleNode::all_zeros(), time: height, bits: 42, nonce: 42 };
729 let txdata = vec![(0, tx)];
730 node.best_block = BestBlock::new(header.block_hash(), height);
733 node.node.transactions_confirmed(&header, &txdata, height);
734 node.chain_monitor.transactions_confirmed(&header, &txdata, height);
737 node.node.best_block_updated(&header, height);
738 node.chain_monitor.best_block_updated(&header, height);
744 fn confirm_transaction(node: &mut Node, tx: &Transaction) {
745 confirm_transaction_depth(node, tx, ANTI_REORG_DELAY);
749 fn test_background_processor() {
750 // Test that when a new channel is created, the ChannelManager needs to be re-persisted with
751 // updates. Also test that when new updates are available, the manager signals that it needs
752 // re-persistence and is successfully re-persisted.
753 let nodes = create_nodes(2, "test_background_processor".to_string());
755 // Go through the channel creation process so that each node has something to persist. Since
756 // open_channel consumes events, it must complete before starting BackgroundProcessor to
757 // avoid a race with processing events.
758 let tx = open_channel!(nodes[0], nodes[1], 100000);
760 // Initiate the background processors to watch each node.
761 let data_dir = nodes[0].persister.get_data_dir();
762 let persister = Arc::new(Persister::new(data_dir));
763 let event_handler = |_: &_| {};
764 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
766 macro_rules! check_persisted_data {
767 ($node: expr, $filepath: expr) => {
768 let mut expected_bytes = Vec::new();
770 expected_bytes.clear();
771 match $node.write(&mut expected_bytes) {
773 match std::fs::read($filepath) {
775 if bytes == expected_bytes {
784 Err(e) => panic!("Unexpected error: {}", e)
790 // Check that the initial channel manager data is persisted as expected.
791 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "manager".to_string());
792 check_persisted_data!(nodes[0].node, filepath.clone());
795 if !nodes[0].node.get_persistence_condvar_value() { break }
798 // Force-close the channel.
799 nodes[0].node.force_close_broadcasting_latest_txn(&OutPoint { txid: tx.txid(), index: 0 }.to_channel_id(), &nodes[1].node.get_our_node_id()).unwrap();
801 // Check that the force-close updates are persisted.
802 check_persisted_data!(nodes[0].node, filepath.clone());
804 if !nodes[0].node.get_persistence_condvar_value() { break }
807 // Check network graph is persisted
808 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "network_graph".to_string());
809 check_persisted_data!(nodes[0].network_graph, filepath.clone());
811 // Check scorer is persisted
812 let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
813 check_persisted_data!(nodes[0].scorer, filepath.clone());
815 assert!(bg_processor.stop().is_ok());
// Starts a `BackgroundProcessor` for a single node and looks in the node's captured log lines
// for the two `timer_tick_occurred` messages before stopping the processor.
819 fn test_timer_tick_called() {
820 // Test that ChannelManager's and PeerManager's `timer_tick_occurred` is called every
821 // `FRESHNESS_TIMER`.
822 let nodes = create_nodes(1, "test_timer_tick_called".to_string());
// Build a fresh `Persister` over the data directory reported by the node's own persister.
823 let data_dir = nodes[0].persister.get_data_dir();
824 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: events are ignored in this test.
825 let event_handler = |_: &_| {};
826 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Log entries are looked up by a `(String, String)` key: the first element used here is
// "lightning_background_processor" (presumably the logging module — confirm), the second is
// the exact log message.
828 let log_entries = nodes[0].logger.lines.lock().unwrap();
829 let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
830 let second_desired_log = "Calling PeerManager's timer_tick_occurred".to_string();
// Both the ChannelManager and the PeerManager message must be present.
831 if log_entries.get(&("lightning_background_processor".to_string(), desired_log)).is_some() &&
832 log_entries.get(&("lightning_background_processor".to_string(), second_desired_log)).is_some() {
// Shut the processor down and require that `stop()` reports success.
837 assert!(bg_processor.stop().is_ok());
// Runs a `BackgroundProcessor` whose persister is configured through `with_manager_error` with
// an `ErrorKind::Other` / "test" error, and checks that `join()` surfaces exactly that error.
841 fn test_channel_manager_persist_error() {
842 // Test that if we encounter an error during manager persistence, the error is returned from `join`.
843 let nodes = create_nodes(2, "test_persist_error".to_string());
// Open a channel before starting the processor — presumably so the manager has state that
// needs to be persisted; confirm.
844 open_channel!(nodes[0], nodes[1], 100000);
846 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured with the error that the assertions below expect to see again.
847 let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
848 let event_handler = |_: &_| {};
849 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `join()` must not return `Ok`; a successful result fails the test.
850 match bg_processor.join() {
851 Ok(_) => panic!("Expected error persisting manager"),
// The returned error must carry the kind and message configured on the persister.
853 assert_eq!(e.kind(), std::io::ErrorKind::Other);
854 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Runs a `BackgroundProcessor` (with P2P gossip sync) whose persister is configured through
// `with_graph_error`, and checks that `stop()` returns exactly that error.
860 fn test_network_graph_persist_error() {
861 // Test that if we encounter an error during network graph persistence, an error gets returned.
862 let nodes = create_nodes(2, "test_persist_network_graph_error".to_string());
863 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured with the error that the assertions below expect to see again.
864 let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
// No-op event handler: events are ignored in this test.
865 let event_handler = |_: &_| {};
866 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].p2p_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `stop()` must not return `Ok`; a successful result fails the test.
868 match bg_processor.stop() {
869 Ok(_) => panic!("Expected error persisting network graph"),
// The returned error must carry the kind and message configured on the persister.
871 assert_eq!(e.kind(), std::io::ErrorKind::Other);
872 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Runs a `BackgroundProcessor` whose persister is configured through `with_scorer_error`, and
// checks that `stop()` returns exactly that error.
878 fn test_scorer_persist_error() {
879 // Test that if we encounter an error during scorer persistence, an error gets returned.
880 let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
881 let data_dir = nodes[0].persister.get_data_dir();
// Persister configured with the error that the assertions below expect to see again.
882 let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
// No-op event handler: events are ignored in this test.
883 let event_handler = |_: &_| {};
// The scorer is handed to the processor as `Some(..)`, so there is a scorer to persist.
884 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// `stop()` must not return `Ok`; a successful result fails the test.
886 match bg_processor.stop() {
887 Ok(_) => panic!("Expected error persisting scorer"),
// The returned error must carry the kind and message configured on the persister.
889 assert_eq!(e.kind(), std::io::ErrorKind::Other);
890 assert_eq!(e.get_ref().unwrap().to_string(), "test");
// Checks that events are delivered to the user-supplied handler by the background processor.
// The test runs two processors one after the other: the first with a handler for
// FundingGenerationReady events, the second with a handler that forwards every event so that a
// SpendableOutputs (or ChannelClosed) event can be observed after a force-close.
896 fn test_background_event_handling() {
897 let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
898 let channel_value = 100000;
899 let data_dir = nodes[0].persister.get_data_dir();
// `data_dir` is cloned because a second persister is created from it further down.
900 let persister = Arc::new(Persister::new(data_dir.clone()));
902 // Set up a background event handler for FundingGenerationReady events.
// Bounded channel (capacity 1) used to pass the handler's result back to the test thread.
903 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
904 let event_handler = move |event: &Event| {
// The macro's result is received below as `(temporary_channel_id, funding_tx)`.
905 sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
907 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
909 // Open a channel and check that the FundingGenerationReady event was handled.
910 begin_open_channel!(nodes[0], nodes[1], channel_value);
// Wait at most `EVENT_DEADLINE` seconds for the handler to report back.
911 let (temporary_channel_id, funding_tx) = receiver
912 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
913 .expect("FundingGenerationReady not handled within deadline");
914 end_open_channel!(nodes[0], nodes[1], temporary_channel_id, funding_tx);
916 // Confirm the funding transaction.
// Each node confirms the funding transaction and produces a ChannelReady message; the messages
// are then exchanged, and each node emits a ChannelUpdate that is fetched and discarded here.
917 confirm_transaction(&mut nodes[0], &funding_tx);
918 let as_funding = get_event_msg!(nodes[0], MessageSendEvent::SendChannelReady, nodes[1].node.get_our_node_id());
919 confirm_transaction(&mut nodes[1], &funding_tx);
920 let bs_funding = get_event_msg!(nodes[1], MessageSendEvent::SendChannelReady, nodes[0].node.get_our_node_id());
921 nodes[0].node.handle_channel_ready(&nodes[1].node.get_our_node_id(), &bs_funding);
922 let _as_channel_update = get_event_msg!(nodes[0], MessageSendEvent::SendChannelUpdate, nodes[1].node.get_our_node_id());
923 nodes[1].node.handle_channel_ready(&nodes[0].node.get_our_node_id(), &as_funding);
924 let _bs_channel_update = get_event_msg!(nodes[1], MessageSendEvent::SendChannelUpdate, nodes[0].node.get_our_node_id());
// Stop the first processor before a second one with a different handler is started.
926 assert!(bg_processor.stop().is_ok());
928 // Set up a background event handler for SpendableOutputs events.
// This handler forwards a clone of every event it receives; filtering happens in the test body.
929 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
930 let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
931 let persister = Arc::new(Persister::new(data_dir));
932 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
934 // Force close the channel and check that the SpendableOutputs event was handled.
935 nodes[0].node.force_close_broadcasting_latest_txn(&nodes[0].node.list_channels()[0].channel_id, &nodes[1].node.get_our_node_id()).unwrap();
// Take the last transaction recorded by the test broadcaster; it is treated as the commitment
// transaction and confirmed to a depth of `BREAKDOWN_TIMEOUT`.
936 let commitment_tx = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().pop().unwrap();
937 confirm_transaction_depth(&mut nodes[0], &commitment_tx, BREAKDOWN_TIMEOUT as u32);
// Wait at most `EVENT_DEADLINE` seconds for an event to be forwarded by the handler.
939 .recv_timeout(Duration::from_secs(EVENT_DEADLINE))
940 .expect("SpendableOutputs not handled within deadline");
// SpendableOutputs and ChannelClosed are both accepted; any other event fails the test.
942 Event::SpendableOutputs { .. } => {},
943 Event::ChannelClosed { .. } => {},
944 _ => panic!("Unexpected event: {:?}", event),
947 assert!(bg_processor.stop().is_ok());
// Starts a `BackgroundProcessor` with a scorer and looks in the node's captured log lines for
// the "Persisting scorer" message before stopping the processor.
// NOTE(review): the control flow around the log lookup is not visible in this excerpt.
951 fn test_scorer_persistence() {
952 let nodes = create_nodes(2, "test_scorer_persistence".to_string());
953 let data_dir = nodes[0].persister.get_data_dir();
954 let persister = Arc::new(Persister::new(data_dir));
// No-op event handler: events are ignored in this test.
955 let event_handler = |_: &_| {};
956 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Log entries are looked up by a `(String, String)` key; the second element is the exact
// log message.
959 let log_entries = nodes[0].logger.lines.lock().unwrap();
960 let expected_log = "Persisting scorer".to_string();
961 if log_entries.get(&("lightning_background_processor".to_string(), expected_log)).is_some() {
// Shut the processor down and require that `stop()` reports success.
966 assert!(bg_processor.stop().is_ok());
// Exercises network-graph pruning when the processor is started with rapid gossip sync:
// first looks for the log lines stating that pruning was assessed but not performed, then feeds
// a rapid gossip sync update into the graph and finally expects all channels to be gone after
// the persister signals a graph persistence.
970 fn test_not_pruning_network_graph_until_graph_sync_completion() {
971 let nodes = create_nodes(2, "test_not_pruning_network_graph_until_graph_sync_completion".to_string());
972 let data_dir = nodes[0].persister.get_data_dir();
// Channel handed to the persister via `with_graph_persistence_notifier`; the test waits on the
// receiving end further down.
973 let (sender, receiver) = std::sync::mpsc::sync_channel(1);
974 let persister = Arc::new(Persister::new(data_dir.clone()).with_graph_persistence_notifier(sender));
975 let network_graph = nodes[0].network_graph.clone();
976 let features = ChannelFeatures::empty();
// Seed the graph with one channel between the two nodes; it shows up as "42" in the graph
// description asserted below.
977 network_graph.add_channel_from_partial_announcement(42, 53, features, nodes[0].node.get_our_node_id(), nodes[1].node.get_our_node_id())
978 .expect("Failed to update channel from partial announcement");
979 let original_graph_description = network_graph.to_string();
980 assert!(original_graph_description.contains("42: features: 0000, node_one:"));
// Exactly the one seeded channel is present at this point.
981 assert_eq!(network_graph.read_only().channels().len(), 1);
983 let event_handler = |_: &_| {};
984 let background_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].rapid_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Look for the two log lines showing that prunability was assessed and pruning was skipped.
987 let log_entries = nodes[0].logger.lines.lock().unwrap();
988 let expected_log_a = "Assessing prunability of network graph".to_string();
989 let expected_log_b = "Not pruning network graph, either due to pending rapid gossip sync or absence of a prunable graph.".to_string();
990 if log_entries.get(&("lightning_background_processor".to_string(), expected_log_a)).is_some() &&
991 log_entries.get(&("lightning_background_processor".to_string(), expected_log_b)).is_some() {
// Raw update bytes passed to the rapid gossip sync below; the data begins with the ASCII
// bytes "LDK" (76, 68, 75).
996 let initialization_input = vec![
997 76, 68, 75, 1, 111, 226, 140, 10, 182, 241, 179, 114, 193, 166, 162, 70, 174, 99, 247,
998 79, 147, 30, 131, 101, 225, 90, 8, 156, 104, 214, 25, 0, 0, 0, 0, 0, 97, 227, 98, 218,
999 0, 0, 0, 4, 2, 22, 7, 207, 206, 25, 164, 197, 231, 230, 231, 56, 102, 61, 250, 251,
1000 187, 172, 38, 46, 79, 247, 108, 44, 155, 48, 219, 238, 252, 53, 192, 6, 67, 2, 36, 125,
1001 157, 176, 223, 175, 234, 116, 94, 248, 201, 225, 97, 235, 50, 47, 115, 172, 63, 136,
1002 88, 216, 115, 11, 111, 217, 114, 84, 116, 124, 231, 107, 2, 158, 1, 242, 121, 152, 106,
1003 204, 131, 186, 35, 93, 70, 216, 10, 237, 224, 183, 89, 95, 65, 3, 83, 185, 58, 138,
1004 181, 64, 187, 103, 127, 68, 50, 2, 201, 19, 17, 138, 136, 149, 185, 226, 156, 137, 175,
1005 110, 32, 237, 0, 217, 90, 31, 100, 228, 149, 46, 219, 175, 168, 77, 4, 143, 38, 128,
1006 76, 97, 0, 0, 0, 2, 0, 0, 255, 8, 153, 192, 0, 2, 27, 0, 0, 0, 1, 0, 0, 255, 2, 68,
1007 226, 0, 6, 11, 0, 1, 2, 3, 0, 0, 0, 2, 0, 40, 0, 0, 0, 0, 0, 0, 3, 232, 0, 0, 3, 232,
1008 0, 0, 0, 1, 0, 0, 0, 0, 58, 85, 116, 216, 255, 8, 153, 192, 0, 2, 27, 0, 0, 25, 0, 0,
1009 0, 1, 0, 0, 0, 125, 255, 2, 68, 226, 0, 6, 11, 0, 1, 5, 0, 0, 0, 0, 29, 129, 25, 192,
// Apply the update through the node's rapid gossip sync; it must succeed.
1011 nodes[0].rapid_gossip_sync.update_network_graph(&initialization_input[..]).unwrap();
1013 // this should have added two channels
1014 assert_eq!(network_graph.read_only().channels().len(), 3);
// Wait up to five times `FIRST_NETWORK_PRUNE_TIMER` seconds for the awaited notification.
1017 .recv_timeout(Duration::from_secs(super::FIRST_NETWORK_PRUNE_TIMER * 5))
1018 .expect("Network graph not pruned within deadline");
1020 background_processor.stop().unwrap();
1022 // all channels should now be pruned
1023 assert_eq!(network_graph.read_only().channels().len(), 0);
// Starts a `BackgroundProcessor` with an `Arc<InvoicePayer>` as its event handler and checks
// that the processor can be stopped cleanly.
// NOTE(review): no payment is made in the visible code; presumably this mainly verifies that an
// `InvoicePayer` is accepted as the event handler — confirm.
1027 fn test_invoice_payer() {
// Test keys interface built from an all-zero seed on Testnet; used here only to obtain the
// random bytes passed to the router.
1028 let keys_manager = test_utils::TestKeysInterface::new(&[0u8; 32], Network::Testnet);
1029 let random_seed_bytes = keys_manager.get_secure_random_bytes();
1030 let nodes = create_nodes(2, "test_invoice_payer".to_string());
1032 // Initiate the background processors to watch each node.
1033 let data_dir = nodes[0].persister.get_data_dir();
1034 let persister = Arc::new(Persister::new(data_dir));
// Router and payer are built on node 0's network graph, scorer and logger; the payer gets a
// no-op inner event handler and `Retry::Attempts(2)`.
1035 let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
1036 let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, Retry::Attempts(2)));
// The payer itself (a shared `Arc` clone) is what the processor receives as event handler.
1037 let event_handler = Arc::clone(&invoice_payer);
1038 let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].no_gossip_sync(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
// Shut the processor down and require that `stop()` reports success.
1039 assert!(bg_processor.stop().is_ok());